HBase2.0.1源码解析——get读数据的过程

  |   0 评论   |   1,306 浏览

概述

HBase中的一致性级别

STRONG

强一致性是HBase中的默认一致性模型,读取和写入通过单个服务器来串行化更新,并返回所有已写入和已确认的数据。

TIMELINE

时间轴一致性读取可能会返回可能看不到最新更新的值。

写事务总是在HBase中的强一致性模型中执行的,这保证了事务的排序,并且以相同的顺序重放所有数据副本。

在时间线一致性中,可能从陈旧的数据中回答get和scan请求。

如果请求是由不同服务器响应的,则客户端仍可能会观察到不按顺序的事务。

写数据的过程

org.apache.hadoop.hbase.client.HTable#get

设置一致性级别

默认为STRONG

image.png

强一致性

image.png

构造一个protocol buffer GetRequest

org.apache.hadoop.hbase.shaded.protobuf.RequestConverter#buildGetRequest

在GetRequest中利用doGet进行数据的获取

org.apache.hadoop.hbase.client.ClientServiceCallable#doGet

image.png

带重试的调用

org.apache.hadoop.hbase.client.RpcRetryingCallerImpl#callWithRetries

image.png

时间轴一致性

如果是TIMELINE,就考虑副本的调用

image.png

org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas#call

算法

  1. 我们把查询放入执行池。

  2. 在x ms之后,如果我们没有结果,我们添加secondary副本的查询

  3. 我们采用第一个结果

  4. 完成后,我们取消剩下的查询。 取消意味着:

    1. 如果实际调用未开始,则从池中移除

    2. 如果调用已经启动,则中断调用

客户端我们需要考虑

  1. 调用在被放入池中后不会立即执行

  2. 调用是一个线程。 我们不要将线程数乘以副本数。

服务器端

如果它仍然在handler池中,我们取消它会好得多,因为调用可能需要一些I/O。

在全局范围内,重试次数,超时等仍然适用,但它是针对每个副本的,而不是针对全局的。 我们继续,直到所有重试都完成,或都超时。

public Result call(int operationTimeout)
      throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
    RegionLocations rl = null;
    boolean skipPrimary = false;
    try {
      // 查找指定tableName和row所在的Region的位置
      rl = getRegionLocations(true,
        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
        cConnection, tableName, get.getRow());
    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
      // 没有指定特定的副本标识时。 只需要加载所有副本。
      if (isTargetReplicaSpecified) {
        throw e;
      } else {
        // 我们无法获得主副本位置,有可能托管meta的region server挂了,它需要继续尝试缓存副本。
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
          if (rl == null) {
            // No cached locations
            throw e;
          }
          // 主副本位置未知,跳过主副本
          skipPrimary = true;
        } else {
          // For completeness
          throw e;
        }
      }
    }
    final ResultBoundedCompletionService<Result> cs =
        new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
    int startIndex = 0;
    int endIndex = rl.size();
    if(isTargetReplicaSpecified) {
      // 创建并提交调用
      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
      endIndex = 1;
    } else {
      if (!skipPrimary) {
        addCallsForReplica(cs, rl, 0, 0);
        try {
          // 等待超时以查看主副本是否回应
          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
          if (f != null) {
            return f.get(); //great we got a response
          }
          if (cConnection.getConnectionMetrics() != null) {
            cConnection.getConnectionMetrics().incrHedgedReadOps();
          }
        } catch (ExecutionException e) {
          // We ignore the ExecutionException and continue with the secondary replicas
          if (LOG.isDebugEnabled()) {
            LOG.debug("Primary replica returns " + e.getCause());
          }
          // Skip the result from the primary as we know that there is something wrong
          // 由于我们知道有问题,跳过主副本的结果
          startIndex = 1;
        } catch (CancellationException e) {
          throw new InterruptedIOException();
        } catch (InterruptedException e) {
          throw new InterruptedIOException();
        }
      } else {
        // 由于主副本被跳过,所以需要相应地调整endIndex
        endIndex --;
      }
      // 一次提交所有secondaries副本的调用
      addCallsForReplica(cs, rl, 1, rl.size() - 1);
    }
    try {
      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
          cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex);
      if (f == null) {
        throw new RetriesExhaustedException("Timed out after " + operationTimeout +
            "ms. Get is sent to replicas with startIndex: " + startIndex +
            ", endIndex: " + endIndex + ", Locations: " + rl);
      }
      if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
          !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        cConnection.getConnectionMetrics().incrHedgedReadWin();
      }
      return f.get();
    } catch (ExecutionException e) {
      throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException();
    } catch (InterruptedException e) {
      throw new InterruptedIOException();
    } finally {
      // 到达这里是因为我们被打断了、或者因为一个或多个调用成功或失败了。 在任何情况下,我们都会停止所有的任务。
      cs.cancelAll();
    }
    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
    return null; // unreachable
}

定位Region

org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas#getRegionLocations

image.png

关于定位Region的详细过程,请参考《HBase2.0.1源码解析——根据RowKey定位Region的过程

创建并提交调用

org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas#addCallsForReplica

image.png

读后有收获可以支付宝请作者喝咖啡