HBASE CDC

1.1 WAL简介

  HBase的Write Ahead Log (WAL)提供了一种高并发、持久化的日志保存与回放机制。每一个业务数据的写入操作(PUT / DELETE)执行前,都会记账在WAL中。
  如果出现HBase服务器宕机,则可以从WAL中回放执行之前没有完成的操作,因此我们可以根据WAL日志来获取CDC数据。以下三种方式其实都是基于WAL日志的,只是实现方式略有不同。
在这里插入图片描述
从上图可知:

  1. 客户端对数据执行一个修改操作,如put(),delete(),incr()等。
  2. 每一个修改被封装到一个KeyValue对象实例,并通过RPC调用发送出来。
  3. 上述调用成批地发送给含有匹配region的HRegionServer。
  4. 数据先被写入到WAL,然后被放放到实际拥有记录的存储文件的MemStore中。
  5. 当MemStore达到一定的大小或经历一个特定时间之后,数据会异步地连续写入到文件系统中。

1.2 解析WAL日志发送kafka

  这里我直接使用了HBASE自己的工具:WALPrettyPrinter,进行了简单修改测试。

代码主要如下:

public void processFile(final Configuration conf, final Path p)
    throws IOException {
  FileSystem fs = p.getFileSystem(conf);
  if (!fs.exists(p)) {
    throw new FileNotFoundException(p.toString());
  }
  if (!fs.isFile(p)) {
    throw new IOException(p + " is not a file");
  }
  WAL.Reader log = WALFactory.createReader(fs, p, conf);
  if (log instanceof ProtobufLogReader) {
    List<String> writerClsNames = ((ProtobufLogReader) log).getWriterClsNames();
    if (writerClsNames != null && writerClsNames.size() > 0) {
      out.print("Writer Classes: ");
      for (int i = 0; i < writerClsNames.size(); i++) {
        out.print(writerClsNames.get(i));
        if (i != writerClsNames.size() - 1) {
          out.print(" ");
        }
      }
      out.println();
    }
    String cellCodecClsName = ((ProtobufLogReader) log).getCodecClsName();
    if (cellCodecClsName != null) {
      out.println("Cell Codec Class: " + cellCodecClsName);
    }
  }
  if (outputJSON && !persistentOutput) {
    out.print("[");
    firstTxn = true;
  }
  try {
    WAL.Entry entry;
    while ((entry = log.next()) != null) {
      WALKey key = entry.getKey();
      WALEdit edit = entry.getEdit();
      // begin building a transaction structure
      Map<String, Object> txn = key.toStringMap();
      long writeTime = key.getWriteTime();
      // check output filters
      if (sequence >= 0 && ((Long) txn.get("sequence")) != sequence)
        continue;
      if (region != null && !((String) txn.get("region")).equals(region))
        continue;
      // initialize list into which we will store atomic actions
      List<Map> actions = new ArrayList<Map>();
      for (Cell cell : edit.getCells()) {
        // add atomic operation to txn
        Map<String, Object> op = new HashMap<String, Object>(toStringMap(cell));
        if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue()));
        // check row output filter
        if (row == null || ((String) op.get("row")).equals(row)) {
          actions.add(op);
        }
      }
      if (actions.size() == 0)
        continue;
      txn.put("actions", actions);
      if (outputJSON) {
        // JSON output is a straightforward "toString" on the txn object
        if (firstTxn)
          firstTxn = false;
        else
          out.print(",");
        // encode and print JSON
        out.print(MAPPER.writeValueAsString(txn));
      } else {
        // Pretty output, complete with indentation by atomic action
        out.println("Sequence=" + txn.get("sequence") + " "
            + ", region=" + txn.get("region") + " at write timestamp=" + new Date(writeTime));
        for (int i = 0; i < actions.size(); i++) {
          Map op = actions.get(i);
          out.println("row=" + op.get("row") +
              ", column=" + op.get("family") + ":" + op.get("qualifier"));
          if (op.get("tag") != null) {
            out.println("    tag: " + op.get("tag"));
          }
          if (outputValues) out.println("    value: " + op.get("value"));
        }
      }
    }
  } finally {
    log.close();
  }
  if (outputJSON && !persistentOutput) {
    out.print("]");
  }
}

主要逻辑是:
  读取HDFS上的WAL文件,一行一行个解析包装为WAL.Entry对象,从Entry中我们能获取到每次操作的表、rowkey和对应的列和值,然后我们可以把这些操作包装为JSON发送到kafka来完成CDC的收集。

不足之处:

  1. 需要新开发hdfs source来监控hdfs的文件生成和内容变更。
  2. WAL日志中不只一种格式,put和bulkload操作在WAL中格式不一样,需单独解析(Hadoop Sequence File, protobuf序列化文件)
  3. 只能获取当前操作的row、列和值,无法获取当前row的所有数据,需反查

1.3 通过Observer协处理器获取cdc数据发送kafka

  Observer相当于RDBMS的触发器,它提供了很多钩子函数,我们可以重写我们需要的钩子函数来获取指定操作,即当有指定操作(例如put/delelte)时,会触发执行,并且我们可以获取到当前操作的数据并发送kafka。

使用:

  1. 编码,继承BaseRegionObserver类,重写需要订阅的操作,我这里订阅了postPut和postDelete,即当有put和delete操作时,会先写入WAL日志,然后就会执行这两个方法了
  2. 执行该方法时我们可以获取到当前操作的数据并包装成JSON发送kafka
    代码样例如下:
public class HBaseObserver extends BaseRegionObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseObserver.class);
    private int cnt = 0;
    private long startTime = System.currentTimeMillis();
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        try {
            String rowkey = new String(put.getRow());
            Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            json.put("rowkey", rowkey);
            BaseProducer.produce("hbase_test_observer", JSONObject.toJSONString(json));
              if (++cnt % 10000 == 0) {
                  LOGGER.info("已发送:[{}]条记录, 耗时: {}s", cnt, (System.currentTimeMillis() - startTime)/1000);
              }
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("处理错误", ex);
        }
    }
}
  1. 打包并上传HDFS
  2. 使用hbase shell 对一个表添加coprocessor(相当于修改schema)
    a) disable 'test_observer'
    b) alter 'test_observer', METHOD => 'table_att', 'coprocessor'=>'hdfs:///hbase/xxx.jar|com.bigblue.hbase.HBaseObserver|1001|'
    c) enable 'test_observer'
    d) coprocessor后面含义是:1.jar路径,2.类全路径,3.优先级
  3. 使用put、bulkload、delete等操作即可
  4. 查看控制台或者kafka已有数据进入

不足之处:

  1. observer是安插在各个regionserver上的,会增加regionserver的操作逻辑,可能会影响吞吐,但实际测试发现并不会明显影响,因为hbase内部使用了大量的协处理器,我们只是增加了一个小小的简单的发送kafka的操作。
  2. 只能获取当前操作的row、列和值,无法获取当前row的所有数据,需反查
  3. 需订阅所需要的操作,如果订阅部署后又想追加订阅不方便

1.4 通过Endpoint协处理器获取cdc数据发送kafka

  通过Endpoint来实现主要是依靠HBase的集群复制功能,Replication是使用协处理器的Endpoint来实现的,基本原理是将源集群的regionserver会将WAL日志按顺序读取,并将WAL日志读取日志offset等信息通过zookeeper记录,读取之后主动的向目标集群的regionserver发送这些信息,在目标集群的regionserver接收到信息后会使用HTable客户端将这些信息插入的对应的表中。源集群的regionserver是endpoint的客户端,目标集群上的regionserver有endpoint的服务端实现,两者通过protobuf协议实现RPC通信,基于这一点所以需要集群间的版本一致,网络互通。

  上面的是HBase的一个集群向另一个集群同步的过程,而我们可以基于此修改为我们自己的ReplicationEndpoint,即我们不需要发送到另一个集群同步,而是解析WAL信息后发送到kafka。

使用:

  1. 编码,继承BaseReplicationEndpoint类,重写replication方法
  2. 当对HBase有操作时会记录到WAL,并调用该方法进行同步
  3. 解析WAL日志,并将对一个rowkey的操作合并为一个操作,并包装为json发送kafka
    代码样例如下:
public class HbaseEndpoint extends BaseReplicationEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(HbaseEndpoint.class);
    private static final ToHRowJson TO_HROW_JSON = new ToHRowJson();
    @Override
    public UUID getPeerUUID() {
        return UUID.randomUUID();
    }
    @Override
    public boolean replicate(ReplicateContext context) {
        final List<WAL.Entry> entries = context.getEntries();
        final Map<String, List<WAL.Entry>> entriesByTable = entries.stream()
                .collect(groupingBy(entry -> entry.getKey().getTableName().getNameAsString()));
        // persist the data to kafka in parallel.
        entriesByTable.entrySet().stream().forEach(entry -> {
            final String tableName = entry.getKey();
            LOG.info("table: " + tableName);
            final List<WAL.Entry> tableEntries = entry.getValue();
            tableEntries.forEach(tblEntry -> {
                List<Cell> cells = tblEntry.getEdit().getCells();

                Map<String, List<Cell>> columnsByRow = cells.stream()
                        .collect(groupingBy(cell -> Bytes.toString(CellUtil.cloneRow(cell))));
                columnsByRow.entrySet().forEach(rowcols -> {
                    HRowJson rowJson = TO_HROW_JSON.apply(rowcols.getKey(), rowcols.getValue());
                    String jsonResult = JSONObject.toJSONString(rowJson);
                    LOG.info(jsonResult);
                    BaseProducer.produce(tableName, jsonResult);
                });
            });
        });
        return true;
    }
    @Override
    public void start() {
        LOG.info("Hbase replication to Kafka started at " + LocalDate.now());
        this.startAsync();
    }
    @Override
    public void stop() {
        LOG.info("Hbase replication to Kafka started at " + LocalDate.now());
        this.stopAsync();
    }
    @Override
    protected void doStart() {
        LOG.info("Hbase replication to Kafka doStarted at " + LocalDate.now());
        notifyStarted();
    }
    @Override
    protected void doStop() {
        LOG.info("Hbase replication to Kafka doStoped at " + LocalDate.now());
        BaseProducer.close();
        notifyStopped();
    }
  1. 打包,部署到各个regionserver的lib包下,开启hbase.replication=true
  2. 重启集群
  3. 建表并增加REPLICATION_SCOPE这个属性,该表被操作后才会被同步和执行replication方法:create 'test_endpoint', {NAME => 'info', REPLICATION_SCOPE => '1'}
  4. 设置自定义的endpoint: add_peer '1', ENDPOINT_CLASSNAME => 'com.bigblue.hbase.endpoint.HbaseEndpoint'
  5. 使用put、bulkload、delete等操作即可
  6. 查看控制台获取kafka已有包装好的json数据

不足之处:

  1. Endpoint同样是是安插在各个regionserver上的,可能会影响吞吐,但实际测试发现并不会明显影响。
  2. 只能获取当前操作的row、列和值,无法获取当前row的所有数据,需反查

1.5 POC结果

  这次测试是在HBASE2.2.6版本进行的测试,集群3台,16c+64g+1T硬盘。
  对于单条put,测试了200w数据,因为单条put效率太低,没有进行更大量测试。
  对于批量put,测试2000w数据。
  以下是测试结果:
在这里插入图片描述
  可以看到不管是单条put还是批量插入,加入observer和endpoint,增加解析和发送kafka逻辑,并没有明显的降低吞吐。但共同的不足就是:cdc的数据只能捕捉到当时的操作记录,无法获取row的所有数据,都需要反查才能获取。

1.6 结论

  1. 三种方式都能实现CDC,难易程度不同,自己解析WAL是对集群最没有影响的,但是是最麻烦的,需要额外写hdfs source;observer和endpoint相对容易,但是observer只能获取订阅的操作,如需扩展稍微麻烦。
  2. 三种方式虽都能实现CDC,但都不能实现我们所需要的CDC,即一个操作不止能获取操作的数据,还能获取到操作后这条数据的所有数据。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐