版本说明:

  1. Flink 1.11.1
  2. Kafka 2.4.0
  3. Hive 2.3.6
  4. Hadoop 2.7.3

详细步骤:

  1. 准备相关jar包
hive-exec-2.3.6.jar
hive-metastore-2.3.6.
libfb303-0.9.3.jarjar
kafka-clients-2.4.0.jar
flink-sql-connector-hive-2.3.6_2.11-1.11.1.jar
flink-sql-connector-kafka_2.11-1.11.1.jar
flink-shaded-hadoop2-uber-2.7.5-1.8.3.jar
  1. 修改配置文件

修改$FLINK_HOME/conf/sql-client-defaults.yaml,主要修改两个地方:

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /vm_data/apps/apache-hive-2.3.6-bin/conf
    default-database: default
execution:
    current-catalog: myhive

修改$FLINK_HOME/bin/start-cluster.sh,加入Hadoop Classpath:

export HADOOP_CLASSTHPATH=`hadoop classthpath`

修改$HIVE_HOME/conf/hive-site.xml

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hadoop000:9083</value>
</property>
  1. 启动Flink SQL Client

需要先启动Flink Standalone集群

$FLINK_HOME/bin/start-cluster.sh
$FLINK_HOME/bin/sql-client.sh embedded
  1. 在Flink SQL Client中创建Hive表,指定数据源为Kafka
    在配置文件中配置了默认为default库
CREATE TABLE student(
  id INT,
  name STRING,
  password STRING,
  age INT,
  ts BIGINT,
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal', -- 指定Kafka连接器版本,不能为2.4.0,必须为universal,否则会报错
  'connector.topic' = 'student', -- 指定消费的topic
  'connector.startup-mode' = 'latest-offset', -- 指定起始offset位置
  'connector.properties.zookeeper.connect' = 'hadoop000:2181',
  'connector.properties.bootstrap.servers' = 'hadooop000:9092',
  'connector.properties.group.id' = 'student_1',
  'format.type' = 'json',
  'format.derive-schema' = 'true', -- 由表schema自动推导解析JSON
  'update-mode' = 'append'
);
  1. 启动Kafka,发送数据
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic student
{"id":12, "name":"kevin", "password":"wong", "age":22, "ts":1603769073}
  1. 通过Flink SQL Client查询表中的数据
select * from student

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐