HiveStreaming Usage Notes
What HiveStreaming Is For
Hive's traditional way of loading data is batch import; HiveStreaming adds support for streaming ingestion. For example, continuously importing data from Kafka into a Hive table is a workload that the traditional batch import cannot handle well.
Requirements for Using HiveStreaming
- HiveStreaming must be used together with Hive transactional (ACID) tables, and at present the table must be stored in ORC format.
Set the following parameters in hive-site.xml to enable Hive transactions (a hive-site.xml sketch follows this list):
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0
For more on Hive transactional tables, see the official wiki: https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions
- Mark the table as transactional when creating it: tblproperties("transactional"="true")
- The table must be both partitioned and bucketed; static and dynamic partitioning both work.
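For reference, a minimal hive-site.xml fragment with these settings might look like the sketch below (the worker thread count of 1 is only an example; any value greater than 0 works):
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>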
To summarize, the DDL template for a table used as a HiveStreaming target is as follows:
create table db_name.table_name (
  column_name column_type,
  ...
  column_name column_type)
partitioned by (column_partition type)
clustered by (column_clustered) into 10 buckets
stored as orc tblproperties("transactional"="true");
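As a concrete illustration that matches the API examples below (a hypothetical table with two data columns and two partition columns), the DDL could look like this:
create table testing.alerts (
  id int,
  msg string)
partitioned by (continent string, country string)
clustered by (id) into 5 buckets
stored as orc tblproperties("transactional"="true");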
Using the HiveStreaming API
1. Maven dependencies
The HiveStreaming-specific artifacts:
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-streaming</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-streaming</artifactId>
<version>${hive.version}</version>
</dependency>
The following jars are also required; without them the program fails at runtime even though it compiles without errors.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
2. Cluster environment preparation
It is recommended that the program directly load the following configuration files used by the cluster (a loading sketch follows this list):
core-site.xml
hdfs-site.xml
hive-site.xml
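A minimal sketch of loading these files into the HiveConf passed to the connection builder; the paths below are assumptions and should be adjusted to the cluster's actual configuration directories:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

// HiveConf picks up hive-site.xml from the classpath automatically;
// addResource() makes the cluster files explicit when they are not on the classpath
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));   // assumed path
hiveConf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));   // assumed path
hiveConf.addResource(new Path("/etc/hive/conf/hive-site.xml"));     // assumed path
This hiveConf is the object referenced by withHiveConf(hiveConf) in the examples below.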
3. Code development
3.1 Static partitions
String dbName = "testing";
String tblName = "alerts";
// static partition values
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
// create delimited record writer whose schema exactly matches table schema
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
// the writer defines the format of the records streamed into the Hive table; here it is
// comma-delimited CSV. StrictJsonWriter can be used instead to write JSON-formatted records
// create and open streaming connection (default.src table has to exist already)
StreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("example-agent-1")
.withRecordWriter(writer)
.withHiveConf(hiveConf)
.connect();
// open the connection; a StreamingConnection is long-lived and runs its own thread that
// sends heartbeats to keep it alive, at an interval of hive.txn.timeout / 2
// begin a transaction, write records and commit 1st transaction
connection.beginTransaction();
connection.write("1,val1".getBytes());
connection.write("2,val2".getBytes());
connection.commitTransaction();
// begin another transaction, write more records and commit 2nd transaction
connection.beginTransaction();
connection.write("3,val3".getBytes());
connection.write("4,val4".getBytes());
connection.commitTransaction();
// close the streaming connection
connection.close();
3.2 Dynamic partitions
// dynamic partitioning
// create delimited record writer whose schema exactly matches table schema
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
// create and open streaming connection (default.src table has to exist already)
StreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withAgentInfo("example-agent-1")
.withRecordWriter(writer)
.withHiveConf(hiveConf)
.connect();
// begin a transaction, write records and commit 1st transaction
connection.beginTransaction();
// dynamic partition mode where last 2 columns are partition values
connection.write("11,val11,Asia,China".getBytes());
connection.write("12,val12,Asia,India".getBytes());
connection.commitTransaction();
// begin another transaction, write more records and commit 2nd transaction
connection.beginTransaction();
connection.write("13,val13,Europe,Germany".getBytes());
connection.write("14,val14,Asia,India".getBytes());
connection.commitTransaction();
// close the streaming connection
connection.close();
Additional Notes
Transaction: every transaction creates a delta file under the table's HDFS storage path.
Note: the data files of a Hive transactional table consist of delta_xx incremental files and base_xx base files. Data from update, delete, and insert operations is written to the delta files, and the Hive engine periodically compacts and merges the small delta files to improve performance; an illustrative directory layout is sketched below.
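For illustration only (directory names and write IDs are indicative, not taken from a real table), a partition directory on HDFS might look roughly like this after two committed transactions and a later major compaction:
.../testing.db/alerts/continent=Asia/country=India/
  delta_0000001_0000001/   <- written by the first committed transaction
  delta_0000002_0000002/   <- written by the second committed transaction
  base_0000002/            <- produced later by a major compaction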
TransactionBatch: a Transaction belongs to a TransactionBatch; by default one TransactionBatch contains one Transaction. Recommendation: ignore this concept and keep the default; the API keeps it transparent to the user.
connection.beginTransaction();
connection.write(data.getBytes());
connection.commitTransaction();
Committing one record per transaction in this way makes inserts into Hive extremely slow and consumes a lot of memory.
It is recommended to commit a few thousand records per transaction, as in the sketch below.
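A minimal sketch of that batching pattern, assuming an iterable source of already-delimited records and a hypothetical batch size of 5000:
int batchSize = 5000;                      // hypothetical value; tune for the workload
int pending = 0;
connection.beginTransaction();
for (String record : records) {            // 'records' is an assumed data source
    connection.write(record.getBytes());
    pending++;
    if (pending >= batchSize) {
        connection.commitTransaction();    // commit a few thousand records at a time
        connection.beginTransaction();
        pending = 0;
    }
}
connection.commitTransaction();            // commit whatever is left
connection.close();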
For more on HiveStreaming, see the official wiki: https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest