Kafka 0.11.0.2 Installation Notes
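These notes cover installing Kafka 0.11.0.2 on CentOS 6 / CentOS 7.
Three servers are used in total; each one runs a node of both the Kafka cluster and the ZooKeeper (ZK) cluster.
Server | IP | zk myid | Kafka broker ID |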
---|---|---|---|
zk1 | 10.20.32.113 | 1 | 1 |
zk2 | 10.20.32.114 | 2 | 2 |
zk3 | 10.20.32.126 | 3 | 3 |
ZooKeeper installation
- On each of the three machines, register the host names
Edit /etc/hosts
10.20.32.113 zk1
10.20.32.114 zk2
10.20.32.126 zk3
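Download and unpack ZooKeeper. I used zookeeper-3.4.11 and placed it under /usr/local/zookeeper-3.4.11/ on every machine.
Create the data directory and write this node's ZooKeeper ID into the myid file. The ID is different on every machine; see the table above for the server / IP / myid mapping. The example below is for zk1.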
mkdir /usr/local/zookeeper-3.4.11/data
touch /usr/local/zookeeper-3.4.11/data/myid
echo "1" > /usr/local/zookeeper-3.4.11/data/myid
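Because the ID differs per machine, one way to avoid a copy-paste mistake is to derive it from the host name. The following is a minimal sketch, assuming the zk1/zk2/zk3 names from the table above; `myid_for_host` is a hypothetical helper written for this memo, not something ZooKeeper ships.

```shell
#!/bin/sh
# Hypothetical helper: map a host name from the table to its ZooKeeper myid.
myid_for_host() {
    case "$1" in
        zk1) echo 1 ;;
        zk2) echo 2 ;;
        zk3) echo 3 ;;
        *)   echo "unknown host: $1" >&2; return 1 ;;
    esac
}

# Example: print the ID that belongs to zk2
myid_for_host zk2
```

On a machine whose system host name really is set to zk1, zk2, or zk3 (an assumption; the steps above only add entries to /etc/hosts), the file could then be written with `myid_for_host "$(hostname -s)" > /usr/local/zookeeper-3.4.11/data/myid`.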
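- In ZooKeeper's conf directory, copy zoo_sample.cfg to zoo.cfg, change the client port (optional; I use 1119 here instead of the default 2181), and append the three server.N lines for the cluster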
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper-3.4.11/data/
# the port at which the clients will connect
clientPort=1119
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=zk1:5888:6888
server.2=zk2:5888:6888
server.3=zk3:5888:6888
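The number in each server.N line has to match the myid file on that machine. A small consistency check can catch a mismatch before the first start. This is only a sketch: `check_myid` is a hypothetical function name, and it assumes the server.N lines start at the beginning of a line, as in the file above.

```shell
#!/bin/sh
# Hypothetical check: succeed only if the ID stored in the myid file
# has a matching server.N line in zoo.cfg.
check_myid() {
    cfg=$1
    myid_file=$2
    # Strip whitespace and the trailing newline from the stored ID
    id=$(tr -d '[:space:]' < "$myid_file")
    grep -q "^server\.${id}=" "$cfg"
}
```

For example: `check_myid /usr/local/zookeeper-3.4.11/conf/zoo.cfg /usr/local/zookeeper-3.4.11/data/myid && echo "myid OK"`.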
Edit /etc/rc.local to start ZooKeeper at boot (optional). JAVA_HOME must point at the actual Java directory on your machine; here I use the OpenJDK installed through yum.
yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
/etc/rc.local
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el6_9.x86_64/jre
/usr/local/zookeeper-3.4.11/bin/zkServer.sh start
ZooKeeper can now be started on each of the three machines
/usr/local/zookeeper-3.4.11/bin/zkServer.sh start
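Once all three nodes are started, `zkServer.sh status` reports each node's role on a line beginning with `Mode:`. In a healthy three-node ensemble one node should report leader and the other two follower. The sketch below extracts that role; `zk_mode` is a hypothetical helper, and the sample input is illustrative rather than captured output.

```shell
#!/bin/sh
# Hypothetical helper: read `zkServer.sh status` output on stdin and
# print only the role reported on the "Mode:" line.
zk_mode() {
    sed -n 's/^Mode: //p'
}

# Illustrative input only; on a real node, pipe the status command instead
printf 'Using config: /usr/local/zookeeper-3.4.11/bin/../conf/zoo.cfg\nMode: follower\n' | zk_mode
```

On a real node the call would be `/usr/local/zookeeper-3.4.11/bin/zkServer.sh status 2>/dev/null | zk_mode`.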
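Kafka installation
Download and unpack Kafka 0.11.0.2. I installed it under /usr/local/kafka_2.11-0.11.0.2/ on every machine.
Edit the configuration file as follows. The first Kafka machine is used as the example; only three settings need to change.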
vi /usr/local/kafka_2.11-0.11.0.2/config/server.properties
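# Change this; the ID here is 1
broker.id=1
# Address and port Kafka listens on; either an IP or a host name can be used
listeners=PLAINTEXT://zk1:9092
# Connection string for the ZooKeeper cluster
zookeeper.connect=zk1:1119,zk2:1119,zk3:1119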
- Make the same changes on all three machines, using each machine's own broker ID and host name. Then start the Kafka cluster
/usr/local/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.11.0.2/config/server.properties
You can also add this command to /etc/rc.local so that Kafka starts at boot (optional)
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el6_9.x86_64/jre
/usr/local/zookeeper-3.4.11/bin/zkServer.sh start
/usr/local/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.11.0.2/config/server.properties
Verifying Kafka
All of the following commands are run from /usr/local/kafka_2.11-0.11.0.2/bin
cd /usr/local/kafka_2.11-0.11.0.2/bin
First create a topic; mine is called Mark_test
./kafka-topics.sh --create --zookeeper zk1:1119 --replication-factor 1 --partitions 1 --topic Mark_test
From this machine or either of the other two, check that the topic is visible; the command should print Mark_test
./kafka-topics.sh --list --zookeeper zk1:1119
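When this check is run from a script rather than by eye, the listing can be filtered for an exact topic name. A minimal sketch follows; `topic_exists` is a hypothetical helper, and the sample input stands in for real command output.

```shell
#!/bin/sh
# Hypothetical helper: read a topic listing on stdin and succeed only
# if one line is exactly the given topic name.
topic_exists() {
    grep -qxF "$1"
}

# Illustrative input only; on a broker, pipe kafka-topics.sh --list instead
printf 'Mark_test\nother_topic\n' | topic_exists Mark_test && echo "topic found"
```

On a broker: `./kafka-topics.sh --list --zookeeper zk1:1119 | topic_exists Mark_test`.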
On any one machine, produce messages: once the command is running, type any text to send it. This is the producer.
./kafka-console-producer.sh --broker-list zk1:9092 --topic Mark_test
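On any one machine, consume messages: once the command is running, it receives the messages of the given topic. This is the consumer, and it can be run on the other two machines as well.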
./kafka-console-consumer.sh --zookeeper localhost:1119 --topic Mark_test --from-beginning
Verifying with a Java client
Create a Maven project with the following pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>my.kafka.test</groupId>
    <artifactId>demo1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.2</version>
        </dependency>
    </dependencies>
</project>
Java producer
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Sends one test message per second to the Mark_test topic.
 */
public class KafkaProducer {
    public final static String TOPIC = "Mark_test";
    private final Producer<String, String> producer;

    private KafkaProducer() {
        Properties props = new Properties();
        // Kafka broker address (host:port)
        props.put("metadata.broker.list", "10.20.32.126:9092");
        // Serializer class for message values
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Serializer class for message keys
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // request.required.acks
        // 0, which means that the producer never waits for an acknowledgement
        // from the broker (the same behavior as 0.7). This option provides the
        // lowest latency but the weakest durability guarantees (some data will
        // be lost when a server fails).
        // 1, which means that the producer gets an acknowledgement after the
        // leader replica has received the data. This option provides better
        // durability as the client waits until the server acknowledges the
        // request as successful (only messages that were written to the
        // now-dead leader but not yet replicated will be lost).
        // -1, which means that the producer gets an acknowledgement after all
        // in-sync replicas have received the data. This option provides the
        // best durability, we guarantee that no messages will be lost as long
        // as at least one in sync replica remains.
        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) {
        new KafkaProducer().produce();
    }

    void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "Current time is:" + System.currentTimeMillis() + " msg key: " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Java consumer
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer {
    private final ConsumerConnector consumer;

    public KafkaConsumer() {
        Properties props = new Properties();
        // Broker address; this ZooKeeper-based consumer locates brokers
        // through zookeeper.connect, so it does not rely on this entry
        props.put("metadata.broker.list", "10.20.32.126:9092");
        // group.id identifies the consumer group
        props.put("group.id", "jd-group");
        // ZooKeeper connection string and timeouts
        props.put("zookeeper.connect", "zk1:1119,zk2:1119,zk3:1119");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "largest");
        // Serializer class; decoding itself is done by the StringDecoder
        // instances passed in consume()
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }

    public void consume() {
        // Request a single stream for the Mark_test topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("Mark_test", new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("Mark_test").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // Blocks waiting for new messages and prints each one as it arrives
        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }
}
Finally, for reference, see the Java API documentation that matches Kafka 0.11.