HBase 2.0.1 Source Code Analysis: The put Write Path


Overview

A typical way to write data through the Java API:

static void put(TableName tableName) throws Exception {
	Configuration conf = HBaseConfiguration.create();
	// try-with-resources closes both the Table and the Connection;
	// Table.flushCommits() no longer exists in the HBase 2.x API
	try (Connection connection = ConnectionFactory.createConnection(conf);
			Table table = connection.getTable(tableName)) {
		Put put = new Put(Bytes.toBytes("row"));
		// Put.add(byte[], byte[], long, byte[]) was removed in 2.x; addColumn is the replacement
		put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
		put.setCellVisibility(new CellVisibility(labelExp));
		table.put(put);
	}
}

The main steps are:

  1. Construct the Configuration object;

  2. Obtain a Connection;

  3. Obtain a Table;

  4. Construct a Put object;

  5. Call table.put to submit.

The sections below walk through each step of the put implementation.

Constructing the Configuration object

org.apache.hadoop.hbase.HBaseConfiguration#create()

Creates a new Hadoop Configuration and adds the HBase resources to it.

org.apache.hadoop.hbase.HBaseConfiguration#addHbaseResources

Adds hbase-default.xml and hbase-site.xml as configuration resources; values in hbase-site.xml override the defaults.

org.apache.hadoop.hbase.HBaseConfiguration#checkDefaultsVersion

Checks that hbase.defaults.for.version recorded in hbase-default.xml matches the client's version, guarding against a stale hbase-default.xml on the classpath.
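The layering behavior can be illustrated with a stdlib-only stand-in. java.util.Properties here is only an analogy for Hadoop's Configuration (the real class parses the XML resource files); the property names are real HBase keys, the values are illustrative:

```java
import java.util.Properties;

public class LayeredConfigDemo {
    // Mimics HBaseConfiguration#addHbaseResources: defaults loaded first,
    // site configuration layered on top and taking precedence.
    static Properties create() {
        Properties defaults = new Properties();                // stands in for hbase-default.xml
        defaults.setProperty("hbase.client.pause", "100");
        defaults.setProperty("hbase.client.retries.number", "15");

        Properties site = new Properties(defaults);            // stands in for hbase-site.xml
        site.setProperty("hbase.client.retries.number", "3");  // the site value wins
        return site;
    }

    public static void main(String[] args) {
        Properties conf = create();
        System.out.println(conf.getProperty("hbase.client.pause"));          // inherited default
        System.out.println(conf.getProperty("hbase.client.retries.number")); // overridden by site
    }
}
```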

Creating the Connection object via reflection

org.apache.hadoop.hbase.client.ConnectionFactory#createConnection

Creates a new Connection instance using the supplied conf. A Connection encapsulates all the housekeeping of a connection to the cluster.

All Tables and interfaces created from the returned Connection share the ZooKeeper connection, the meta cache, and the connections to the RegionServers and the HMaster.

The default implementation class is org.apache.hadoop.hbase.client.ConnectionImplementation.
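ConnectionFactory resolves the implementation class name from configuration (the hbase.client.connection.impl setting, falling back to ConnectionImplementation) and instantiates it reflectively. A minimal stdlib sketch of that pattern; the Conn interface and DefaultConn class below are hypothetical stand-ins, not HBase types:

```java
public class ReflectiveFactoryDemo {
    public interface Conn { String clusterId(); }

    public static class DefaultConn implements Conn {
        public String clusterId() { return "demo-cluster"; }
    }

    // Resolves the implementation class by name, as ConnectionFactory does
    // with the configured connection implementation class.
    static Conn create(String implClassName) throws Exception {
        Class<?> clazz = Class.forName(implClassName);
        return (Conn) clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Conn c = create(DefaultConn.class.getName());
        System.out.println(c.clusterId()); // prints "demo-cluster"
    }
}
```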

org.apache.hadoop.hbase.client.ConnectionImplementation#ConnectionImplementation

ConnectionImplementation(Configuration conf,
                           ExecutorService pool, User user) throws IOException {
    this.conf = conf;
    this.user = user;
    this.batchPool = pool;
    this.connectionConfig = new ConnectionConfiguration(conf);
    this.closed = false;
    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
      HConstants.DEFAULT_USE_META_REPLICAS);
    this.metaReplicaCallTimeoutScanInMicroSecond =
        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

    // how many times to try, one more than max *retry* time
    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
    this.rpcTimeout = conf.getInt(
        HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
      synchronized (nonceGeneratorCreateLock) {
        if (nonceGenerator == null) {
          // Per-client random NonceGenerator, mainly used to generate the client id
          nonceGenerator = PerClientRandomNonceGenerator.get();
        }
      }
    } else {
      nonceGenerator = NO_NONCE_GENERATOR;
    }
    // Create the tracker for the region statistics associated with this connection
    this.stats = ServerStatisticTracker.create(conf);
    // Mechanism for handling remote-server failures during retries
    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
    // Factory for creating RpcRetryingCaller instances
    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
    // create the backoff policy
    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
    // Create the AsyncProcess instance, which drives the continuous stream of requests
    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
      this.metrics = new MetricsConnection(this);
    } else {
      this.metrics = null;
    }
    this.metaCache = new MetaCache(this.metrics);

    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
    Class<? extends ClusterStatusListener.Listener> listenerClass =
        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
            ClusterStatusListener.Listener.class);

    // Is there an alternate BufferedMutator to use?
    this.alternateBufferedMutatorClassName =
        this.conf.get(BufferedMutator.CLASSNAME_KEY);

    try {
      // Used to fetch basic cluster information such as the cluster id
      // and region location metadata; the default implementation is ZKAsyncRegistry
      this.registry = AsyncRegistryFactory.getRegistry(conf);
      retrieveClusterId();
      // Responsible for IPC calls
      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

      // Do we publish the status?
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    } catch (Throwable e) {
      // avoid leaks: registry, rpcClient, ...
      LOG.debug("connection construction failed", e);
      close();
      throw e;
    }
  }

Obtaining the ClusterId

The AsyncRegistry implementation is specified by the hbase.client.registry.impl setting; the default is org.apache.hadoop.hbase.client.ZKAsyncRegistry.

org.apache.hadoop.hbase.client.ZKAsyncRegistry#getClusterId()


Reads the value of the ZooKeeper znode /hbase/hbaseid, which holds the cluster id.

Obtaining the Table object


@Override
public Table getTable(TableName tableName) throws IOException {
  return getTable(tableName, getBatchPool());
}

@Override
public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
  return new TableBuilderBase(tableName, connectionConfig) {

    @Override
    public Table build() {
      return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
          rpcControllerFactory, pool);
    }
  };
}

Note that the pool here is of type java.util.concurrent.ThreadPoolExecutor, and its workQueue is a java.util.concurrent.LinkedBlockingQueue:

private ExecutorService getBatchPool() {
    if (batchPool == null) {
      synchronized (this) {
        if (batchPool == null) {
          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
          this.batchPool = getThreadPool(threads, threads, "-shared", null);
          this.cleanupPool = true;
        }
      }
    }
    return this.batchPool;
  }
  private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
      BlockingQueue<Runnable> passedWorkQueue) {
    // shared HTable thread executor not yet initialized
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    if (coreThreads == 0) {
      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
    BlockingQueue<Runnable> workQueue = passedWorkQueue;
    if (workQueue == null) {
      workQueue =
        new LinkedBlockingQueue<>(maxThreads *
            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      coreThreads = maxThreads;
    }
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
        coreThreads,
        maxThreads,
        keepAliveTime,
        TimeUnit.SECONDS,
        workQueue,
        Threads.newDaemonThreadFactory(toString() + nameHint));
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }
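The pool construction above can be reproduced with a small stdlib example (the sizes below are illustrative, not HBase's defaults): core and max thread counts are equal, the queue is bounded, and allowCoreThreadTimeOut(true) lets an idle pool shrink all the way to zero threads.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BatchPoolDemo {
    // Mirrors the shape of getThreadPool: bounded LinkedBlockingQueue,
    // core == max threads, and core threads allowed to time out.
    static ThreadPoolExecutor newBatchPool(int threads, int queueCapacity) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
            threads, threads,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queueCapacity));
        tpe.allowCoreThreadTimeOut(true); // idle pool releases even core threads
        return tpe;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newBatchPool(4, 100);
        Future<Integer> f = pool.submit(() -> 21 * 2);
        System.out.println(f.get()); // prints 42
        pool.shutdown();
    }
}
```

Because core threads equal max threads, the queue bound (maxThreads * hbase.client.max.total.tasks in the real code) is what throttles how much work can pile up before submissions are rejected.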

Constructing the Put object

A Put holds the row key passed to its constructor plus the cells added via addColumn, as in the opening example.

Calling table.put to submit

put has two overloads:

  1. put(Put put)    synchronous submission

  2. put(List<Put> puts)    asynchronous submission

put(Put put)

It constructs a RetryingCallable and hands it to an org.apache.hadoop.hbase.client.RpcRetryingCallerImpl, which submits it with retries.


org.apache.hadoop.hbase.client.RpcRetryingCallerImpl#callWithRetries

It keeps retrying the supplied callable until the configured retry count or timeout is exceeded.
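A stdlib sketch of that retry loop (the names and the fixed pause are simplifications; the real caller also enforces an overall operation timeout, consults an interceptor, and uses an exponential backoff table):

```java
import java.util.concurrent.Callable;

public class RetryingCallerDemo {
    // Simplified shape of RpcRetryingCallerImpl#callWithRetries:
    // try up to maxAttempts times, pausing between failures.
    static <T> T callWithRetries(Callable<T> callable, int maxAttempts, long pauseMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();
            } catch (Exception e) {
                last = e;                      // remember the most recent failure
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMillis); // real code scales the pause per attempt
                }
            }
        }
        throw last; // retries exhausted: surface the last failure
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient");
            return "ok";
        }, 5, 1L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```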

put(List<Put> puts)

org.apache.hadoop.hbase.client.HTable#put(List<Put>) delegates to HTable#batch.

org.apache.hadoop.hbase.client.HTable#batch

It constructs an org.apache.hadoop.hbase.client.AsyncProcessTask and hands it to org.apache.hadoop.hbase.client.AsyncProcess for asynchronous processing.

Note that setSubmittedRows is set to org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows#ALL here.

SubmittedRows indicates how many of the given rows must be accepted; AsyncProcess has traffic control and may reject some rows.

Depending on the SubmittedRows value, AsyncProcess chooses a different method to handle the submission.
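That dispatch can be sketched in plain Java. The enum constants mirror AsyncProcessTask.SubmittedRows in HBase 2.x, but the submit method and its capacity parameter are hypothetical stand-ins for AsyncProcess's traffic control, not the real API:

```java
import java.util.Arrays;
import java.util.List;

public class SubmittedRowsDemo {
    // Mirrors the constants of AsyncProcessTask.SubmittedRows.
    enum SubmittedRows { ALL, AT_LEAST_ONE, NORMAL }

    // Hypothetical stand-in: ALL must accept every row; the other modes
    // may reject rows when traffic control kicks in. Returns rows accepted.
    static int submit(List<String> rows, SubmittedRows mode, int capacity) {
        switch (mode) {
            case ALL:
                return rows.size();                     // wait until every row is accepted
            case AT_LEAST_ONE:
                return Math.max(1, Math.min(rows.size(), capacity));
            default: // NORMAL
                return Math.min(rows.size(), capacity); // take only what currently fits
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r1", "r2", "r3");
        System.out.println(submit(rows, SubmittedRows.ALL, 1));    // 3
        System.out.println(submit(rows, SubmittedRows.NORMAL, 1)); // 1
    }
}
```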

For a discussion of the org.apache.hadoop.hbase.client.AsyncProcess#submit method, see the companion post "HBase 2.0.1 Source Code Analysis: AsyncProcess".

If this post was helpful, you can buy the author a coffee via Alipay.