clickhouse目前用在实时BI后台,只要数据稳定落库了,出报表很快,临时查询也很快,在使用过程中,对它的一些优点和不足也是深有体会,这里总结一下,不能做到面面俱到,但尽可能详细的介绍实际应用需要注意的问题和应用技巧。

我们是通过编写Flink程序,消费kafka数据,将数据清洗,扩充维度,然后落在clickhouse里面,半年以来,Flink程序很少出问题,数据落库也很稳定。对于clickhouse,使用的是腾讯云的clickhouse服务,有副本的集群,中间扩充了几次磁盘,服务也是挺稳定的,整体看来,整个BI后台,都能稳定的提供数据报表。为了书写方便,接下来clickhouse用ck缩写。

ck里面引用mysql外部数据表

通常需要在ck里面要用mysql里面的表,比如mysql里面存在一张维表,我们需要根据id查询出某个名称,这个时候,不需要把数据导一份过来,就可以把mysql表映射到ck里面,或者直接整个mysql数据库映射到ck某个库里面,就能操作mysql这个数据库所有表,使用sql语法关联查询mysql和ck的表。

MySQL引擎用于将远程的MySQL服务器中的表映射到ClickHouse中,并允许您对表进行INSERT和SELECT查询,以方便您在ClickHouse与MySQL之间进行数据交换。

创建数据库

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MySQL('host:port', ['database' | database], 'user', 'password')

比如,我们在mysql里面创建一张表:

mysql> USE test;
Database changed

mysql> CREATE TABLE `mysql_table` (
    ->   `int_id` INT NOT NULL AUTO_INCREMENT,
    ->   `float` FLOAT NOT NULL,
    ->   PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)

mysql> insert into mysql_table (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)

mysql> select * from mysql_table;
+------+-----+
| int_id | value |
+------+-----+
|      1 |     2 |
+------+-----+
1 row in set (0,00 sec)

我们去ck里面创建一个数据库,跟mysql这个数据库关联起来。

CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')

这样就在ck里面创建了一个mysql_db,这个数据库跟mysql的test数据库是映射在一起了,我们在ck里面直接查询:

SELECT * FROM mysql_db.mysql_table

┌─int_id─┬─value─┐
│      1 │     2 │
└────────┴───────┘

数据库引擎可以是mysql,也可以是其它数据库,比如sqlite、PostgreSQL,更多可以查阅官方文档:

https://clickhouse.com/docs/zh/engines/database-engines

ck带副本的分布式表

带副本的分布式表,就是分布式表,并且单个part也是有副本的,刚开始我们建表时候,也是花了一些时间,回忆下当时的问题主要有以下:

1) 带副本的分布式表的创建问题,怎么创建?

开始我们也是创建错了,发现数据不完整,每次只有一半,后来得知腾讯云的服务是带副本的分布式集群,创建表也需要带副本的分布式表,不然数据有丢失,建表分2步,语句如下:

-- 第一步:创建本地表,这个表会在每个机器节点上面创建,不要漏了on cluster cluster_name
CREATE TABLE test.table_local on cluster default_cluster
(
    `id` Int32,
    `uid` String,
    `name` String,
    `time` Int32,
    `create_at` DateTime
)
ENGINE = ReplicatedMergeTree()
PARTITION BY toYYYYMM(create_at)
ORDER BY id;

-- 第二步:创建分布式表
CREATE TABLE test.table_all on cluster default_cluster as test.table_local 
ENGINE = Distributed('default_cluster', 'test', 'table_local', sipHash64(id));

参数说明:

ReplicatedMergeTree:带副本的表引擎;

PARTITION BY:数据分区方式;

sipHash64(id):分布式表在每个节点上面的数据分发方式;

具体可以看官方文档,地址:

https://clickhouse.com/docs/zh/engines/table-engines/mergetree-family/replication

后文,都已这张表为例。

2)分布式表,插入数据要每个节点都执行插入操作吗?

不需要,用标准sql语法插入分布式表即可,比如:

insert into test.table_all values(.....)

3)分布式表的更新删除操作,与mysql相同吗?

不相同,只能说是相似,按照模板来使用即可,alter table语法:

ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr

比如:

alter table test.table_local on cluster default_cluster update name = 'name' where id = 10000

注意:更新操作,需要用本地表test.table_local,不能用分布式表。

删除操作也是一样的:

alter table test.table_local on cluster default_cluster delete where id = 10000

4)分布式表,添加列,修改列的类型

-- 添加列
ALTER TABLE test.table_local ON CLUSTER default_cluster ADD COLUMN product String;

-- 修改列
ALTER TABLE test.table_local on cluster default_cluster MODIFY COLUMN uid Int64;

可以看到,ck带副本的表与标准sql语法的区别在于使用了alter table和on cluster关键字,使用时候,套用模板即可。其它的一些DDL操作可以看具体官方文档:

https://clickhouse.com/docs/zh/sql-reference/statements/alter

写性能

ck提倡低频、大批量写,每秒钟只写几次,每次插入上万、十万条数据,这是比较合适的。因为如果稍微了解一下底层原理就知道,ck会间隔合并数据块,不宜频繁写入导致频繁合并,影响性能。

在使用Flink导入数据的过程中,需要攒数据,批量写,我们通过Flink窗口函数积累数据,每次写5秒钟的一批数据。记得刚开始使用ck的时候,开发没注意这些,运维就说要批量写,后来基本就统一了。

添加索引需要注意

ck里面有一级稀疏索引,和二级跳数索引,二级索引是基于一级索引的,有时候一张表建完了,写入数据,我们发现查询需要用到一些字段,需要加索引,语句:

-- 添加索引
alter table test.table_local on cluster default_cluster add index index_uid uid type minmax(100) GRANULARITY 2;

-- 使索引生效,对历史数据也生效索引
ALTER TABLE test.table_local MATERIALIZE index index_uid;

也是用的alter table格式,这里需要注意的是,索引是在插入数据时候建立的,新建索引对历史数据是不生效的,需要让历史数据也生效。

数据去重

ReplacingMergeTree引擎表会删除排序键值相同的重复项,排序键值就是建表时候跟在order by后面的字段。ck对更新不友好,性能很差,于是可以利用这个引擎,每次只管写入,不需要更新,ck会自动帮我们保存最新版本。建表语句如下:

CREATE TABLE test.test_local on cluster default_cluster (
    `id` UInt64,
    `type` Int32,
    `username` String,
    `password` String,
    `phone` String COMMENT '手机号账户',
    `nick` String,
    `mobile` String,
    `insert_time` DateTime DEFAULT '2023-07-31 00:00:00'
) ENGINE = ReplicatedReplacingMergeTree()
partition by dt
order by id;

CREATE TABLE test.test_all on cluster default_cluster as test.test_local ENGINE = Distributed('default_cluster', 'test', 'test_local', sipHash64(id));

insert_time字段需要有,放在最后,便于ck根据时候保留最新数据。

数据的去重只会在数据合并期间进行。合并会在后台一个不确定的时间进行,因此你无法预先作出计划。有一些数据可能仍未被处理。通常使用OPTIMIZE 语句手动触发,比如今天程序异常停止了,我启动了程序, 大概率会有多个版本数据,这个时候需要手动合并一下:

OPTIMIZE table test.test_local on cluster default_cluster final;

这样会触发数据合并,这个过程耗费性能,正常情况下,如果没有多版本数据,不需要触发合并。如果没有触发,查询数据时候,会有多个版本,需要final关键字,查询时候合并一下,如果查询很多,将非常耗费性能,这个时候可以选择定期合并。

select * from test.test_all final where id = 10000

对于这种多个版本的表,有时候也是可以避开final的,比如去重,可以select distinct id from table,而不需要select distinct id from table final,这个final是可以省的,等等。

分布式表的删除

需要删除本地表和分布式表:

drop table test.test_local on cluster default_cluster;
drop table test.test_all on cluster default_cluster;
复杂数据类型map

有些业务数据某个字段是json类型,并且key数量不定,这个时候需要将其在ck里面定义为map类型,包容性好。

CREATE TABLE test.test_local2 on cluster default_cluster (
    `id` UInt64,
    `data` Map(String, String),
) ENGINE = ReplicatedMergeTree()
partition by dt
order by id;

CREATE TABLE test.test_all2 on cluster default_cluster as test.test_local2 ENGINE = Distributed('default_cluster', 'test', 'test_local2', sipHash64(id));

data  Map(String, String) :这就定义了一个map类型字段,查询时候可以这样查:

select data['key'] from test.test_all2 limit 5

对于json,ck也是可以解的。

select JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b')
-- 结果
'[-100, 200.0, 300]'

select JSONExtractString('{"a": "hello", "b": [-100, 200.0, 300]}', 'a')
-- 结果
'hello'

更详细官方文档:https://clickhouse.com/docs/zh/sql-reference/functions/json-functions

关于成本

相比较而言,ck是能节省成本的,运维是这么说的。经常扩容磁盘,而计算性能只扩容了一次。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐