FlumeNG1.8.0源码解析——Source写数据到Channel的过程

  |   0 评论   |   636 浏览

启动SourceRunner

根据上节《SourceRunner的构造方式》我们知道SourceRunner有两种类型:PollableSourceRunner、EventDrivenSourceRunner

org.apache.flume.source.EventDrivenSourceRunner#start

@Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }

初始化ChannelProcessor

org.apache.flume.channel.ChannelProcessor#initialize

进行拦截器链的初始化

/**
 * Initializes this channel processor by initializing its interceptor chain.
 */
public void initialize() {
    interceptorChain.initialize();
  }

启动Source

以org.apache.flume.source.ExecSource#start为例

/**
 * Starts the exec source: starts the source counter, creates a
 * single-thread executor, builds the {@code ExecRunnable} from the source's
 * configuration fields, submits it to the executor, and finally delegates to
 * {@code super.start()}.
 */
@Override
  public void start() {
    logger.info("Exec source starting with command: {}", command);
    // Start the counter before starting any threads that may access it.
    sourceCounter.start();
    // A dedicated single-thread executor runs the command-reading task.
    executor = Executors.newSingleThreadExecutor();
    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart,
                              restartThrottle, logStderr, bufferCount, batchTimeout, charset);
    // Start the runner thread.
    runnerFuture = executor.submit(runner);
    // Mark the Source as RUNNING.
    super.start();
    logger.debug("Exec source started");
  }

org.apache.flume.source.ExecSource.ExecRunnable#run

ExecRunnable是一个提交给单线程executor执行的任务(并不是Thread本身),我们来看其run方法

/**
 * Launches the configured command, reads its standard output line by line,
 * wraps each line in an Event and buffers it in a local list. The buffer is
 * handed to {@code flushEventBatch} from two places: this read loop (when the
 * buffer reaches {@code bufferCount} or {@code timeout()} is true) and a
 * scheduled task that fires every {@code batchTimeout} milliseconds. Both
 * paths synchronize on the buffer.
 *
 * When the output stream ends or an exception occurs, the reader is closed
 * and {@code kill()} is called. The whole cycle repeats while
 * {@code restart} is true, sleeping {@code restartThrottle} milliseconds
 * between attempts.
 */
@Override
    public void run() {
      do {
        String exitCode = "unknown";
        BufferedReader reader = null;
        String line = null;
        // Buffer shared between this thread and the timed flush task; every
        // access below is guarded by synchronized (eventList).
        final List<Event> eventList = new ArrayList<Event>();
        // A fresh single-thread scheduler is created on every iteration of the
        // restart loop; its thread name embeds the current thread's id.
        timedFlushService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" +
                Thread.currentThread().getId() + "-%d").build());
        try {
          if (shell != null) {
            // A shell is configured: build the argument array from shell + command.
            String[] commandArgs = formulateShellCommand(shell, command);
            process = Runtime.getRuntime().exec(commandArgs);
          }  else {
            // No shell configured: split the command on whitespace and launch it directly.
            String[] commandArgs = command.split("\\s+");
            process = new ProcessBuilder(commandArgs).start();
          }
          // Reader over the child process's standard output, decoded with the configured charset.
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));
          // StderrLogger dies as soon as the input stream is invalid
          // Consume the child's standard error on a separate daemon thread.
          StderrReader stderrReader = new StderrReader(new BufferedReader(
              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
          stderrReader.setName("StderrReader-[" + command + "]");
          stderrReader.setDaemon(true);
          stderrReader.start();
          // Schedule the periodic task that flushes a non-empty buffer once timeout() is true.
          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (eventList) {
                    if (!eventList.isEmpty() && timeout()) {
                      flushEventBatch(eventList);
                    }
                  }
                } catch (Exception e) {
                  logger.error("Exception occurred when processing event batch", e);
                  if (e instanceof InterruptedException) {
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                  }
                }
              }
          },
          batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
          // Read standard output line by line and append each line to the buffer as an Event.
          while ((line = reader.readLine()) != null) {
            sourceCounter.incrementEventReceivedCount();
            synchronized (eventList) {
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              // Flush once the buffer reaches bufferCount or timeout() reports true.
              // NOTE(review): timeout() is not shown here; presumably it compares the
              // time since the last flush against batchTimeout - confirm.
              if (eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }
          // End of stream: flush whatever is still buffered.
          synchronized (eventList) {
            if (!eventList.isEmpty()) {
              flushEventBatch(eventList);
            }
          }
        } catch (Exception e) {
          logger.error("Failed while running command: " + command, e);
          if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        } finally {
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException ex) {
              logger.error("Failed to close reader for exec source", ex);
            }
          }
          // NOTE(review): kill() is not shown here; its return value is logged as the
          // exit code, so it presumably terminates the process and returns its exit
          // status - confirm.
          exitCode = String.valueOf(kill());
        }
        if (restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
              exitCode);
          try {
            // Throttle restarts so a failing command is not relaunched in a tight loop.
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while (restart);// While restart is set, relaunch the command after it ends.
    }

读取线程为每一行输出构造org.apache.flume.Event对象并放入eventList中;当eventList的大小达到bufferCount或已超时时,读取线程会立即将其批量写入Channel,否则由定时任务在超时后将eventList中的数据批量写入到Channel

image.png

org.apache.flume.source.ExecSource.ExecRunnable#flushEventBatch

image.png

将事件放入通道

org.apache.flume.channel.ChannelProcessor#processEventBatch

尝试将给定事件放入每个已配置的通道。 如果任何必需的通道抛出ChannelException,则将传播该异常。

请注意,如果配置了多个通道,则某些事务可能已经提交,而其他事务可能会在异常情况下回滚。

这里区分了必需的(required)和可选的(optional)channel:

  1. 对于必需的channel,任何异常都会导致该事务回滚,并将异常向上抛出(Error原样抛出,ChannelException原样抛出,其他异常包装为ChannelException);该方法本身不会重试,是否重试由调用方决定

  2. 而对于可选的channel,发生异常时同样会回滚事务并打印异常日志;除非异常是Error(会继续向上抛出),否则放弃该channel的这一批Event并继续处理

public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
    events = interceptorChain.intercept(events);
    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();
    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();
    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);
      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }
      List<Channel> optChannels = selector.getOptionalChannels(event);
      for (Channel ch : optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }
    }
    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();
        List<Event> batch = reqChannelQueue.get(reqChannel);
        for (Event event : batch) {
          reqChannel.put(event);
        }
        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " + reqChannel, t);
          throw (Error) t;
        } else if (t instanceof ChannelException) {
          throw (ChannelException) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();
        List<Event> batch = optChannelQueue.get(optChannel);
        for (Event event : batch) {
          optChannel.put(event);
        }
        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }

将Event放进Channel

org.apache.flume.channel.BasicTransactionSemantics#put

protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");
    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

org.apache.flume.channel.MemoryChannel.MemoryTransaction#doPut

以MemoryChannel为例,主要做这两件事

  1. 将事件添加到putList

  2. 递增计数器

image.png

putList是一个java.util.concurrent.LinkedBlockingDeque类型的阻塞队列