HBase -- HBase CDC调研
向导HBASE CDC1.1WAL简介1.2解析WAL日志发送kafka1.3 通过Observer协处理器获取cdc数据发送kafka1.4 通过Endpoint协处理器获取cdc数据发送kafka1.5 POC结果1.6 结论HBASE CDC1.1WAL简介 HBase的Write Ahead Log (WAL)提供了一种高并发、持久化的日志保存与回放机制。每一个业务数据的写入操作(PUT
向导
HBASE CDC
1.1 WAL简介
HBase的Write Ahead Log (WAL)提供了一种高并发、持久化的日志保存与回放机制。每一个业务数据的写入操作(PUT / DELETE)执行前,都会记账在WAL中。
如果出现HBase服务器宕机,则可以从WAL中回放执行之前没有完成的操作,因此我们可以根据WAL日志来获取CDC数据。以下三种方式其实都是基于WAL日志的,只是实现方式略有不同。
从上图可知:
- 客户端对数据执行一个修改操作,如put(),delete(),incr()等。
- 每一个修改被封装到一个KeyValue对象实例,并通过RPC调用发送出来。
- 上述调用成批地发送给含有匹配region的HRegionServer。
- 数据先被写入到WAL,然后被放放到实际拥有记录的存储文件的MemStore中。
- 当MemStore达到一定的大小或经历一个特定时间之后,数据会异步地连续写入到文件系统中。
1.2 解析WAL日志发送kafka
这里我直接使用了HBASE自己的工具:WALPrettyPrinter,进行了简单修改测试。
代码主要如下:
public void processFile(final Configuration conf, final Path p)
throws IOException {
FileSystem fs = p.getFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
if (!fs.isFile(p)) {
throw new IOException(p + " is not a file");
}
WAL.Reader log = WALFactory.createReader(fs, p, conf);
if (log instanceof ProtobufLogReader) {
List<String> writerClsNames = ((ProtobufLogReader) log).getWriterClsNames();
if (writerClsNames != null && writerClsNames.size() > 0) {
out.print("Writer Classes: ");
for (int i = 0; i < writerClsNames.size(); i++) {
out.print(writerClsNames.get(i));
if (i != writerClsNames.size() - 1) {
out.print(" ");
}
}
out.println();
}
String cellCodecClsName = ((ProtobufLogReader) log).getCodecClsName();
if (cellCodecClsName != null) {
out.println("Cell Codec Class: " + cellCodecClsName);
}
}
if (outputJSON && !persistentOutput) {
out.print("[");
firstTxn = true;
}
try {
WAL.Entry entry;
while ((entry = log.next()) != null) {
WALKey key = entry.getKey();
WALEdit edit = entry.getEdit();
// begin building a transaction structure
Map<String, Object> txn = key.toStringMap();
long writeTime = key.getWriteTime();
// check output filters
if (sequence >= 0 && ((Long) txn.get("sequence")) != sequence)
continue;
if (region != null && !((String) txn.get("region")).equals(region))
continue;
// initialize list into which we will store atomic actions
List<Map> actions = new ArrayList<Map>();
for (Cell cell : edit.getCells()) {
// add atomic operation to txn
Map<String, Object> op = new HashMap<String, Object>(toStringMap(cell));
if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue()));
// check row output filter
if (row == null || ((String) op.get("row")).equals(row)) {
actions.add(op);
}
}
if (actions.size() == 0)
continue;
txn.put("actions", actions);
if (outputJSON) {
// JSON output is a straightforward "toString" on the txn object
if (firstTxn)
firstTxn = false;
else
out.print(",");
// encode and print JSON
out.print(MAPPER.writeValueAsString(txn));
} else {
// Pretty output, complete with indentation by atomic action
out.println("Sequence=" + txn.get("sequence") + " "
+ ", region=" + txn.get("region") + " at write timestamp=" + new Date(writeTime));
for (int i = 0; i < actions.size(); i++) {
Map op = actions.get(i);
out.println("row=" + op.get("row") +
", column=" + op.get("family") + ":" + op.get("qualifier"));
if (op.get("tag") != null) {
out.println(" tag: " + op.get("tag"));
}
if (outputValues) out.println(" value: " + op.get("value"));
}
}
}
} finally {
log.close();
}
if (outputJSON && !persistentOutput) {
out.print("]");
}
}
主要逻辑是:
读取HDFS上的WAL文件,一行一行个解析包装为WAL.Entry对象,从Entry中我们能获取到每次操作的表、rowkey和对应的列和值,然后我们可以把这些操作包装为JSON发送到kafka来完成CDC的收集。
不足之处:
- 需要新开发hdfs source来监控hdfs的文件生成和内容变更。
- WAL日志中不只一种格式,put和bulkload操作在WAL中格式不一样,需单独解析(Hadoop Sequence File, protobuf序列化文件)
- 只能获取当前操作的row、列和值,无法获取当前row的所有数据,需反查
1.3 通过Observer协处理器获取cdc数据发送kafka
Observer相当于RDBMS的触发器,它提供了很多钩子函数,我们可以重写我们需要的钩子函数来获取指定操作,即当有指定操作(例如put/delelte)时,会触发执行,并且我们可以获取到当前操作的数据并发送kafka。
使用:
- 编码,继承BaseRegionObserver类,重写需要订阅的操作,我这里订阅了postPut和postDelete,即当有put和delete操作时,会先写入WAL日志,然后就会执行这两个方法了
- 执行该方法时我们可以获取到当前操作的数据并包装成JSON发送kafka
代码样例如下:
public class HBaseObserver extends BaseRegionObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(HBaseObserver.class);
private int cnt = 0;
private long startTime = System.currentTimeMillis();
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
try {
String rowkey = new String(put.getRow());
Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
Map<String, Object> json = new HashMap<>();
for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
for (Cell cell : entry.getValue()) {
String key = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
json.put(key, value);
}
}
json.put("rowkey", rowkey);
BaseProducer.produce("hbase_test_observer", JSONObject.toJSONString(json));
if (++cnt % 10000 == 0) {
LOGGER.info("已发送:[{}]条记录, 耗时: {}s", cnt, (System.currentTimeMillis() - startTime)/1000);
}
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.error("处理错误", ex);
}
}
}
- 打包并上传HDFS
- 使用hbase shell 对一个表添加coprocessor(相当于修改schema)
a)disable 'test_observer'
b)alter 'test_observer', METHOD => 'table_att', 'coprocessor'=>'hdfs:///hbase/xxx.jar|com.bigblue.hbase.HBaseObserver|1001|'
c)enable 'test_observer'
d) coprocessor后面含义是:1.jar路径,2.类全路径,3.优先级 - 使用put、bulkload、delete等操作即可
- 查看控制台或者kafka已有数据进入
不足之处:
- observer是安插在各个regionserver上的,会增加regionserver的操作逻辑,可能会影响吞吐,但实际测试发现并不会明显影响,因为hbase内部使用了大量的协处理器,我们只是增加了一个小小的简单的发送kafka的操作。
- 只能获取当前操作的row、列和值,无法获取当前row的所有数据,需反查
- 需订阅所需要的操作,如果订阅部署后又想追加订阅不方便
1.4 通过Endpoint协处理器获取cdc数据发送kafka
通过Endpoint来实现主要是依靠HBase的集群复制功能,Replication是使用协处理器的Endpoint来实现的,基本原理是将源集群的regionserver会将WAL日志按顺序读取,并将WAL日志读取日志offset等信息通过zookeeper记录,读取之后主动的向目标集群的regionserver发送这些信息,在目标集群的regionserver接收到信息后会使用HTable客户端将这些信息插入的对应的表中。源集群的regionserver是endpoint的客户端,目标集群上的regionserver有endpoint的服务端实现,两者通过protobuf协议实现RPC通信,基于这一点所以需要集群间的版本一致,网络互通。
上面的是HBase的一个集群向另一个集群同步的过程,而我们可以基于此修改为我们自己的ReplicationEndpoint,即我们不需要发送到另一个集群同步,而是解析WAL信息后发送到kafka。
使用:
- 编码,继承BaseReplicationEndpoint类,重写replication方法
- 当对HBase有操作时会记录到WAL,并调用该方法进行同步
- 解析WAL日志,并将对一个rowkey的操作合并为一个操作,并包装为json发送kafka
代码样例如下:
public class HbaseEndpoint extends BaseReplicationEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(HbaseEndpoint.class);
private static final ToHRowJson TO_HROW_JSON = new ToHRowJson();
@Override
public UUID getPeerUUID() {
return UUID.randomUUID();
}
@Override
public boolean replicate(ReplicateContext context) {
final List<WAL.Entry> entries = context.getEntries();
final Map<String, List<WAL.Entry>> entriesByTable = entries.stream()
.collect(groupingBy(entry -> entry.getKey().getTableName().getNameAsString()));
// persist the data to kafka in parallel.
entriesByTable.entrySet().stream().forEach(entry -> {
final String tableName = entry.getKey();
LOG.info("table: " + tableName);
final List<WAL.Entry> tableEntries = entry.getValue();
tableEntries.forEach(tblEntry -> {
List<Cell> cells = tblEntry.getEdit().getCells();
Map<String, List<Cell>> columnsByRow = cells.stream()
.collect(groupingBy(cell -> Bytes.toString(CellUtil.cloneRow(cell))));
columnsByRow.entrySet().forEach(rowcols -> {
HRowJson rowJson = TO_HROW_JSON.apply(rowcols.getKey(), rowcols.getValue());
String jsonResult = JSONObject.toJSONString(rowJson);
LOG.info(jsonResult);
BaseProducer.produce(tableName, jsonResult);
});
});
});
return true;
}
@Override
public void start() {
LOG.info("Hbase replication to Kafka started at " + LocalDate.now());
this.startAsync();
}
@Override
public void stop() {
LOG.info("Hbase replication to Kafka started at " + LocalDate.now());
this.stopAsync();
}
@Override
protected void doStart() {
LOG.info("Hbase replication to Kafka doStarted at " + LocalDate.now());
notifyStarted();
}
@Override
protected void doStop() {
LOG.info("Hbase replication to Kafka doStoped at " + LocalDate.now());
BaseProducer.close();
notifyStopped();
}
- 打包,部署到各个regionserver的lib包下,开启hbase.replication=true
- 重启集群
- 建表并增加REPLICATION_SCOPE这个属性,该表被操作后才会被同步和执行replication方法:
create 'test_endpoint', {NAME => 'info', REPLICATION_SCOPE => '1'}
- 设置自定义的endpoint:
add_peer '1', ENDPOINT_CLASSNAME => 'com.bigblue.hbase.endpoint.HbaseEndpoint'
- 使用put、bulkload、delete等操作即可
- 查看控制台获取kafka已有包装好的json数据
不足之处:
- Endpoint同样是是安插在各个regionserver上的,可能会影响吞吐,但实际测试发现并不会明显影响。
- 只能获取当前操作的row、列和值,无法获取当前row的所有数据,需反查
1.5 POC结果
这次测试是在HBASE2.2.6版本进行的测试,集群3台,16c+64g+1T硬盘。
对于单条put,测试了200w数据,因为单条put效率太低,没有进行更大量测试。
对于批量put,测试2000w数据。
以下是测试结果:
可以看到不管是单条put还是批量插入,加入observer和endpoint,增加解析和发送kafka逻辑,并没有明显的降低吞吐。但共同的不足就是:cdc的数据只能捕捉到当时的操作记录,无法获取row的所有数据,都需要反查才能获取。
1.6 结论
- 三种方式都能实现CDC,难易程度不同,自己解析WAL是对集群最没有影响的,但是是最麻烦的,需要额外写hdfs source;observer和endpoint相对容易,但是observer只能获取订阅的操作,如需扩展稍微麻烦。
- 三种方式虽都能实现CDC,但都不能实现我们所需要的CDC,即一个操作不止能获取操作的数据,还能获取到操作后这条数据的所有数据。
更多推荐
所有评论(0)