Consuming Kafka messages with Flink
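The example below uses the legacy FlinkKafkaConsumer08 connector (Flink's Kafka 0.8 connector) to read string messages from a Kafka topic named "test" and write them to a local text file.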
package testMaven.testMaven;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class App {
    public static void main(String[] args) throws Exception {
        String topic = "test";

        // Connection settings for the Kafka 0.8 consumer: broker list,
        // ZooKeeper quorum (required by the 0.8 connector), and consumer group id.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.213:9092");
        properties.setProperty("zookeeper.connect", "192.168.1.213:2181");
        properties.setProperty("group.id", "site");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Print job status updates to stdout while the job runs (legacy Flink setting).
        env.getConfig().enableSysoutLogging();

        // Read each Kafka record as a plain UTF-8 string.
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer08<String>(topic, new SimpleStringSchema(), properties));

        // Write the consumed messages to a local text file.
        dataStream.writeAsText("/tmp/app.txt");

        env.execute("read from kafka example");
    }
}
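On newer Flink releases the Kafka 0.8 connector no longer exists; the unified KafkaSource connector covers the same use case. Below is a minimal sketch, assuming Flink 1.14 or later with the flink-connector-kafka dependency on the classpath; the class name and the latest-offset starting position are illustrative choices, while the broker address, topic, and group id are taken from the example above.

package testMaven.testMaven;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name; same job as App above, written against the newer KafkaSource API.
public class AppKafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a KafkaSource that reads the same topic as plain strings.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.1.213:9092")   // broker list from the example above
                .setTopics("test")
                .setGroupId("site")
                .setStartingOffsets(OffsetsInitializer.latest())  // illustrative choice
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> dataStream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");

        // Same sink as the original example: dump each message to a local text file.
        dataStream.writeAsText("/tmp/app.txt");

        env.execute("read from kafka example (KafkaSource)");
    }
}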