FlumeNG1.8.0源码解析——Application的启动过程

  |   0 评论   |   702 浏览

Flume使用方法

我们一般是这样启动Flume的

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

详细可参考《FlumeNG简介

查看Shell脚本flume-ng知,程序的入口是org.apache.flume.node.Application

image.png

org.apache.flume.node.Application

org.apache.flume.node.Application#main是程序主入口

首先利用commons-cli解析命令行参数,然后决定Application的构造方式:

  1. 如果配置了Zookeeper,则从Zookeeper中获取Agent配置信息;

  2. 如果指定了no-reload-conf,则配置文件变化时不自动重载配置,否则,构造ConfigurationProvider并加入components中;

  3. 默认情况下,从Property配置文件中加载Agent配置信息;

主要流程

List<LifecycleAware> components = Lists.newArrayList();

if (reload) {
	EventBus eventBus = new EventBus(agentName + "-event-bus");
	PollingPropertiesFileConfigurationProvider configurationProvider =
			new PollingPropertiesFileConfigurationProvider(
					agentName, configurationFile, eventBus, 30);
	components.add(configurationProvider);
	//创建Application对象,包含初始化组件列表(components),初始化LifecycleSupervisor。
	application = new Application(components);
	eventBus.register(application);
} else {
	PropertiesFileConfigurationProvider configurationProvider =
			new PropertiesFileConfigurationProvider(agentName, configurationFile);
	application = new Application();
	application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
//start方法用于检查所有组件是否是启动状态,如果不是则启动该组件。
application.start();
//监听程序关闭事件,用于当程序被kill后能够执行一些清理工作。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
	@Override
	public void run() {
	appReference.stop();
	}
});

解析配置文件并实例化对象

org.apache.flume.node.AbstractConfigurationProvider#getConfiguration

构造对象并填充MaterializedConfiguration

注意这里会将没用到的Channel删除

public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        // 加载Component
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        // 向conf中添加Channel
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            // 如果channel没有配置任何source和sink,将会被移除
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap =
                channelCache.get(channelComponent.channel.getClass());
            if (nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        // 向conf中添加Source
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        // 向conf中添加Sink
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }

依次构造Channel、Source、Sink

注意这里Channel和Sink是在工厂模式中利用反射实例化的,而Source是利用forSource构造出来的

Channel和Sink的反射方式

org.apache.flume.sink.DefaultSinkFactory

@Override
  public Sink create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of sink: {}, type: {}", name, type);
    Class<? extends Sink> sinkClass = getClass(type);
    try {
      Sink sink = sinkClass.newInstance();
      sink.setName(name);
      return sink;
    } catch (Exception ex) {
      throw new FlumeException("Unable to create sink: " + name
          + ", type: " + type + ", class: " + sinkClass.getName(), ex);
    }
  }
  @SuppressWarnings("unchecked")
  @Override
  public Class<? extends Sink> getClass(String type) throws FlumeException {
    String sinkClassName = type;
    SinkType sinkType = SinkType.OTHER;
    try {
      sinkType = SinkType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      logger.debug("Sink type {} is a custom type", type);
    }
    if (!sinkType.equals(SinkType.OTHER)) {
      sinkClassName = sinkType.getSinkClassName();
    }
    try {
      return (Class<? extends Sink>) Class.forName(sinkClassName);
    } catch (Exception ex) {
      throw new FlumeException("Unable to load sink type: " + type
          + ", class: " + sinkClassName, ex);
    }
  }

SourceRunner的构造方式

org.apache.flume.SourceRunner#forSource

public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;
    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }
    return runner;
  }

根据Source的不同类型分别构造不同的SourceRunner

  1. org.apache.flume.PollableSource:需要外部驱动程序进行轮询以确定是否存在可从源中获取的事件的源。

  2. org.apache.flume.EventDrivenSource:一个Source,它不需要外部驱动程序来轮询要摄取的事件; 它提供了自己的事件驱动机制来调用事件处理。

org.apache.flume.source.PollableSourceRunner

SourceRunner的一个实现,可以驱动PollableSource。

PollableSourceRunner将PollableSource包装在所需的运行循环中,以使其运行。 在内部,保留度量和计数器,使得返回BACKOFF的PollableSource.Status的源导致运行循环完全执行该操作。 最大退避时间为500毫秒。 将立即调用返回READY的源。 请注意,BACKOFF仅仅是对跑步者的暗示; 它不需要严格遵守。

org.apache.flume.source.EventDrivenSourceRunner#EventDrivenSourceRunner

Starts, stops, 以及管理event-driven sources.

处理配置更改事件

org.apache.flume.node.Application#handleConfigurationEvent

@Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

该方法由com.google.common.eventbus.Subscribe注解标注

将方法标记为事件处理程序,由AnnotatedHandlerFinder和EventBus使用。

事件的类型将由方法的第一个(也是唯一的)参数指示。 如果此批注应用于具有零参数或多个参数的方法,则包含该方法的对象将无法从EventBus注册事件传递。

除非也使用@AllowConcurrentEvents进行注释,否则事件处理程序方法将由它们注册的每个事件总线串行调用。

org.apache.flume.node.Application#startAllComponents

启动所有组件

利用org.apache.flume.lifecycle.LifecycleSupervisor依次启动Channel、Sink、Source

private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
  logger.info("Starting new configuration:{}", materializedConfiguration);

  this.materializedConfiguration = materializedConfiguration;

  for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
    try {
      logger.info("Starting Channel " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  /*
   * Wait for all channels to start.
   */
  for (Channel ch : materializedConfiguration.getChannels().values()) {
    while (ch.getLifecycleState() != LifecycleState.START
        && !supervisor.isComponentInErrorState(ch)) {
      try {
        logger.info("Waiting for channel: " + ch.getName() +
            " to start. Sleeping for 500 ms");
        Thread.sleep(500);
      } catch (InterruptedException e) {
        logger.error("Interrupted while waiting for channel to start.", e);
        Throwables.propagate(e);
      }
    }
  }

  for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
    try {
      logger.info("Starting Sink " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  for (Entry<String, SourceRunner> entry :
       materializedConfiguration.getSourceRunners().entrySet()) {
    try {
      logger.info("Starting Source " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  this.loadMonitoring();
}

org.apache.flume.lifecycle.LifecycleSupervisor

LifecycleSupervisor是一个管理程序,类似于supervisord

可参考我之前写过的一篇文章《利用supervisor管理你的服务

利用一个org.apache.flume.lifecycle.LifecycleSupervisor.MonitorRunnable线程来包裹Channel、Sink、Source等程序

org.apache.flume.node.Application#start

启动所有components,如上面添加的ConfigurationProvider等

public synchronized void start() {
    for (LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }