Flink SQL Integration with Hive
Version notes:
- flink-1.12.1
- hive-2.3.4
Contents
(1) Architecture of the Flink SQL and Hive integration
(2) The Flink-Hive integration covers two aspects
(3) Configuring the Flink SQL and Hive integration
(4) Testing the Flink SQL and Hive integration code
(5) Testing the Kafka data source and writing the data into Hive
(1) Architecture of the Flink SQL and Hive integration

(2) The Flink-Hive integration covers two aspects
- First, Flink uses Hive's Metastore as a persistent catalog. Through HiveCatalog, Flink metadata from different sessions can be stored in the Hive Metastore. For example, a Kafka table or an Elasticsearch table can be registered in the Hive Metastore via HiveCatalog and reused in later SQL queries (see the sketch below).
- Second, Flink can be used to read and write Hive tables.
HiveCatalog is designed to be compatible with Hive, so an existing Hive data warehouse can be accessed "out of the box". There is no need to modify the existing Hive Metastore or to change the data location or partitioning of existing tables.
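As a minimal sketch of the first point (the catalog name, default database, and hive-conf path below are placeholders, not values from the original post), registering a HiveCatalog and making it the current catalog is enough for any table created afterwards to be persisted in the Hive Metastore; the full example in section (4) follows the same pattern.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder catalog name, default database, hive-site.xml directory and Hive version.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf", "2.3.4");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // Any table created from now on (e.g. a Kafka or Elasticsearch table) is stored
        // in the Hive Metastore and can be reused from other Flink sessions.
    }
}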
(3) Configuring the Flink SQL and Hive integration
Step 1: Configure HADOOP_CLASSPATH by adding the following environment variable to /etc/profile:
export HADOOP_CLASSPATH=`hadoop classpath`
Step 2: Copy the following Hive-related jars into Flink's lib directory:
- flink-connector-hive_2.11-1.12.1.jar
- hive-exec-2.3.4.jar
- flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar
Both flink-connector-hive_2.11-1.12.1.jar and flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar can be downloaded from the Maven repository.
Step 3: Add the Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
</dependency>
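Besides the two dependencies above, the Java examples in section (4) assume the usual Table API and planner artifacts are already on the project's classpath. The list below is my assumption about the project setup; it is not given in the original post.
<!-- Assumed additional dependencies for a Flink 1.12 / Scala 2.11 project -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>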
(4) Testing the Flink SQL and Hive integration code
package com.aikfk.flink.sql;

/**
 * SQL statements shared by the examples: a Kafka source table, a Hive sink table,
 * and the INSERT statement that connects them.
 */
public class CommonSQL {

    public static final String hiveCatalog_name = "flink_udata";
    public static final String hiveDatabase_name = "flink";
    public static final String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
    public static final String version = "2.3.4";

    // Hive sink table, partitioned by date and hour. The TBLPROPERTIES control when a
    // partition is committed: with the 'partition-time' trigger, a partition is committed
    // once the watermark passes the time extracted by the timestamp-pattern plus the delay,
    // and the commit registers the partition in the metastore and writes a _SUCCESS file.
    public static final String user_product_hive_create =
            "CREATE TABLE user_product_hive (\n" +
            "  user_id STRING,\n" +
            "  product_id STRING,\n" +
            "  click_count INT\n" +
            ") PARTITIONED BY (dt STRING, hr STRING) STORED AS PARQUET TBLPROPERTIES (\n" +
            "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
            "  'sink.partition-commit.delay'='5 s',\n" +
            "  'sink.partition-commit.trigger'='partition-time',\n" +
            "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
            ")";

    // Kafka source table: ts is an epoch-second field, r_t is the derived event time
    // with a 5-second watermark.
    public static final String user_product_kafka_create =
            "CREATE TABLE user_product_kafka (\n" +
            "  user_id STRING,\n" +
            "  product_id STRING,\n" +
            "  click_count INT,\n" +
            "  ts BIGINT,\n" +
            "  r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd HH:mm:ss'),\n" +
            "  WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'kfk',\n" +
            "  'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
            "  'properties.group.id' = 'test1',\n" +
            "  'format' = 'json',\n" +
            "  'scan.startup.mode' = 'latest-offset'\n" +
            ")";

    public static final String user_product_kafka_drop = "DROP TABLE IF EXISTS user_product_kafka";

    public static final String user_product_hive_drop = "DROP TABLE IF EXISTS user_product_hive";

    // Continuous insert from the Kafka table into the Hive table, deriving the dt/hr
    // partition columns from the event time r_t.
    public static final String user_product_kafka_insert_hive =
            "INSERT INTO user_product_hive SELECT user_id, product_id, click_count, " +
            "DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM user_product_kafka";
}
package com.aikfk.flink.sql.hive;

import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkKafkaHive {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Checkpointing is required for the streaming Hive sink to commit partitions.
        env.enableCheckpointing(5000);

        // Register the Hive Metastore as Flink's catalog and make it the current catalog.
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name, hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);

        // The Kafka table DDL uses the default (Flink) SQL dialect ...
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_create);

        // ... while the Hive table DDL must be executed with the Hive dialect.
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_create);

        // executeSql() already submits the continuous INSERT job.
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_insert_hive).print();

        env.execute();
    }
}
(5) Testing the Kafka data source and writing the data into Hive
Producer generating test data:
package com.aikfk.flink.base;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerUtil extends Thread {

    private String topic = "kfk";

    public KafkaProducerUtil() {
        super();
    }

    private Producer<String, String> createProducer() {
        // Configure the producer through a Properties object.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            String user_id = "user_" + random.nextInt(10);
            String product_id = "product_" + random2.nextInt(100);
            System.out.println(user_id + " :" + product_id);

            // Event time in epoch seconds, shifted 5 seconds into the future.
            String time = System.currentTimeMillis() / 1000 + 5 + "";
            try {
                // Build the JSON record matching the user_product_kafka schema.
                String kaifa_log = "{" +
                        "\"user_id\":\"" + user_id + "\"," +
                        "\"product_id\":\"" + product_id + "\"," +
                        "\"click_count\":\"1" + "\"," +
                        "\"ts\":" + time + "}";
                System.out.println("kaifa_log = " + kaifa_log);
                producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========循环一次==========");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // run() is called directly, so the producer loop runs in the main thread.
        new KafkaProducerUtil().run();
    }
}
Producer output:
user_7 :product_74
kaifa_log = {"user_id":"user_7","product_id":"product_74","click_count":"1","ts":1618228651}
=========循环一次==========
user_5 :product_62
kaifa_log = {"user_id":"user_5","product_id":"product_62","click_count":"1","ts":1618228653}
=========循环一次==========
user_9 :product_50
kaifa_log = {"user_id":"user_9","product_id":"product_50","click_count":"1","ts":1618228654}
=========循环一次==========
Checking the data through the Flink SQL client:
Flink SQL> select * from user_product_kafka;

Checking the data written into Hive: this problem has not been solved yet! It may be caused by the jar packages.
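Besides the jar packaging, one thing worth double-checking (this is only my guess, not something verified in the original environment): with 'sink.partition-commit.trigger'='partition-time', a partition is committed only after a checkpoint completes and the watermark has passed the time extracted by partition.time-extractor.timestamp-pattern plus sink.partition-commit.delay. A hypothetical variant of the Hive DDL for debugging, which commits partitions on processing time and therefore does not depend on watermarks, could look like this:
// Hypothetical debugging variant of user_product_hive_create (my assumption, untested):
// commit partitions on processing time so the commit no longer waits for the watermark.
public static final String user_product_hive_create_proctime =
        "CREATE TABLE user_product_hive (\n" +
        "  user_id STRING,\n" +
        "  product_id STRING,\n" +
        "  click_count INT\n" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS PARQUET TBLPROPERTIES (\n" +
        "  'sink.partition-commit.trigger'='process-time',\n" +
        "  'sink.partition-commit.delay'='0 s',\n" +
        "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
        ")";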
For more on the Flink SQL and Hive integration, see the official Flink documentation.