java,scala读写kafka操作
今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;java版本:package com.cn.kafka;import java.util.Arrays;import java.util.HashMap;import java.util.Map;import java.util.Properties;i...
·
今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;
java版本:
package com.cn.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.sf.json.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
* 从kafka里读取数据
* @projectName:jason_kafka
* @author:JasonLee
* @ClassName:KafkaConsumerCS
* @createdTime:2017-12-20
*/
public class KafkaUtils {
public void consumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "");
props.put("group.id", "jason_");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("jason_20180519"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
public void producer(){
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", ""); //声明broker;
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
Map map = new HashMap();
map.put("name","jason");
map.put("addr","100");
JSONObject jsonObject = JSONObject.fromObject(map);
KeyedMessage<String, String> message = new KeyedMessage<String, String>("jason_0627", jsonObject.toString());
producer.send(message);
}
}
scala版本:
package kafka
import java.util.{Collections, Properties}
import net.sf.json.JSONObject
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.collection.JavaConversions._
object KafkaConsumer {
def cunsumer(): Unit ={
val props = new Properties()
props.put("bootstrap.servers", PropertiesScalaUtils.loadProperties("broker"))
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("group.id", "something")
props.put("auto.offset.reset","earliest")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("test_v1"))
while (true){
val records = consumer.poll(100)
for (record <- records){
println(record.offset() +"--" +record.key() +"--" +record.value())
}
}
consumer.close()
}
def producer(): Unit ={
val brokers_list = ""
val topic = "jason_20180511"
val properties = new Properties()
properties.put("group.id", "jaosn_")
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers_list)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)//value的序列化;
val producer = new KafkaProducer[String, String](properties)
var num = 0
for(i<- 1 to 1000){
val json = new JSONObject()
json.put("name","jason"+i)
json.put("addr","25"+i)
producer.send(new ProducerRecord(topic,json.toString()))
}
producer.close()
}
}
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,更多的Flink和spark的干货可以加入下面的星球
更多推荐
已为社区贡献8条内容
所有评论(0)