Custom Flume KafkaSink
1. Custom Flume KafkaSink
package com.panguoyuan.flume.sink;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private String topic;
    private Producer<String, String> producer;

    @Override
    public void configure(Context context) {
        Map<String, String> map = context.getParameters();

        // The broker list comes from the sink definition in the Flume agent config.
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", map.get("metadata.broker.list"));

        // Fall back to a default topic when none is configured.
        topic = map.get("custom.topic.name");
        if (topic == null || "".equals(topic)) {
            topic = "kafka-default-topic-name";
        }

        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.panguoyuan.flume.sink.HashSimplePartitioner");
        props.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181");
        props.setProperty("num.partitions", "3");
        props.put("request.required.acks", "1");
        // props.put("replication-factor", "3");

        producer = new Producer<String, String>(new ProducerConfig(props));

        // Log all sink parameters for debugging.
        for (Entry<String, String> entry : map.entrySet()) {
            System.out.println("key=" + entry.getKey() + " value=" + entry.getValue());
        }
        System.out.println("KafkaSink initialized.");
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                // Nothing in the channel: roll back and back off.
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            System.out.println("Flume sent message to Kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            System.err.println("Flume KafkaSink exception: " + e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
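The process() method above commits one event per transaction, which keeps the logic simple but adds transaction overhead per message. As a sketch of a throughput-oriented variant, the method below drains up to a small batch of events inside a single transaction; it is meant as a drop-in replacement for process() inside the KafkaSink class, and batchSize is an assumed tuning constant that is not part of the original sink.

// Assumed variant: drains up to batchSize events per transaction.
public Status process() throws EventDeliveryException {
    final int batchSize = 100; // assumption: keep at or below transactionCapacity
    Channel channel = getChannel();
    Transaction tx = channel.getTransaction();
    try {
        tx.begin();
        int count = 0;
        for (; count < batchSize; count++) {
            Event event = channel.take();
            if (event == null) {
                break; // channel is empty for now
            }
            producer.send(new KeyedMessage<String, String>(topic, new String(event.getBody())));
        }
        tx.commit();
        return count == 0 ? Status.BACKOFF : Status.READY;
    } catch (Exception ex) {
        tx.rollback();
        return Status.BACKOFF;
    } finally {
        tx.close();
    }
}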
2. Custom Partitioner
package com.panguoyuan.flume.sink;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Custom Kafka Partitioner.
 *
 * @author 潘国远
 * @since 2015-02-07
 */
public class HashSimplePartitioner implements Partitioner {

    public HashSimplePartitioner(VerifiableProperties props) {
        // Kafka requires this constructor; no properties are used here.
    }

    @Override
    public int partition(Object obj, int numPartitions) {
        // Mask off the sign bit so a negative hashCode() cannot produce
        // a negative partition number.
        return (obj.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
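A quick way to sanity-check the partitioner is to feed it a few sample keys and print the resulting partition numbers. The class below is a hypothetical demo (class name and keys are illustrative, and it is assumed to live in the same package as the partitioner); passing null is safe because the constructor ignores its argument.

public class HashSimplePartitionerDemo {

    public static void main(String[] args) {
        HashSimplePartitioner partitioner = new HashSimplePartitioner(null);
        int numPartitions = 3; // matches num.partitions in the sink configuration

        for (String key : new String[] {"host-a", "host-b", "host-c", "host-d"}) {
            System.out.println(key + " -> partition " + partitioner.partition(key, numPartitions));
        }
    }
}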
3. Configure the Flume-Kafka receiving agent
#bin/flume-ng agent --conf conf --conf-file conf/flume-accept-conf.properties --name producer -Dflume.root.logger=INFO,console
#agent
producer.sources = s
producer.channels = c
producer.sinks = r
#source
producer.sources.s.type = avro
producer.sources.s.bind = 192.168.1.100
producer.sources.s.port = 44444
producer.sources.s.channels = c
#sink
producer.sinks.r.type = com.panguoyuan.flume.sink.KafkaSink
producer.sinks.r.metadata.broker.list = ip1:9092,ip2:9092,ip3:9092
producer.sinks.r.partition.key = 0
producer.sinks.r.partitioner.class = com.panguoyuan.flume.sink.HashSimplePartitioner
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 0
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.producer.type = sync
producer.sinks.r.custom.encoding = UTF-8
producer.sinks.r.custom.topic.name = flume-kafka-topic
producer.sinks.r.keep-alive = 10
producer.sinks.r.channel = c
#channel
producer.channels.c.type = memory
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 10000
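To verify that events actually land on the flume-kafka-topic configured above, a minimal consumer sketch using the old Kafka 0.8 high-level consumer API could look like the following; the class name and group id are assumptions, and it connects to the same ZooKeeper ensemble as the sink.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class FlumeKafkaTopicVerifier {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181"); // same ZooKeeper as the sink
        props.put("group.id", "flume-kafka-verifier");                // assumed consumer group id
        props.put("auto.offset.reset", "smallest");                   // start from the earliest offset

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for the topic configured in the receiving agent.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("flume-kafka-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
                streams.get("flume-kafka-topic").get(0).iterator();

        while (it.hasNext()) {
            System.out.println("received: " + new String(it.next().message()));
        }
    }
}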
4. Configure the Flume sending agent
#agent
send.sources = source1 source2
send.channels = c
send.sinks = s
#source
send.sources.source1.type = exec
send.sources.source1.command = tail -F /app/log1.log
send.sources.source1.channels = c
send.sources.source2.type = exec
send.sources.source2.command = tail -F /app/log2.log
send.sources.source2.channels = c
#channel
send.channels.c.type = memory
send.channels.c.capacity = 10000
send.channels.c.transactionCapacity = 10000
#sink
send.sinks.s.type = avro
send.sinks.s.hostname = 192.168.1.100
send.sinks.s.port = 44444
send.sinks.s.channel = c
send.sinks.s.keep-alive = 10
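Instead of waiting for new lines in the tailed log files, test events can also be injected straight into the receiving agent's avro source with Flume's RPC client SDK. The sketch below sends a single event to the avro source configured in step 3; the class name and message body are illustrative only.

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroTestClient {

    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the avro source declared in the receiving agent.
        RpcClient client = RpcClientFactory.getDefaultInstance("192.168.1.100", 44444);
        try {
            Event event = EventBuilder.withBody("hello flume-kafka", Charset.forName("UTF-8"));
            client.append(event); // goes through the memory channel into KafkaSink
        } finally {
            client.close();
        }
    }
}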