HBase2.0.1源码解析——异步处理AsyncProcess

  |   0 评论   |   3,874 浏览

概述

在HBase中,把多个Action抽象为Mutation,如Put、Delete、Append等

image.png

这些类可以构造为org.apache.hadoop.hbase.client.AsyncProcessTask对象

然后提交给org.apache.hadoop.hbase.client.AsyncProcess#submit来执行异步的提交操作

基本的使用方法方法如下

image.png

org.apache.hadoop.hbase.client.AsyncProcess#submit

image.png

根据SubmittedRows的不同类型,AsyncProcess会选择不同的方法进行处理

SubmittedRows代表已处理的行数。 AsyncProcess具有流量控制功能,可以拒绝某些行

image.png

submitAll

org.apache.hadoop.hbase.client.AsyncProcess#submitAll

无论服务器状态如何,立即提交行列表。 保持向后兼容性:它允许与返回一组对象的批处理接口一起使用。

  • 把Row列表转换为Action列表

  • 调整优先级

image.png

遍历Action列表,添加RegionServer信息,然后添加到队列中等待发送

将每个region server的操作列表分组并将其发送

org.apache.hadoop.hbase.client.AsyncRequestFutureImpl#groupAndSendMultiAction

image.png

查找Region的位置

org.apache.hadoop.hbase.client.AsyncRequestFutureImpl#findAllLocationsOrFail

image.png

定位Region的位置有两种方法,分别是locateRegion和relocateRegion,区别在于是否使用缓存

关于定位Region的详细过程,请参考《HBase2.0.1源码解析——根据RowKey定位Region的过程

image.png

添加到队列中等待发送

org.apache.hadoop.hbase.client.AsyncProcess#addAction

image.png

org.apache.hadoop.hbase.client.MultiAction#actions

actions是一个TreeMap对象,映射region到该region的puts/gets/deletes操作列表

定义如下

protected Map<byte[], List<Action>> actions = new TreeMap<>(Bytes.BYTES_COMPARATOR);

调用sendMultiAction发送

放到队列的下一步,当然就是发送了

image.png

org.apache.hadoop.hbase.client.AsyncRequestFutureImpl#sendMultiAction

根据尝试次数延迟一段时间后,异步地将多Action结构发送到服务器

调用org.apache.hadoop.hbase.client.AsyncRequestFutureImpl#getNewMultiActionRunnable将Action封装成Runnable对象,然后提交给线程池执行

image.png

submit

org.apache.hadoop.hbase.client.AsyncProcess#submit

从行列表中提取我们可以提交的内容。 我们无法提交的行保留在列表中。

不会将请求发送到副本(目前不用于流式put之外的其他任何内容)。

private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
    boolean atLeastOne) throws InterruptedIOException {
    TableName tableName = task.getTableName();
    RowAccess<? extends Row> rows = task.getRowAccess();
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
    List<Action> retainedActions = new ArrayList<>(rows.size());
    NonceGenerator ng = this.connection.getNonceGenerator();
    // 目前,nonce组是每个整个客户端。
    long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
    // Location errors that happen before we decide what requests to take.
    // 在我们决定采取什么请求之前发生Location错误
    List<Exception> locationErrors = null;
    List<Integer> locationErrorRows = null;
    RequestController.Checker checker = requestController.newChecker();
    boolean firstIter = true;
    do {
      // Wait until there is at least one slot for a new task.
      // 为新任务等待至少一个槽位
      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
      int posInList = -1;
      if (!firstIter) {
        checker.reset();
      }
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc;
        try {
          if (r == null) {
            throw new IllegalArgumentException("#" + id + ", row cannot be null");
          }
          // Make sure we get 0-s replica.
          // 确保我们得到0-s个副本
          RegionLocations locs = connection.locateRegion(
              tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
            throw new IOException("#" + id + ", no location found, aborting submit for"
                + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
          }
          loc = locs.getDefaultRegionLocation();
        } catch (IOException ex) {
          locationErrors = new ArrayList<>(1);
          locationErrorRows = new ArrayList<>(1);
          LOG.error("Failed to get region location ", ex);
          // This action failed before creating ars. Retain it, but do not add to submit list.
          // We will then add it to ars in an already-failed state.
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          retainedActions.add(new Action(r, ++posInList, priority));
          locationErrors.add(ex);
          locationErrorRows.add(posInList);
          it.remove();
          break; // Backward compat: we stop considering actions on location error.
        }
        ReturnCode code = checker.canTakeRow(loc, r);
        if (code == ReturnCode.END) {
          break;
        }
        if (code == ReturnCode.INCLUDE) {
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          Action action = new Action(r, ++posInList, priority);
          setNonce(ng, r, action);
          retainedActions.add(action);
          // TODO: replica-get is not supported on this path
          byte[] regionName = loc.getRegionInfo().getRegionName();
          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
          it.remove();
        }
      }
      firstIter = false;
    } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
    if (retainedActions.isEmpty()) return NO_REQS_RESULT;
    return submitMultiActions(task, retainedActions, nonceGroup,
        locationErrors, locationErrorRows, actionsByServer);
  }

org.apache.hadoop.hbase.client.AsyncProcess#submitMultiActions

下面就和submitAll相同了

image.png

读后有收获可以支付宝请作者喝咖啡