I. Official documentation:

1. Official docs for ClickHouse's HDFS (Hive) and Kafka integrations:

https://clickhouse.tech/docs/en/engines/table-engines/integrations/hdfs/
https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/

2. Data types supported by ClickHouse:

https://clickhouse.tech/docs/en/interfaces/formats/#data_types-matching

II. Creating a ClickHouse table mapped to a Hive table

1. Using ClickHouse's HDFS engine

CREATE TABLE statement:

-- {active_namenode} is a placeholder for the active NameNode host (see section III.4)
CREATE TABLE company_employees.employees (
    name String,
    age String,
    email String,
    addr String
) ENGINE = HDFS('hdfs://{active_namenode}:8020/ods/statis_date=202{0..9}{0..9}{0..9}{0..9}{0..9}/statis_hour={0,1,2}{0..9}/*', 'ORC')
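
Once created, the table stores nothing locally; every SELECT scans the ORC files matching the path pattern directly on HDFS. A minimal sanity check (the query is illustrative):

SELECT name, age, email, addr FROM company_employees.employees LIMIT 10;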

Example from the official docs:


Suppose we have several files in TSV format with the following URIs on HDFS:
'hdfs://hdfs1:9000/some_dir/some_file_1'
'hdfs://hdfs1:9000/some_dir/some_file_2'
'hdfs://hdfs1:9000/some_dir/some_file_3'
'hdfs://hdfs1:9000/another_dir/some_file_1'
'hdfs://hdfs1:9000/another_dir/some_file_2'
'hdfs://hdfs1:9000/another_dir/some_file_3'
There are several ways to make a table consisting of all six files:
CREATE TABLE table_with_range (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/some_file_{1..3}', 'TSV')
Another way:

CREATE TABLE table_with_question_mark (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/some_file_?', 'TSV')
The table consists of all the files in both directories (all files must match the format and schema described in the query):

CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/*', 'TSV')

III. Caveats when creating tables for Hive integration (detailed notes)

1. The schema must match the Hive table

  • If the Hive table is partitioned, the partition columns must not appear in the CREATE TABLE statement
  • Apart from that, the columns must exactly match the Hive table's columns (see the sketch below)
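
For illustration, a hypothetical Hive DDL that the ClickHouse table above would correspond to (an assumption for this example, not my actual DDL): the partition columns statis_date and statis_hour live only in PARTITIONED BY, so they are omitted from the ClickHouse schema.

-- Hypothetical Hive DDL, for illustration only
CREATE TABLE company_employees.employees (
    name  string,
    age   string,
    email string,
    addr  string
)
PARTITIONED BY (statis_date string, statis_hour string)  -- excluded from the ClickHouse schema
STORED AS ORC;  -- must match the 'ORC' format argument of the HDFS engine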

2. Format requirements

  • The format argument in the CREATE TABLE statement must match the Hive table's storage format (ORC in this example)

3. Path patterns

Most Hive tables in practice are partitioned, so the path in the ClickHouse CREATE TABLE statement must cover all partition directories.

[Image: partition directory layout of my Hive table, statis_date=.../statis_hour=...]

The corresponding path pattern is:

/ods/statis_date=202{0..9}{0..9}{0..9}{0..9}{0..9}/statis_hour={0,1,2}{0..9}/*
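
The braces expand just like in the official example above: 202 followed by five {0..9} digits matches any 8-digit statis_date beginning with 202, and {0,1,2}{0..9} covers statis_hour 00 through 23 (the patterns 24-29 simply match no directory). Hypothetical files this pattern would match:

/ods/statis_date=20201215/statis_hour=09/000000_0
/ods/statis_date=20210101/statis_hour=23/000000_0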

4. HDFS path

Write the path against the active NameNode, i.e. hdfs://{active_namenode}:8020. Pointing at the cluster nameservice or a standby node returns no data. There may be a way to make the nameservice address work, e.g. by importing core-site.xml or similar, but I have not tried it.
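
To check which NameNode is currently active, you can ask HDFS directly. nn1 and nn2 below are the NameNode IDs from dfs.ha.namenodes.<nameservice> in hdfs-site.xml; your cluster's IDs may differ:

hdfs haadmin -getServiceState nn1   # prints "active" or "standby"
hdfs haadmin -getServiceState nn2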

IV. On incremental data sync: I have incremental sync into Hive configured, and testing confirms the ClickHouse table picks up Hive's incremental data normally. This is expected, because the HDFS engine stores no data itself and scans the files matching the path pattern at query time, so newly landed files become visible automatically.
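
A simple way to verify (illustrative): run a count before and after a new partition lands; the number should grow with no action needed on the ClickHouse side.

SELECT count() FROM company_employees.employees;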

V. Creating ClickHouse tables for Kafka

1. Creating the source table (Kafka engine)

CREATE TABLE service_flow.service_flow_increment_resource (
    mac String,
    rxTraffic Float64,  -- ClickHouse's double-precision type is Float64
    updateTime String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'node1:9092',
                          kafka_topic_list = 'service-flow-increment',
                          kafka_group_name = 'kafka_flow_test',
                          kafka_format = 'JSONEachRow',
                          kafka_num_consumers = 1;
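
With kafka_format = 'JSONEachRow', each Kafka message is one JSON object whose keys match the column names. A hypothetical message on this topic would look like:

{"mac": "aa:bb:cc:dd:ee:ff", "rxTraffic": 1024.5, "updateTime": "2021-01-01 12:00:00"}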

2. Creating the destination table (MergeTree)

CREATE TABLE service_flow.service_flow_increment (
    mac String,
    rxTraffic Float64,
    updateTime String
) ENGINE = MergeTree()
ORDER BY updateTime
SETTINGS index_granularity = 8192;

3. Creating the sync view (materialized view)

Inside the view you can apply a whole series of transformations, so the source data is reshaped to fit your needs before being synced into the actual table.

CREATE MATERIALIZED VIEW service_flow.service_flow_increment_consumer
TO service_flow.service_flow_increment
AS SELECT
    mac,
    rxTraffic,
    updateTime
FROM service_flow.service_flow_increment_resource;
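
For example, a hypothetical variant that drops rows with an empty MAC while syncing (the WHERE clause is an assumption for illustration, not part of the original setup):

CREATE MATERIALIZED VIEW service_flow.service_flow_increment_consumer
TO service_flow.service_flow_increment
AS SELECT
    mac,
    rxTraffic,
    updateTime
FROM service_flow.service_flow_increment_resource
WHERE mac != '';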
    