kafka 0.8.x producer Example(scala)
Producer最简配置metadata.broker.list参数指定broker地址,这里不需要填上所有的broker地址,但是如果只写一个,这个broker挂掉后就无法往topic中写入信息,一般写入2-3个broker地址。serializer.class指定序列化的方式props.put("metadata.broker.list","broker1:9092,broker2:90
·
Producer
最简配置
metadata.broker.list
参数指定broker地址,这里不需要填上所有的broker地址,但是如果只写一个,这个broker挂掉后就无法往topic中写入信息,一般写入2-3个broker地址。
serializer.class
指定序列化的方式
props.put("metadata.broker.list","broker1:9092,broker2:9092,broker3:9092")
props.put("serializer.class","kafka.serializer.StringEncoder")
producer
两个类型参数,第一个为partition key类型,第二个为消息类型
val producer = new Producer[String,String] (config)
发送消息
KeyedMessage
的两个参数,第一个为要写入的topic名字,第二个为要写入的消息。
val date = new KeyedMessage[String, String] ("kafka-spark-test", "testInfo")
producer.send (date)
完整代码
import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
object kafka_producer {
def main(args: Array[String]) {
val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "1")
val config = new ProducerConfig(props);
val producer = new Producer[String, String](config)
val date = new KeyedMessage[String, String]("kafka-spark-test", "testInfo")
producer.send(date)
producer.close
}
}
Tip
如果运行时发现如下错误:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
将log4j.properties
加入到src下
更多推荐
已为社区贡献11条内容
所有评论(0)