1.需求:

有这样一个需求。要从kafka采集数据然后写到hdfs上。要直接写为orc格式。直接给orc的表来用。

2.解决问题

  1) 我们首先想到用flume的 kafka source 和hdfs sink。但是事实是hdfs不能直接写为orc格式。除非自己再重新封装hdfs sink。  (这样就要开发了)。下面是官网的hdfs sink(1.9版本) 说只允许那几种压缩和SequenceFile格式。

2) 自己开发一个采集程序。就是定义一个kafka消费者。然后去kafka上消费数据。然后自己写orc格式的文件到hdfs上。当然上面两种都是比较好的。但是缺点就是要自己开发耗时长。

 

3)简单粗暴。直接使用hive-sink

3 hive-sink的用法

1)了解配置文件

#类型为hive
tier1.sinks.k1.type=hive
#hive的元数据地址
tier1.sinks.k1.hive.metastore = thrift://master:9083
#hive的数据库
tier1.sinks.k1.hive.database = default
#表名字
tier1.sinks.k1.hive.table = t_rsd_tornado_event
#采集的数据放在哪个分区下
tier1.sinks.k1.hive.partition = %Y,%m,%d
#滚动
tier1.sinks.k1.round = true
#滚动时间
tier1.sinks.k1.roundValue = 60
tier1.sinks.k1.roundUnit = minute
#序列化
tier1.sinks.k1.serializer = DELIMITED、
#这2个就不说了,
tier1.sinks.k1.serializer.delimiter = "\t"
tier1.sinks.k1.serializer.serdeSeparator = '\t'

#表的字段
tier1.sinks.k1.serializer.fieldnames =id,device_id,src_obj,dest_obj,src_ip,dest_ip,src_mac,dest_mac,protocol,app_layer_protocol,src_domain,dest_domain,ip_version,src_port,dest_port,packet_size,package_data,payload,sig_id,signame,match_point,match_data,action,incident_level,incident_time,risk_level,incident_type,active,last_update_time,last_update_user,create_time,creator,data_from
#批量提交到hive表的条数
tier1.sinks.k1.batchSize=10000

2) 对hive表的要求

a:表必须是事物表

b:表必须是分区表,分桶表

3)依赖的问题

缺少hive的一个依赖。

将这两个依赖导入flume的lib目录

4) 我这里用的是apache版本的flume没有用cdh版本的。cdh版本的错误更加多~~

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐