kafka-clients java
dependency>groupId>org.apache.kafkagroupId>artifactId>kafka-clientsartifactId>version>0.11.0.0version>dependency>package kafka;import org.apache.ka
·
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
package kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
/**
* @author zhangweiwen
* @date 2018/1/10
*/
public class MyConsumer extends Thread {
private String groupid;
private String topic = "test";
public MyConsumer(String groupid){
this.groupid = groupid;
}
public void run() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
props.put("group.id", groupid);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
consumer.seekToBeginning(new ArrayList<TopicPartition>());
Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
Set<Map.Entry<String, List<PartitionInfo>>> entries = listTopics.entrySet();
for (Map.Entry<String, List<PartitionInfo>> entry:
entries) {
System.out.println("topic:" + entry.getKey());
}
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
}
//按分区读取数据
// for (TopicPartition partition : records.partitions()) {
// List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// for (ConsumerRecord<String, String> record : partitionRecords) {
// System.out.println("fetched from partition " + partition.partition() + ", "+ record.offset() + ": " + record.value());
// }
// }
}
}
}
package kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer extends Thread{
public void run() {
try {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int i = 0;
while(true) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
e.printStackTrace();
System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
}
});
i++;
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
import kafka.MyProducer;
/**
* @author zhangweiwen
* @date 2018/1/10
*/
public class HelloKafkaP {
public static void main(String[] args) {
new MyProducer().start();
}
}
import kafka.MyConsumer;
/**
* @author zhangweiwen
* @date 2018/1/10
*/
public class HelloKafka {
public static void main(String[] args) {
new MyConsumer("test-consumer-group").start();
}
}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @author zhangweiwen
* @date 2018/1/10
*/
public class HelloKafka2 {
public static void main(String[] args) throws Exception{
String kafkaConnect = "localhost:9091,localhost:9092,localhost:9093";
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnect);
AdminClient admin = AdminClient.create(config);
Map<String, String> configs = new HashMap<String,String>();
int partitions = 1;
short replication = 1;
// CreateTopicsResult result = admin.createTopics(asList(new NewTopic("mynewtopic", partitions, replication).configs(configs)));
ListTopicsResult result = admin.listTopics();
for (Map.Entry<String, TopicListing> entry : result.namesToListings().get().entrySet()) {
System.out.println("topic {} created:"+ entry.getValue().name());
}
}
}
更多推荐
所有评论(0)