ClickHouse表引擎之Integration系列
Integration系统表引擎主要用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。1 Kafka1.1 Kafka引擎将Kafka Topic中的数据直接导入到ClickHouse。语法如下:CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster](name1 [type1
Integration系统表引擎主要用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。
1 Kafka
1.1 Kafka引擎
将Kafka Topic中的数据直接导入到ClickHouse。
语法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0]
参数说明:
①必需的参数
参数 | 说明 |
---|---|
kafka_broker_list | Kafka broker列表,以逗号分隔 |
kafka_topic_list | Kafka topic列表 |
kafka_group_name | Kafka消费者组,如果不希望消息在集群中重复,使用相同的组名 |
kafka_format | 消息格式。使用与SQL格式函数相同的符号,例如JSONEachRow |
②可选参数
参数 | 说明 |
---|---|
kafka_row_delimiter | 分隔符字符,用于一行的结束标识符号 |
kafka_schema | 如果kafka_format参数需要schema定义,则通过该参数来支持 |
kafka_num_consumers | 每张表的消费者个数。默认值:1。如果一个使用者的吞吐量不足,则指定更多使用者。使用者的总数不应该超过主题中的分区数,因为每个分区只能分配一个使用者。 |
kafka_max_block_size | 轮询的最大批处理大小 |
kafka_skip_broken_messages | 忽略无效记录的条数。默认值:0 |
kafka_commit_every_batch | 在编写整个块之后提交每个使用和处理的批而不是单个提交(默认值:0) |
测试:(1)建表
CREATE TABLE test_kafka (\
timestamp UInt64,\
level String,\
message String\
) ENGINE = Kafka() SETTINGS kafka_broker_list = 'ambari01:6667,ambari02:6667,ambari03:6667',\
kafka_topic_list = 'test',\
kafka_group_name = 'group1',\
kafka_format = 'JSONEachRow',\
kafka_row_delimiter = '\n'
注意:如果后面在查询过程中报如下错误。是因为有些引擎版本存在的,消息中数据之间的分割符号未指定,导致无法处理。解决办法: 添加 kafka_row_delimiter = ‘\n’。
Cannot parse input: expected { before: \0: (at row 2)
(2)在kafka建立一个新的topic
sh kafka-topics.sh --create --zookeeper ambari01:2181,ambari02:2181,ambari03:2181 --replication-factor 1 --partitions 3 --topic test
(3)在kafka建立发布者console-producer
sh kafka-console-producer.sh --broker-list ambari01:6667,ambari02:6667,ambari03:6667 --topic test
(4)发送数据
{"timestamp":1515897460,"level":"one","message":"aa"}
注意:由于一个kafka的partition 只能由一个 group consumer 消费,所以clickhouse 节点数需要大于 topic 的 partition 数。
(5)第一次查询
SELECT *
FROM test_kafka
┌──timestamp─┬─level─┬─message─┐
│ 1515897460 │ one │ aa │
└────────────┴───────┴─────────┘
(6)第二次查询
SELECT *
FROM test_kafka
Ok.
发现第二次查询的时候没有数据了,因为 Kafka引擎 表只是 kafka 流的一个视图而已,当数据被 select 了一次之后,这个数据就会被认为已经消费了,下次 select 就不会再出现。所以Kafka表单独使用是没什么用的,一般是用来和 MaterialView 配合,将Kafka表里面的数据自动导入到 MaterialView 里面。
(7)与 MaterialView 集成
我们现在每一节点建一个 MaterialView 保存 Kafka 里面的数据, 再建一个全局的Distributed表。
CREATE MATERIALIZED VIEW test_kafka_view ENGINE = SummingMergeTree() PARTITION BY day ORDER BY (day, level) AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as count FROM test_kafka GROUP BY day, level;
(6)再次发送数据
{"timestamp":1515897461,"level":"2","message":'bb'}
{"timestamp":1515897462,"level":"3","message":'cc'}
{"timestamp":1515897462,"level":"3","message":'ee'}
{"timestamp":1515897463,"level":"4","message":'dd'}
(7)查询数据
SELECT *
FROM test_kafka
Ok.
0 rows in set. Elapsed: 2.686 sec.
---------------------------------------
SELECT *
FROM test_kafka_view
Ok.
0 rows in set. Elapsed: 0.002 sec.
发现没有数据,原因:kafka 引擎默认消费根据条数与时间进行入库,不然肯定是没效率的。其中对应的参数有两个。 max_insert_block_size(默认值为: 1048576),stream_flush_interval_ms(默认值为: 7500)这两个参数都是全局性的。
业务系统需要从kafka读取数据,按照官方文档建好表后,也能看到数据,但是延时很高。基本要延时15分钟左右。kafka的数据大约每秒50条左右。基本规律是累计到65535行以后(最小的块大小)才会在表中显示数据。尝试更改stream_flush_interval_ms 没有作用,但是有不想改max_block_size,因为修改以后影响到全局所有表,并且影响搜索效率。希望能每N秒保证不管block有没有写满都flush一次。
虽然ClickHouse和 Kafka的配合可以说是十分的便利,只有配置好,但是相当的局限性对 kafka 数据格式的支持也有限。下面介绍WaterDrop这个中间件将Kafka的数据接入ClickHouse。
1.2 WaterDrop
WaterDrop: 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark和 Apache Flink之上。github地址:https://github.com/InterestingLab/waterdrop
①下载并解压
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.3/waterdrop-1.4.3.zip
unzip waterdrop-1.4.3.zip
②修改配置文件waterdrop-env.sh
vim /opt/module/waterdrop-1.4.3/config/waterdrop-env.sh
SPARK_HOME=/usr/jdp/3.2.0.0-108/spark2 #配置为spark的路径
③增加配置文件test.conf
spark {
spark.streaming.batchDuration = 5
spark.app.name = "test_waterdrop"
spark.ui.port = 14020
spark.executor.instances = 3
spark.executor.cores = 1
spark.executor.memory = "1g"
}
input {
kafkaStream {
topics = "test_wd"
consumer.bootstrap.servers = "10.0.0.50:6667,10.0.0.52:6667,10.0.0.53:6667"
consumer.zookeeper.connect = "10.0.0.50:2181,10.0.0.52:2181,10.0.0.53:2181"
consumer.group.id = "group1"
consumer.failOnDataLoss = false
consumer.auto.offset.reset = latest
consumer.rebalance.max.retries = 100
}
}
filter {
json{
source_field = "raw_message"
}
}
output {
clickhouse {
host = "10.0.0.50:8123"
database = "test"
table = "test_wd"
fields = ["act","b_t","s_t"]
username = "admin"
password = "admin"
retry_codes = [209, 210 ,1002]
retry = 10
bulk_size = 1000
}
}
④创建Clickhouse表
create table test.test_wd( act String, b_t String, s_t Date) ENGINE = MergeTree() partition by s_t order by s_t;
⑤启动写入程序
cd /data/work/waterdrop-1.4.1
sh /opt/module/waterdrop-1.4.3/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/module/waterdrop-1.4.3/config/test.conf
⑥插入数据
{"act":"aaaa","b_t":"100","s_t":"2019-12-22"}
{"act":"bxc","b_t":"200","s_t":"2020-01-01"}
{"act":"dd","b_t":"50","s_t":"2020-02-01"}
⑦查看表数据
SELECT *
FROM test_wd
┌─act─┬─b_t─┬────────s_t─┐
│ dd │ 50 │ 2020-02-01 │
└─────┴─────┴────────────┘
┌─act──┬─b_t─┬────────s_t─┐
│ aaaa │ 100 │ 2019-12-22 │
└──────┴─────┴────────────┘
┌─act─┬─b_t─┬────────s_t─┐
│ bxc │ 200 │ 2020-01-01 │
└─────┴─────┴────────────┘
2 MySQL
将Mysql作为存储引擎,可以对存储在远程 MySQL 服务器上的数据执行 select查询
语法:
MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
参数说明
参数 | 说明 |
---|---|
host:port | MySQL 服务器地址 |
database | 数据库的名称 |
table | 表名称 |
user | 数据库用户 |
password | 用户密码 |
replace_query | 将 INSERT INTO 查询是否替换为 REPLACE INTO 的标志。如果 replace_query=1,则替换查询 |
on_duplicate_clause | 将 ON DUPLICATE KEY UPDATE on_duplicate_clause 表达式添加到 INSERT 查询语句中。 |
测试:
在Mysql中建表,并插入数据
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`username` varchar(50) DEFAULT NULL,
`sex` varchar(5) DEFAULT NULL
)
INSERT INTO user values(11,"zs","0");
INSERT INTO user values(12,"ls","0");
INSERT INTO user values(13,"ww","0");
INSERT INTO user values(14,"ll","1");
创建ClickHouse表,insert_time字段为默认字段
CREATE TABLE test.from_mysql(\
id UInt64,\
username String,\
sex String,\
insert_time Date DEFAULT toDate(now())\
) ENGINE = MergeTree()\
PARTITION BY insert_time \
ORDER BY (id,username)
插入数据
INSERT INTO test.from_mysql (id,username,sex) SELECT id, username,sex FROM mysql('10.0.0.42:3306','test', 'user', 'root', 'admin');
查询数据
SELECT *
FROM from_mysql
┌─id─┬─username─┬─sex─┬─insert_time─┐
│ 11 │ zs │ 0 │ 2020-05-24 │
│ 12 │ ls │ 0 │ 2020-05-24 │
│ 13 │ ww │ 0 │ 2020-05-24 │
│ 14 │ ll │ 1 │ 2020-05-24 │
└────┴──────────┴─────┴─────────────┘
4 rows in set. Elapsed: 0.003 sec.
3 HDFS
用户通过执行SQL语句,可以在ClickHouse中直接读取HDFS的文件,也可以将读取的数据导入到ClickHouse本地表。
HDFS引擎:ENGINE = HDFS(URI, format)。URI:HDFS的URI,format:存储格式,格式链接https://clickhouse.tech/docs/en/interfaces/formats/#formats
3.1 查询文件
这种使用场景相当于把HDFS做为ClickHouse的外部存储,当查询数据时,直接访问HDFS的文件,而不是把HDFS文件导入到ClickHouse再进行查询。相对于ClickHouse的本地存储查询,速度较慢。
在HDFS上新建一个数据文件:user.csv,上传hadoop fs -cat /user/test/user.csv,内容如下:
1,zs,18
2,ls,19
4,wu,25
3,zl,22
在ClickHouse上创建一个访问user.csv文件的表:
CREATE TABLE test_hdfs_csv(\
id UInt64,\
name String,\
age UInt8\
)ENGINE = HDFS('hdfs://ambari01:8020/user/test/user.csv', 'CSV')
查询hdfs_books_csv表
SELECT *
FROM test_hdfs_csv
┌─id─┬─name─┬─age─┐
│ 1 │ zs │ 18 │
│ 2 │ ls │ 19 │
│ 4 │ wu │ 25 │
│ 3 │ zl │ 22 │
└────┴──────┴─────┘
3.2 从HDFS导入数据
从HDFS导入数据,数据在ClickHouse本地表,建本地表
CREATE TABLE test_hdfs_local(\
id UInt64,\
name String,\
age UInt8\
)ENGINE = Log
在数据存储目录下可以找到这个表的文件夹
/data/clickhouse/data/test/test_hdfs_local
从HDFS导入数据
INSERT INTO test_hdfs_local SELECT * FROM test_hdfs_csv
查询
SELECT *
FROM test_hdfs_local
┌─id─┬─name─┬─age─┐
│ 1 │ zs │ 18 │
│ 2 │ ls │ 19 │
│ 4 │ wu │ 25 │
│ 3 │ zl │ 22 │
└────┴──────┴─────┘
更多推荐
所有评论(0)