What HiveStreaming Does

Hive's traditional way of loading data is batch import; HiveStreaming adds support for streaming ingestion. For example, continuously loading data from Kafka into a Hive table is a need that the traditional batch import can hardly meet.

HiveStreaming Usage Requirements

  1. HiveStreaming must be used with Hive transactional (ACID) tables, and at present the table must be stored in ORC format.
    Set the following parameters in hive-site.xml to enable Hive transactions:
    hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    hive.compactor.initiator.on = true
    hive.compactor.worker.threads > 0
    For more on Hive transactional tables, see the official wiki: https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions

  2. When creating the table, mark it as transactional: tblproperties("transactional"="true")

  3. The Hive table must be both partitioned and bucketed; either static or dynamic partitions will work.

In summary, a CREATE TABLE template for a table that HiveStreaming writes into looks like this:
create table db_name.table_name (
  column_name column_type,
  ...
  column_name column_type)
partitioned by (column_partition type)
clustered by (column_clustered) into 10 buckets
stored as orc tblproperties("transactional"="true");
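
For example, a concrete table matching the testing.alerts sample code later in this article might be created as follows (the column names here are illustrative, chosen to fit the sample records):

create table testing.alerts (
  id int,
  msg string)
partitioned by (continent string, country string)
clustered by (id) into 5 buckets
stored as orc tblproperties("transactional"="true");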

Using the HiveStreaming API

  1. Maven dependencies
The HiveStreaming-specific dependencies:

<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-streaming</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-streaming</artifactId>
<version>${hive.version}</version>
</dependency>

The following jars are also needed; without them the program fails at runtime even though it compiles without errors:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
  2. Cluster environment preparation
    It is recommended that the program directly load the following configuration files used by the cluster (a sketch of loading them follows):
    core-site.xml
    hdfs-site.xml
    hive-site.xml
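
A minimal sketch of building the hiveConf object that the connection code below expects, assuming the three files above have been copied to local paths (the paths here are placeholders):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

// load the cluster configuration; the file locations below are placeholders
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(new Path("/path/to/core-site.xml"));
hiveConf.addResource(new Path("/path/to/hdfs-site.xml"));
hiveConf.addResource(new Path("/path/to/hive-site.xml"));

If hive-site.xml is placed on the application's classpath it is typically picked up by HiveConf automatically, and the explicit addResource call for it can be dropped.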

  3. Code development
    3.1 Static partitions

String dbName = "testing";
String tblName = "alerts";
 
// static partition values
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
 
// create delimited record writer whose schema exactly matches table schema
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                                      .withFieldDelimiter(',')
                                      .build();
// the writer defines the format of the records streamed into the Hive table; here it is comma-delimited text. StrictJsonWriter can be used instead for JSON records


// create and open streaming connection (the target table has to exist already)
StreamingConnection connection = HiveStreamingConnection.newBuilder()
                                    .withDatabase(dbName)
                                    .withTable(tblName)
                                    .withStaticPartitionValues(partitionVals)
                                    .withAgentInfo("example-agent-1")
                                    .withRecordWriter(writer)
                                    .withHiveConf(hiveConf)
                                    .connect();
// create the connection; a StreamingConnection is long-lived and has its own thread that sends periodic heartbeats (every hive.txn.timeout / 2) to keep it alive
// begin a transaction, write records and commit 1st transaction
connection.beginTransaction();
connection.write("1,val1".getBytes());
connection.write("2,val2".getBytes());
connection.commitTransaction();
// begin another transaction, write more records and commit 2nd transaction
connection.beginTransaction();
connection.write("3,val3".getBytes());
connection.write("4,val4".getBytes());
connection.commitTransaction();
// close the streaming connection
connection.close();
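
As noted in the writer comment above, StrictJsonWriter can replace StrictDelimitedInputWriter when the incoming records are JSON. A minimal sketch, assuming the JSON keys match the table's column names:

// JSON writer: record fields are matched to table columns by key
StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder().build();

StreamingConnection jsonConnection = HiveStreamingConnection.newBuilder()
                                       .withDatabase(dbName)
                                       .withTable(tblName)
                                       .withStaticPartitionValues(partitionVals)
                                       .withAgentInfo("example-agent-2")
                                       .withRecordWriter(jsonWriter)
                                       .withHiveConf(hiveConf)
                                       .connect();

jsonConnection.beginTransaction();
jsonConnection.write("{\"id\": 5, \"msg\": \"val5\"}".getBytes());
jsonConnection.commitTransaction();
jsonConnection.close();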

3.2 Dynamic partitions

// dynamic partitioning
// create delimited record writer whose schema exactly matches table schema
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                                      .withFieldDelimiter(',')
                                      .build();
// create and open streaming connection (the target table has to exist already)
StreamingConnection connection = HiveStreamingConnection.newBuilder()
                                    .withDatabase(dbName)
                                    .withTable(tblName)
                                    .withAgentInfo("example-agent-1")
                                    .withRecordWriter(writer)
                                    .withHiveConf(hiveConf)
                                    .connect();
// begin a transaction, write records and commit 1st transaction
connection.beginTransaction();
// dynamic partition mode where last 2 columns are partition values
connection.write("11,val11,Asia,China".getBytes());
connection.write("12,val12,Asia,India".getBytes());
connection.commitTransaction();
// begin another transaction, write more records and commit 2nd transaction
connection.beginTransaction();
connection.write("13,val13,Europe,Germany".getBytes());
connection.write("14,val14,Asia,India".getBytes());
connection.commitTransaction();
// close the streaming connection
connection.close();

Additional Notes

Transaction: each transaction creates a delta file under the table's HDFS storage path.
Note: the data files of a Hive transactional table fall into delta_XX incremental files and base_XX base files. Data written by update, delete and insert operations goes into delta files, and the Hive engine periodically compacts small delta files to maintain performance.
TransactionBatch: a Transaction belongs to a TransactionBatch; by default one TransactionBatch contains a single Transaction. Recommendation: ignore this concept and keep the default, since the API hides it from users.

connection.beginTransaction();
connection.write(data.getBytes());
connection.commitTransaction();
Committing one record per transaction this way makes inserts into Hive very slow and consumes a lot of memory.
Instead, commit a few thousand records per transaction, for example as sketched below.
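
A minimal batching sketch, assuming the input arrives through some iterable source (the records variable and the batch size here are illustrative):

final int BATCH_SIZE = 5000;              // "a few thousand" records per transaction
int pending = 0;

connection.beginTransaction();
for (String record : records) {           // 'records' is a placeholder for your input source
    connection.write(record.getBytes());
    pending++;
    if (pending >= BATCH_SIZE) {
        connection.commitTransaction();   // commit the current batch
        connection.beginTransaction();    // open the next transaction
        pending = 0;
    }
}
connection.commitTransaction();           // commit the final (possibly smaller) batch
connection.close();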

For more on HiveStreaming, see the official wiki: https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
