0x01 需求背景
将Kafka中的JSON数据持久化存储到Hive表中,以供后期有查找的需求。

(看了很多讲解的博文,出了各种bug!饶了很多弯路!总结出来的经验就是一定要仔细看Flume的官方文档!!!!!!)

Kafka中的数据示例:

>{"id":1,"name":"snowty","age":25}
Hive表示例:

hive> desc hivetable;
OK
id                      int                                         
name                    string                                      
age                     int                                         
Time taken: 0.162 seconds, Fetched: 3 row(s)
0x02 环境搭建
参考:kafka、hive、flume环境搭建

0x03 Kafka2Hive
1、hive配置
    建表时要进行分桶、赋予事务性,需要对hive进行配置

修改hive-site.xml文件:
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    <description>
      Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
      transactions, which also requires appropriate settings for hive.compactor.initiator.on,
      hive.compactor.worker.threads, hive.support.concurrency (true),
      and hive.exec.dynamic.partition.mode (nonstrict).
      The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides
      no transactions.
    </description>
</property>
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
    <description>
      Whether Hive supports concurrency control or not.
      A ZooKeeper instance must be up and running when using zookeeper Hive lock manager
    </description>
</property>
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
执行:
$ hive --service metastore &        //先启动元数据服务
$ hive
创建database、table,其中表有id、name、age这个三个字段
hive> create database hivedatabase;
 
hive> create table hivetable(id int,name string,age int) clustered by(id) into 2 buckets stored as orc tblproperties('transactional'='true');
将`/opt/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.1.jar `拷贝到flume的lib文件夹下:
      (不拷贝的话会有dependency的问题,困扰了我好久..)

$ sudo cp /opt/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.1.jar /opt/flume/lib/
 

2、flume agent配置文件(详情参考官方文档!花十几分钟看下就豁然开朗了!)


由图可知,flume中需要配置:

source:读取数据源(此处的数据源为kafka,即kafka source),将数据传输到channel中
channel:传输数据的通道
sink:从channel中读取数据,再将其存储到数据库中(此处的数据库为hive,即hive sink)
flume通过配置文件来运行,因此在`/opt/flume/conf`目录下创建` kafka2hive.conf`文件

1). 创建source、channel、sink:

a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
2). 配置kafka source,参考官方文档,kafka的地址、topic必须配置:

例如:

a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.topic=test
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
3).配置hive sink:

例如:

a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=hivedatabase
a.sinks.hive_sink.hive.table=hivetable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name,age
 4).完整的kafka2hive.conf文件为:

a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
#kafka为souce的配置
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.topic=test
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
#hive为sink的配置
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=hivedatabase
a.sinks.hive_sink.hive.table=hivetable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name,age
#channel的配置
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
#三者之间的关系
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel
3、运行 
运行flume
$ flume-ng agent -n a -c /opt/flume/conf -f /opt/flume/conf/kafka2hive.conf -Dflume.root.logger=INFO,console
向kafka的topic中传输JSON格式的数据:
>{"id":1,"name":"snowty","age":25}
查看hive表:
select * from hivetable;
 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐