Flume NG 1.8.0 Source Code Walkthrough: How a Sink Reads Data from a Channel


Entry point

org.apache.flume.SinkRunner#start

  @Override
  public void start() {
    SinkProcessor policy = getPolicy();
    // Start the SinkProcessor, which in turn starts its sink(s)
    policy.start();
    runner = new PollingRunner();
    runner.policy = policy;
    runner.counterGroup = counterGroup;
    runner.shouldStop = new AtomicBoolean();
    runnerThread = new Thread(runner);
    runnerThread.setName("SinkRunner-PollingRunner-" +
        policy.getClass().getSimpleName());
    runnerThread.start();
    lifecycleState = LifecycleState.START;
  }

org.apache.flume.SinkRunner.PollingRunner#run

PollingRunner is a Runnable that polls the SinkProcessor in a loop and handles event-delivery bookkeeping, BACKOFF delays, and error counting.

    @Override
    public void run() {
      logger.debug("Polling sink runner starting");
      while (!shouldStop.get()) {
        try {
          // Call Processor.process() and use the returned status to decide whether to back off for a while
          if (policy.process().equals(Sink.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");
            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.debug("Interrupted while processing an event. Exiting.");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (Exception e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          if (e instanceof EventDeliveryException) {
            counterGroup.incrementAndGet("runner.deliveryErrors");
          } else {
            counterGroup.incrementAndGet("runner.errors");
          }
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }
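
The sleep time in the BACKOFF branch grows linearly with the number of consecutive back-offs and is capped at maxBackoffSleep. The arithmetic can be sketched in isolation as below. The class name BackoffSketch is hypothetical, and the two constant values (1000 ms and 5000 ms) are assumptions, since the listing above does not show where backoffSleepIncrement and maxBackoffSleep are defined.

```java
// Simplified model of the back-off arithmetic used in PollingRunner#run.
// Not Flume code: the constant values below are assumed for illustration.
final class BackoffSketch {
  static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L; // assumed value
  static final long MAX_BACKOFF_SLEEP_MS = 5000L;       // assumed value

  /** Sleep time after the n-th consecutive BACKOFF result (n starts at 1). */
  static long sleepMillis(long consecutiveBackoffs) {
    return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT_MS,
        MAX_BACKOFF_SLEEP_MS);
  }

  public static void main(String[] args) {
    // Print the sleep time for the first few consecutive back-offs.
    for (long n = 1; n <= 7; n++) {
      System.out.println("consecutive=" + n + " sleepMs=" + sleepMillis(n));
    }
  }
}
```

Under these assumed values the sleep time stops growing at the fifth consecutive back-off, and any non-BACKOFF result resets the consecutive counter to zero, as the else branch in the listing shows.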

org.apache.flume.sink.LoggerSink#process

Taking LoggerSink as an example: it takes one event from the channel and writes it to the log.

[Image: source code of LoggerSink#process]
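
Since the original listing survives only as an image, here is a minimal sketch of the pattern such a sink follows: open a transaction, take one event, log it or request a back-off, then commit (or roll back on failure) and close. ToyChannel and ToyLoggerSink are hypothetical stand-ins written for this example, with String events and an in-memory list as the "log"; they are not the real org.apache.flume classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a channel together with its transaction.
final class ToyChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private final List<String> taken = new ArrayList<>(); // taken in the open transaction

  void put(String event) { queue.addLast(event); }
  int size() { return queue.size(); }

  void begin() { taken.clear(); }

  String take() {
    String event = queue.pollFirst(); // null when the channel is empty
    if (event != null) {
      taken.add(event);
    }
    return event;
  }

  void commit() { taken.clear(); }

  void rollback() {
    // Put taken events back at the head of the queue in their original order.
    for (int i = taken.size() - 1; i >= 0; i--) {
      queue.addFirst(taken.get(i));
    }
    taken.clear();
  }

  void close() { /* nothing to release in this toy model */ }
}

// Hypothetical sink that mirrors the take-and-log flow described above.
final class ToyLoggerSink {
  enum Status { READY, BACKOFF }

  final List<String> log = new ArrayList<>();

  Status process(ToyChannel channel) {
    Status result = Status.READY;
    channel.begin();
    try {
      String event = channel.take();
      if (event != null) {
        log.add("Event: " + event);
      } else {
        // Nothing to read: ask the runner to back off.
        result = Status.BACKOFF;
      }
      channel.commit();
    } catch (RuntimeException ex) {
      channel.rollback();
      throw ex;
    } finally {
      channel.close();
    }
    return result;
  }

  public static void main(String[] args) {
    ToyChannel channel = new ToyChannel();
    channel.put("hello");
    ToyLoggerSink sink = new ToyLoggerSink();
    System.out.println(sink.process(channel)); // channel has one event
    System.out.println(sink.process(channel)); // channel is now empty
  }
}
```

The BACKOFF status returned for an empty channel is what drives the sleep logic in PollingRunner#run shown earlier.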

org.apache.flume.channel.BasicTransactionSemantics#take

[Image: source code of BasicTransactionSemantics#take]

org.apache.flume.channel.MemoryChannel.MemoryTransaction#doTake

Taking MemoryChannel as an example: the event is taken from a blocking queue.

[Image: source code of MemoryChannel.MemoryTransaction#doTake]
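
The following is a rough, self-contained model of how a take from a memory-backed channel can work, assuming a design in which a semaphore counts the stored events, a blocking deque holds them, and a bounded take list remembers what the open transaction has taken so that a rollback can return it. ToyMemoryChannel, its String events, and the millisecond keep-alive parameter are all hypothetical. The outer take() wrapper that swallows InterruptedException is also an assumption, loosely modeled on the take/doTake split named in the headings above.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a memory channel with a single open transaction.
final class ToyMemoryChannel {
  private final Object queueLock = new Object();
  private final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>();
  private final Semaphore queueStored = new Semaphore(0); // permits == stored events
  private final LinkedBlockingDeque<String> takeList;     // bounded per transaction
  private final long keepAliveMillis;

  ToyMemoryChannel(int transactionCapacity, long keepAliveMillis) {
    this.takeList = new LinkedBlockingDeque<>(transactionCapacity);
    this.keepAliveMillis = keepAliveMillis;
  }

  void put(String event) {
    synchronized (queueLock) {
      queue.addLast(event);
    }
    queueStored.release();
  }

  /** Wrapper: delegates to doTake() and turns an interrupt into a null result. */
  String take() {
    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  private String doTake() throws InterruptedException {
    if (takeList.remainingCapacity() == 0) {
      throw new IllegalStateException("take list full, commit more frequently");
    }
    // Wait up to keepAliveMillis for an event to become available.
    if (!queueStored.tryAcquire(keepAliveMillis, TimeUnit.MILLISECONDS)) {
      return null;
    }
    String event;
    synchronized (queueLock) {
      event = queue.pollFirst();
    }
    takeList.put(event); // remember it so rollback() can restore it
    return event;
  }

  void commit() {
    takeList.clear();
  }

  void rollback() {
    synchronized (queueLock) {
      int restored = takeList.size();
      while (!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast());
      }
      queueStored.release(restored);
    }
  }

  public static void main(String[] args) {
    ToyMemoryChannel channel = new ToyMemoryChannel(2, 10L);
    channel.put("a");
    System.out.println(channel.take());
    channel.commit();
    System.out.println(channel.take()); // times out on the empty channel
  }
}
```

Acquiring a semaphore permit before polling the deque lets an empty channel return null after a bounded wait instead of blocking the sink thread indefinitely, which is what allows the sink to report BACKOFF.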