flink-1.12.0消费kafka写入clickhouse-20.8.20.1
方案一:使用 DataStream 方式写入 ClickHouse,主入口及完整代码见下文。
clickhouse集群搭建 请观看我上一篇文章
DataStream方式写入clickhouse - java
方法主入口
/**
 * Job entry point: consumes JSON messages from Kafka, parses each line into a
 * {@link NewsPo}, and writes the records to ClickHouse via {@link MyClickhouseSink}.
 */
public class DataClean {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration (brokers + consumer group).
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.17.2.73:9092,172.17.2.74:9092,172.17.2.75:9092");
        properties.setProperty("group.id", "flink_20210902");

        DataStream<String> stream = bsEnv.addSource(
                new FlinkKafkaConsumer<>("test_topic", new SimpleStringSchema(), properties));

        // Parse each JSON line into a NewsPo. Malformed records fall back to an
        // empty NewsPo, which is filtered out below instead of being inserted
        // into ClickHouse as an all-null row (the original code sank it anyway).
        DataStream<NewsPo> stream2 = stream
                .map(line -> {
                    try {
                        JSONObject jb = new JSONObject(line);
                        // NOTE(review): "file1" looks like a typo for "field1" —
                        // confirm against the producer's message schema before changing.
                        return new NewsPo(
                                jb.getString("key"),
                                jb.getString("title"),
                                jb.getString("file1"),
                                jb.getString("field2").substring(0, 10));
                    } catch (Exception e) {
                        // Unparseable record: mark for filtering below.
                        return new NewsPo();
                    }
                })
                // Drop records that failed to parse.
                // NOTE(review): assumes the no-arg NewsPo constructor leaves key
                // null — verify against the NewsPo definition.
                .filter(news -> news.getKey() != null);

        MyClickhouseSink myClickhouseSink = new MyClickhouseSink();
        // Parameterized INSERT used by the sink; four placeholders match the
        // four NewsPo fields bound in MyClickhouseSink.invoke().
        myClickhouseSink.setSql("insert into clj_test.news2 values(?,?,?,?)");
        stream2.addSink(myClickhouseSink);

        bsEnv.execute("flink clickhouse sink");
    }
}
自定义clickhouse sink
/**
 * Flink sink that writes {@link NewsPo} records into ClickHouse over JDBC.
 * One connection is opened per subtask in {@link #open} and released in
 * {@link #close}. The INSERT statement is injected via {@link #setSql} before
 * the job is submitted.
 */
public class MyClickhouseSink extends RichSinkFunction<NewsPo> implements Serializable {

    // JDBC connection shared by all invoke() calls of this subtask.
    Connection conn = null;
    // Parameterized INSERT, e.g. "insert into clj_test.news2 values(?,?,?,?)".
    String sql = null;

    public void setSql(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = ClickHouseUtil.getConn();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    @Override
    public void invoke(NewsPo news) throws SQLException {
        // try-with-resources: the original leaked one PreparedStatement per
        // record because the statement was never closed.
        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.setString(1, news.getKey());
            preparedStatement.setString(2, news.getTitle());
            preparedStatement.setString(3, news.getPublish_time());
            preparedStatement.setString(4, news.getPublish_date());
            // NOTE(review): a one-row batch per invoke() gives no batching
            // benefit; consider buffering N rows (flushing in close()) or using
            // Flink's JdbcSink for real batched writes to ClickHouse.
            preparedStatement.addBatch();
            preparedStatement.executeBatch();
        }
    }
}
/**
 * Helper for obtaining a JDBC connection to the ClickHouse cluster.
 * Note: the connection is held in a static field, so each getConn() call
 * replaces (and orphans) any previously returned connection — callers are
 * expected to close the connection they received (as MyClickhouseSink does).
 */
public class ClickHouseUtil {

    private static Connection connection;

    public static Connection getConn() throws SQLException, ClassNotFoundException {
        // Explicit driver load kept for pre-JDBC-4 driver jars; JDBC 4+
        // drivers self-register via ServiceLoader and don't need this.
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://172.17.2.144:8123/clj_test";
        connection = DriverManager.getConnection(address);
        return connection;
    }

    public void close() throws SQLException {
        // Guard: the original threw NullPointerException when close() was
        // called before getConn(). Clear the field so close() is idempotent.
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
结果展示
插入的是分布式表 4个节点的数据量是一致的
更多推荐
已为社区贡献2条内容
所有评论(0)