Today's post is a quick walkthrough of reading from and writing to Kafka. I wrote both a Java and a Scala version; the style is a bit dated, but both work and have been tested. Straight to the code:

Java version:

package com.cn.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.sf.json.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Read data from and write data to Kafka.
 * @projectName:jason_kafka
 * @author:JasonLee
 * @ClassName:KafkaUtils
 * @createdTime:2017-12-20
 */
public class KafkaUtils {

	public void consumer(){
		Properties props = new Properties();
		props.put("bootstrap.servers", "");
		props.put("group.id", "jason_");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("jason_20180519"));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);  // block up to 100 ms waiting for records
			for (ConsumerRecord<String, String> record : records){
				System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
			}
		}
	}
	public void producer() {
		Properties props = new Properties();
		props.put("serializer.class", "kafka.serializer.StringEncoder");  // the legacy producer reads this, not the new-style key/value serializer settings
		props.put("metadata.broker.list", "");  // fill in your broker list; the legacy producer uses this instead of bootstrap.servers
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);
		Map<String, String> map = new HashMap<>();
		map.put("name", "jason");
		map.put("addr", "100");
		JSONObject jsonObject = JSONObject.fromObject(map);
		KeyedMessage<String, String> message = new KeyedMessage<String, String>("jason_0627", jsonObject.toString());
		producer.send(message);
		producer.close();  // flush and release the legacy producer
	}
}
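The producer above uses the legacy kafka.javaapi.producer API, which has since been deprecated and removed in newer Kafka releases. For comparison, here is a minimal sketch of the same send using the newer org.apache.kafka.clients.producer API; the class name NewApiProducer is mine, and the broker address is a placeholder to fill in:

package com.cn.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "");  // fill in your broker list
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		// send one message, mirroring the legacy example above
		producer.send(new ProducerRecord<>("jason_0627", "{\"name\":\"jason\",\"addr\":\"100\"}"));
		producer.close();
	}
}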

Scala version:

package kafka

import java.util.{Collections, Properties}
import net.sf.json.JSONObject
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.collection.JavaConversions._
object KafkaConsumer {

  def consumer(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", PropertiesScalaUtils.loadProperties("broker"))
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")
    props.put("auto.offset.reset","earliest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test_v1"))
    while (true) {
      val records = consumer.poll(100)  // block up to 100 ms waiting for records
      for (record <- records) {
        println(record.offset() + "--" + record.key() + "--" + record.value())
      }
    }
    consumer.close()  // never reached while the loop above runs forever; kept for when you add a stop condition
  }


  def producer(): Unit = {
    val brokers_list = ""  // fill in your broker list
    val topic = "jason_20180511"
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers_list)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)    // key serializer
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)  // value serializer
    val producer = new KafkaProducer[String, String](properties)
    for (i <- 1 to 1000) {
      val json = new JSONObject()
      json.put("name", "jason" + i)
      json.put("addr", "25" + i)
      producer.send(new ProducerRecord(topic, json.toString()))
    }
    producer.close()
  }

}
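For a quick local test, both methods can be driven from a small entry point; below is a minimal sketch (the KafkaDemo object name is mine, and note that the producer and consumer above point at different topics, so align them before running):

package kafka

object KafkaDemo {
  def main(args: Array[String]): Unit = {
    KafkaConsumer.producer()  // writes 1000 JSON messages to the producer's topic
    KafkaConsumer.consumer()  // loops forever printing offset--key--value
  }
}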

If I got anything wrong here, corrections are welcome. If you have questions, you can join QQ group 340297350, where more Flink and Spark material is shared.
