4.4 Kafka API
Kafka API 操作

在 IDEA 中建立 Maven 项目
pom.xml 配置文件如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build file for the Kafka API demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.long</groupId>
<!-- NOTE(review): "Dome" looks like a typo for "Demo" - confirm before renaming,
     since the artifactId identifies the built artifact. -->
<artifactId>BigData_Kafka_Dome</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 8 source/target. -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Kafka producer/consumer client API used by the demo code below. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- SLF4J binding for Log4j2 so kafka-clients log output is visible. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
</project>
在遇到红色字体的情况,右击
创建这个文件
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: everything at level info and above goes to stdout. -->
<Configuration status="error" strict="true" name="XMLConfig">
<Appenders>
<!-- Console appender; the name attribute is required. -->
<Appender type="Console" name="STDOUT">
<!-- PatternLayout produces lines like:
[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
<Layout type="PatternLayout"
pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
</Appender>
</Appenders>
<Loggers>
<!-- additivity=false: events for the "test" logger are not also forwarded to root. -->
<Logger name="test" level="info" additivity="false">
<AppenderRef ref="STDOUT" />
</Logger>
<!-- Root logger configuration. -->
<Root level="info">
<AppenderRef ref="STDOUT" />
</Root>
</Loggers>
</Configuration>
创建包文件
创建生产者 代码如下
package edu.hao.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal Kafka producer demo: sends one record with value "0823" to the
 * "hao" topic on a single remote broker.
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address; adjust to your cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.74.139:9092");
        // Key/value serializers are mandatory producer settings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        // try-with-resources guarantees close() runs even if send() throws;
        // the original leaked the producer (and its buffered records) on error.
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<String, String>(props)) {
            producer.send(new ProducerRecord<String, String>("hao", "0823"));
        }
    }
}
消费者消费情况,01没有消费
图解
利用多线程进行不同的分组测试(可能理解有偏差,不要介意,有异议可提出修改)
// Send 599 test records (values "1".."599") to the "hao" topic.
for (int i = 1;i < 600;i++){
producer.send(new ProducerRecord<String,String>("hao",String.valueOf(i)));
}
(二)带回调的函数API
// Send records 1..9; each send registers a completion callback.
for (int i = 1;i < 10;i++){
//producer.send(new ProducerRecord<String,String>("hao",String.valueOf(i)));
// The callback fires once the broker acknowledges (acks) the send.
producer.send(new ProducerRecord<String, String>("hao", String.valueOf(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null){
// Send failed: print the error message.
System.out.println(e.getMessage());
}else{
// Send succeeded: print which partition the record landed on.
System.out.println(metadata.partition());
}
}
});
0代表0号分区
因为上一章有三个分区
出现错误暂时没有解决
(三、自定义分区器)
package edu.hao.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Routes records whose value parses as an even integer to partition 0 and
 * odd integers to partition 1.
 *
 * Assumes the target topic has at least two partitions and that every record
 * value is a decimal integer string; parsing a non-numeric value throws
 * {@link NumberFormatException}.
 */
public class EvenPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Fail fast with context; the original threw a bare NPE on a null value.
        if (value == null) {
            throw new IllegalArgumentException(
                    "EvenPartitioner requires a non-null integer record value");
        }
        int num = Integer.parseInt(value.toString());
        return num % 2 == 0 ? 0 : 1;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
原来代码修改一下
注释一下原来的
package edu.hao.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer demo using the custom {@link EvenPartitioner}: sends values 1..9
 * to the "song" topic, even values to partition 0 and odd values to
 * partition 1.
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.74.139:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        // EvenPartitioner is declared in this package (edu.hao.producer); the
        // original "import edu.hao.partltloner.EvenPartitioner" was a typo'd
        // package name and would not compile.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, EvenPartitioner.class);

        // try-with-resources closes (and flushes) the producer even if a send throws.
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<String, String>(props)) {
            for (int i = 1; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("song", String.valueOf(i)));
            }
        }
    }
}
就可以把 1—9 的数字按奇偶分离(循环条件为 i < 10,实际发送 1 到 9)
仅供个人参考
更多推荐
已为社区贡献3条内容
所有评论(0)