Flink: consuming Kafka and inserting into ClickHouse for second-level OLAP over tens of billions of rows
I. Ways to write data into ClickHouse (Java)
1. Third-party library: flink-clickhouse-sink (see the official project page)
- Version compatibility:

| flink | flink-clickhouse-sink |
| ----- | --------------------- |
| 1.3.* | 1.0.0                 |
| 1.9.* | 1.3.1                 |
- Maven dependency

```xml
<dependency>
    <groupId>ru.ivi.opensource</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.3.1</version>
</dependency>
```
- Job class: add the ClickHouse environment configuration
```java
// ... (other environment setup omitted) ...
Map<String, String> sinkPro = new HashMap<>();
// sink properties
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000");
// ClickHouse local-write account
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "your-user");
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "your-password");
// common sink settings
sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10");
sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10");
sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3");
sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000");
sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
// when run locally, this creates a folder literally named "d:" inside the project to hold the failed-record details
sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");

// register sinkPro as global job parameters
ParameterTool parameters = ParameterTool.fromMap(sinkPro);
env.getConfig().setGlobalJobParameters(parameters);

// per-sink properties for ClickHouseSink
Properties props = new Properties();
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
ClickHouseSink sink = new ClickHouseSink(props);

// ClickHouse does not handle highly concurrent inserts well; for local tests set the global parallelism to 1
env.setParallelism(1);

// ... (Kafka source properties omitted) ...
env.addSource(new FlinkKafkaConsumer(PropertyUtil.get("your-topic"), new SimpleStringSchema(), sourcPro).setStartFromLatest())
        .uid("JobSource").name("JobSource").setParallelism(1)
        // ETL of the raw Kafka records
        .flatMap(new SourceFlatMapRep()).uid("SourceFlatMapRep").name("SourceFlatMapRep").setParallelism(2)
        // emits the fields to insert as one fixed-format string per row: the whole row wrapped in
        // parentheses, each value wrapped in single quotes, e.g. ('zjk','22','csdn_note')
        // (a sketch of this operator follows after this snippet)
        .flatMap(new OutFlatMap()).uid("OutFlatMap").name("OutFlatMap").setParallelism(1)
        // hand the formatted rows to flink-clickhouse-sink, which sends them to ClickHouse
        .addSink(sink);
env.execute("EventJob");
```
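The OutFlatMap operator referenced above is not included in the post. Purely as a hedged sketch of the row format that flink-clickhouse-sink is being fed here (whole row in parentheses, each value single-quoted), it could look roughly like this, assuming the upstream ETL step emits a List<String> of field values; the input type and the quote escaping are assumptions, not the author's actual code:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: turn a list of field values into the pre-formatted row string
// expected above, e.g. ('zjk','22','csdn_note').
public class OutFlatMap implements FlatMapFunction<List<String>, String> {

    @Override
    public void flatMap(List<String> fields, Collector<String> out) {
        String row = fields.stream()
                // single-quote every value and escape any embedded single quotes
                .map(v -> "'" + (v == null ? "" : v.replace("'", "\\'")) + "'")
                .collect(Collectors.joining(",", "(", ")"));
        out.collect(row);
    }
}
```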
2. Flink JDBC: flink-connector-jdbc
flink-connector-jdbc requires Flink 1.11.0 or later.
- Maven dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
- Job class

```java
// None of the ClickHouse global-environment settings are needed here; everything is configured inside the custom SinkFunction
// ... (environment setup omitted) ...
env.addSource(new FlinkKafkaConsumer(PropertyUtil.get("topic1111"), new SimpleStringSchema(), sourcPro).setStartFromLatest())
        .uid("JobSource").name("JobSource").setParallelism(5)
        .flatMap(new SourceFlatMap()).uid("SourceFlatMap").name("SourceFlatMap").setParallelism(10)
        .flatMap(new EventFlatMap()).uid("EventFlatMap").name("EventFlatMap").setParallelism(10)
        .addSink(MySink.sink()).setParallelism(4);
env.execute("EventJob");
```
- SinkFunction operator
```java
import java.util.List;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseDriver;

public class MySink {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private static String WRITE_USER = "your-user";
    private static String PASSWD = "your-password";
    private static String url = "jdbc:clickhouse://sc.chproxy.bigdata.services.org:10000/database_1564";

    // 70-column INSERT into the target local table
    private static String insertSql = "insert into database_1564.ch_zjk_test_local (seq,real_time,app_id,app_version,session_id,event_id,device_uuid,old_id,mos,m,o,br,os,bd,ise,lct,mid,chl,lpro,mosv,manu,osvi,is_ca,_pt,id,du,pg,dus,ext,gdu,ids,tab,tag,key,code,type,from,plat,exts,docid,style,state,types,newev,hotev,value,vtype,styles,fromID,status,action,msg_id,source,offset,column,spsuuid,buildev,referer,offsets,columnd,fold_id,searchid,rec_type,commentid,refererid,content_id,referer_id,schsessionid,exposurepercent,exposurepercents) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    // Fill each of the 70 placeholders from the incoming 70-element list
    // (a sketch of the upstream operator that produces this list follows after this class)
    public static JdbcStatementBuilder<List<String>> marketStatement = (ps, t) -> {
        LOG.info("##### {}", t);
        for (int i = 1; i < 71; i++) {
            ps.setString(i, t.get(i - 1));
        }
    };

    public static SinkFunction<List<String>> sink() {
        return JdbcSink.sink(
                insertSql,
                marketStatement,
                // flush every 3 seconds or every 100 records, whichever comes first
                JdbcExecutionOptions.builder().withBatchIntervalMs(1000 * 3).withBatchSize(100).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withUsername(WRITE_USER)
                        .withPassword(PASSWD)
                        .withDriverName(ClickHouseDriver.class.getName())
                        .build());
    }
}
```
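The statement builder above assumes that every incoming record is a List<String> with exactly 70 elements in the same order as the INSERT columns. The upstream EventFlatMap operator is not shown in the post; the following is only a hedged sketch of the shape such an operator could take, assuming its input is already a parsed list of field values (the input type, class body, and padding value are assumptions):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: normalize each parsed event into a fixed-length 70-element list
// so it lines up one-to-one with the 70 placeholders of the INSERT statement above.
public class EventFlatMap implements FlatMapFunction<List<String>, List<String>> {

    private static final int COLUMN_COUNT = 70;

    @Override
    public void flatMap(List<String> parsedFields, Collector<List<String>> out) {
        List<String> row = new ArrayList<>(COLUMN_COUNT);
        for (int i = 0; i < COLUMN_COUNT; i++) {
            // pad missing trailing fields so setString(i) never reads past the end of the list
            row.add(i < parsedFields.size() ? parsedFields.get(i) : "");
        }
        out.collect(row);
    }
}
```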
3. ClickHouse JDBC: clickhouse-jdbc
This approach works with any Flink version and is the one recommended here.
- Maven dependency

```xml
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.3</version>
</dependency>
```
- Job class

```java
// Again, no ClickHouse settings in the global environment; the connection is configured in the FlatMap operator's open() method
// ... (environment setup omitted) ...
env.addSource(new FlinkKafkaConsumer(PropertyUtil.get("topic1111"), new SimpleStringSchema(), sourcPro).setStartFromLatest())
        .uid("JobSource").name("JobSource").setParallelism(30)
        .flatMap(new SourceFlatMap()).uid("SourceFlatMap").name("SourceFlatMap").setParallelism(20)
        .flatMap(new EventFlatMap()).uid("EventFlatMap").name("EventFlatMap").setParallelism(20)
        // the operator that writes into ClickHouse
        .flatMap(new CHSinkFlatMap()).uid("CHSinkFlatMap").name("CHSinkFlatMap").setParallelism(15);
env.execute("EventJob");
```
- FlatMap operator

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

public class CHSinkFlatMap extends RichFlatMapFunction<List<String>, String> {

    private static final Logger LOG = LoggerFactory.getLogger(CHSinkFlatMap.class);

    // 70-column INSERT statement; assumed identical to the one in the JdbcSink example above
    private static final String insertSql = "insert into database_1564.ch_zjk_test_local (seq,real_time,app_id,app_version,session_id,event_id,device_uuid,old_id,mos,m,o,br,os,bd,ise,lct,mid,chl,lpro,mosv,manu,osvi,is_ca,_pt,id,du,pg,dus,ext,gdu,ids,tab,tag,key,code,type,from,plat,exts,docid,style,state,types,newev,hotev,value,vtype,styles,fromID,status,action,msg_id,source,offset,column,spsuuid,buildev,referer,offsets,columnd,fold_id,searchid,rec_type,commentid,refererid,content_id,referer_id,schsessionid,exposurepercent,exposurepercents) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    // each subtask keeps its own connection, prepared statement, and batch counter
    private int count = 1;
    private ClickHouseConnection connection = null;
    private PreparedStatement preparedStatement = null;

    // create the connection and the prepared statement once per subtask
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            connection = getConn();
            connection.setAutoCommit(false);
            preparedStatement = connection.prepareStatement(insertSql);
        } catch (Exception e) {
            LOG.error("Failed to initialize the ClickHouse connection:", e);
        }
    }

    // write with JDBC batches; auto-commit is off, so the batch is flushed explicitly every 50,000 rows
    @Override
    public void flatMap(List<String> list, Collector<String> collector) throws Exception {
        try {
            for (int i = 1; i < 71; i++) {
                preparedStatement.setString(i, StringUtils.isNotBlank(list.get(i - 1)) ? list.get(i - 1) : "uk");
            }
            preparedStatement.addBatch();
            count = count + 1;
            try {
                if (count >= 50000) {
                    preparedStatement.executeBatch();
                    connection.commit();
                    preparedStatement.clearBatch();
                    count = 1;
                }
            } catch (Exception ee) {
                LOG.error("Failed to insert data into ClickHouse:", ee);
            }
        } catch (Exception ex) {
            LOG.error("ClickhouseSink insert failed ====", ex);
        }
    }

    // flush whatever is left in the batch and release the connection when the task shuts down
    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.executeBatch();
            connection.commit();
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    public static ClickHouseConnection getConn() {
        // fill in your ClickHouse address, database, and credentials
        String username = "";
        String password = "";
        String address = "";
        String db = "";
        int socketTimeout = 600000;
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser(username);
        properties.setPassword(password);
        properties.setDatabase(db);
        properties.setSocketTimeout(socketTimeout);
        ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
        try {
            return clickHouseDataSource.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}
```
II. ClickHouse tables
1. Table engines
III. Materialized views as a real-time cube
IV. Report optimization