Versions used:

  • flink-1.12.1
  • hive-2.3.4

(1) Architecture of the Flink SQL and Hive integration

[Architecture diagram omitted]

(2) The integration of Flink and Hive has two layers

  • First, Hive's Metastore is used as a persistent Catalog.
    Through HiveCatalog, Flink metadata from different sessions can be stored in the Hive Metastore. For example, a user can store Kafka or Elasticsearch table definitions in the Hive Metastore via HiveCatalog and reuse them in later SQL queries.

  • Second, Flink is used to read and write Hive tables.
    HiveCatalog is designed for good compatibility with Hive, so an existing Hive data warehouse can be accessed "out of the box": there is no need to modify the existing Hive Metastore or to change the data location or partitioning of tables. A minimal sketch of both layers follows.
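
The sketch below illustrates both layers in Java. It assumes a reachable Hive Metastore; the class name HiveIntegrationSketch, the catalog name myhive, the conf directory /path/to/hive-conf, and the table some_existing_hive_table are placeholders, not taken from this article.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveIntegrationSketch {
    public static void main(String[] args) {
        // Batch mode with the Blink planner is enough for this illustration
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Layer one: use the Hive Metastore as a persistent catalog. Any table
        // created through this catalog (e.g. a Kafka table) outlives the session.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf", "2.3.4");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // Layer two: read an existing Hive table directly with Flink SQL
        tableEnv.executeSql("SELECT * FROM some_existing_hive_table").print();
    }
}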

(3) Configuring the Flink SQL and Hive integration

Step 1: configure HADOOP_CLASSPATH by adding the following environment variable to /etc/profile:

export HADOOP_CLASSPATH=`hadoop classpath`

Step 2: copy the following Hive-related jars into Flink's lib directory:

  • flink-connector-hive_2.11-1.12.1.jar
  • hive-exec-2.3.4.jar
  • flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar

Both flink-connector-hive_2.11-1.12.1.jar and flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar can be downloaded from the Maven repository.

Step 3: add the Maven dependencies (since the jars above are already in Flink's lib directory, these can typically be declared with provided scope):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
</dependency>

(4) Code for testing the Flink SQL and Hive integration

The constants class CommonSQL holds the catalog settings and the DDL/DML statements used by the job:

package com.aikfk.flink.sql;

public class CommonSQL {

    public static final String hiveCatalog_name = "flink_udata";
    public static final String hiveDatabase_name = "flink";
    public static final String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
    public static final String version = "2.3.4";


    // Hive sink table, partitioned by date and hour; partitions are committed
    // to the metastore (plus a _SUCCESS file) based on partition time.
    public static final String user_product_hive_create = "CREATE TABLE user_product_hive (\n" +
            "  user_id STRING,\n" +
            "  product_id STRING,\n" +
            "  click_count INT" +
            ") partitioned by (dt string,hr string) " +
            "stored as PARQUET " +
            "TBLPROPERTIES (\n" +
            "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
            "  'sink.partition-commit.delay'='5 s',\n" +
            "  'sink.partition-commit.trigger'='partition-time',\n" +
            "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
            ")";

    // Kafka source table with an event-time attribute r_t derived from the
    // epoch-second field ts, plus a 5-second watermark.
    public static final String user_product_kafka_create =
            "CREATE TABLE user_product_kafka (\n" +
            " user_id STRING," +
            " product_id STRING," +
            " click_count INT ," +
            " ts bigint ," +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'kfk'," +
            " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'latest-offset'" +
            ")";

    public static final String user_product_kafka_drop ="DROP TABLE IF EXISTS user_product_kafka";
    public static final String user_product_hive_drop ="DROP TABLE IF EXISTS user_product_hive";

    // Streaming insert from the Kafka table into the partitioned Hive table,
    // deriving the dt/hr partition values from the event time r_t.
    public static final String user_product_kafka_insert_hive =
            "insert into user_product_hive SELECT user_id, product_id, click_count, " +
            " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM user_product_kafka";

}

The job class FlinkKafkaHive registers the HiveCatalog, creates both tables (switching the SQL dialect as needed), and submits the streaming insert from Kafka into Hive:

package com.aikfk.flink.sql.hive;

import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkKafkaHive {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);

        // Checkpointing is required: the streaming Hive/file sink commits
        // files and partitions when a checkpoint completes.
        env.enableCheckpointing(5000);

        // Register the HiveCatalog and make it the session's current catalog
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);

        // Default (Flink) dialect for the Kafka source table DDL
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_create);

        // Hive dialect for the partitioned Hive sink table DDL
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_create);

        // executeSql() submits the INSERT job by itself; a separate env.execute()
        // is not needed here and would fail, because no DataStream operators
        // were defined on the StreamExecutionEnvironment.
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_insert_hive).print();
    }
}

(5) Testing the Kafka source and the data written to Hive

The producer generates test data:

package com.aikfk.flink.base;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerUtil extends Thread {

        private String topic = "kfk";

        public KafkaProducerUtil() {
            super();
        }

        private Producer<String, String> createProducer() {
            // Configure the producer properties
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            return new KafkaProducer<String, String>(properties);
        }

        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();

            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                // Event time in epoch seconds, shifted 5 seconds ahead
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                try {
                    String kaifa_log = "{" +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1" + "\"," +
                            "\"ts\":" + time + "}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));

                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");


                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) {
            new KafkaProducerUtil().run();
        }

}

Producer output:

user_7 :product_74
kaifa_log = {"user_id":"user_7","product_id":"product_74","click_count":"1","ts":1618228651}
=========循环一次==========
user_5 :product_62
kaifa_log = {"user_id":"user_5","product_id":"product_62","click_count":"1","ts":1618228653}
=========循环一次==========
user_9 :product_50
kaifa_log = {"user_id":"user_9","product_id":"product_50","click_count":"1","ts":1618228654}
=========循环一次==========

Viewing the data through the Flink SQL client:

Flink SQL> select * from user_product_kafka;

[Screenshot of the query result omitted]

Checking the data written into Hive: this problem has not been solved yet! It may be caused by a jar package issue.
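
As one way to check whether any partitions actually reached Hive, the following sketch reads the Hive table back with Flink in batch mode, reusing the constants from CommonSQL; the class name HiveReadCheck is a placeholder, not part of the original article.

package com.aikfk.flink.sql.hive;

import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveReadCheck {
    public static void main(String[] args) {
        // Batch mode is enough for a one-off verification query
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Reuse the same HiveCatalog configuration as the streaming job
        HiveCatalog hiveCatalog = new HiveCatalog(
                CommonSQL.hiveCatalog_name,
                CommonSQL.hiveDatabase_name,
                CommonSQL.hiveConfDir,
                CommonSQL.version);
        tableEnv.registerCatalog(CommonSQL.hiveCatalog_name, hiveCatalog);
        tableEnv.useCatalog(CommonSQL.hiveCatalog_name);

        // If partitions were committed, this prints the rows written by the streaming job
        tableEnv.executeSql("SELECT * FROM user_product_hive").print();
    }
}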

For more on the integration of Flink SQL and Hive, see the official Flink documentation.


