Flink 消费kafka数据写入hbase
@羲凡——只为了更好的活着Flink 消费kafka数据写入hbase一.前提准备1.创建Hbase表create 'test_20191122','info'2.pom.xml文件中要添加依赖<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-cli...
·
@羲凡——只为了更好的活着
Flink 消费kafka数据写入hbase
一.前提准备
1.创建Hbase表
create 'test_20191122','info'
2.pom.xml文件中要添加依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
二.直接上代码
package pulsar;
import com.hoperun.flink.sink.hbase.connector.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkConsumeKafkaWriteHbase {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConsumeKafkaWriteHbase.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
// kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "cdh01.com:9092,cdh02.com:9092,cdh03.com:9092");
properties.setProperty("group.id", "test227");
properties.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
properties.setProperty("max.poll.records","1000");
properties.setProperty("max.partition.fetch.bytes","5242880");
// kafka消费者
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"testtopic",
new SimpleStringSchema(),
properties);
consumer.setStartFromLatest();
// 生成DateStream ,并做简单的wordcount计算
DataStreamSource<String> text = env.addSource(consumer,"Kafka").setParallelism(1);
DataStream<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] arr = str.split(" ");
for (String s : arr) {
collector.collect(new Tuple2<>(s, 1));
}
}
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1);
sum.print();
// 写入 hbase
Map<String, String> userConfig = new HashMap<>();
userConfig.put("hbase.zookeeper.quorum","cdh01.com,cdh02.com,cdh03.com");
userConfig.put("hbase.zookeeper.property.clientPort","2181");
userConfig.put("bulk.flush.interval.ms","1000");
sum.addSink(new HBaseSink<>(userConfig,
new HBaseSinkFunction<Tuple2<String, Integer>>() {
@Override
public void process(Tuple2<String, Integer> element, RuntimeContext ctx, RequestPutter putter) {
byte[] rowKey = Bytes.toBytes(element.f0);
byte[] cf = Bytes.toBytes("info");
byte[] qualifier = Bytes.toBytes("num");
Put put = new Put(rowKey);
put.addColumn(cf, qualifier, Bytes.toBytes(element.f1.toString()));
put.addColumn(cf,Bytes.toBytes("test") , Bytes.toBytes("227"));
putter.add(new PutRequest(TableName.valueOf("test_20191122"),put));
}
},
new PutRequestFailureHandler() {
@Override
public void onFailure(PutRequest entity, Throwable failure, int restStatusCode, RequestPutter putter) throws Throwable {
try {
String tableName = entity.getTableName().getNameAsString();
for (Cell cell : entity.getPut().getFamilyCellMap().get(Bytes.toBytes("info"))) {
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
if(value != null){
LOG.error("HBase数据写入异常。Table:{}, RowKey:{}, Qualifier:{}, value:{}", tableName, rowKey, qualifier, value);
LOG.error("异常消息:", failure);
} else {
return;
}
}
} catch (Exception e) {
LOG.error("HBase错误处理异常:", e);
LOG.error("异常处理数据:", entity);
}
}
}));
env.execute("FlinkConsumeKafkaWriteHbase");
}
}
三.结果展示
hbase(main):005:0> scan 'test_20191122'
ROW COLUMN+CELL
aaa column=info:num, timestamp=1574390105876, value=4
aaa column=info:test, timestamp=1574390105876, value=227
bbb column=info:num, timestamp=1574390105876, value=2
bbb column=info:test, timestamp=1574390105876, value=227
ccc column=info:num, timestamp=1574390105876, value=2
ccc column=info:test, timestamp=1574390105876, value=227
ddd column=info:num, timestamp=1574390105876, value=2
ddd column=info:test, timestamp=1574390105876, value=227
4 row(s) in 0.0380 seconds
====================================================================
@羲凡——只为了更好的活着
若对博客中有任何问题,欢迎留言交流
更多推荐
已为社区贡献3条内容
所有评论(0)