消息队列 - Kafka 生产/消费
Kafka介绍kafka是一个分布式流处理的平台,通过kafka我们可以发布和订阅流式记录。有关kafka的介绍可以参考官网或者这篇文章https://juejin.im/post/6844903495670169607,介绍的非常的详细。kafka中多个broker称为一个集群,一个broker(独立的kafka服务器)中可以创建不同的topic(主题),topic下可以建立多个partitio
·
Kafka介绍
Kafka 是一个分布式流处理平台,通过 Kafka 我们可以发布和订阅流式记录。有关 Kafka 的介绍可以参考官网或者这篇文章 https://juejin.im/post/6844903495670169607 ,介绍得非常详细。
kafka中多个broker称为一个集群,一个broker(独立的kafka服务器)中可以创建不同的topic(主题),topic下可以建立多个partition(分区),数据则存放在分区中的一个单元里面(partition可以近似理解为一个数组,存入kafka的一条记录就存放在partition1[0]、partition1[1]…)
Kafka安装
个人喜欢docker方式,安装方便
# WARNING: This docker-compose.yml is only for testing purpose.
# Parameters:
# - name: CONFLUENT_PLATFORM_VERSION
# default: 3.0.0
# reference: https://hub.docker.com/u/confluentinc/
# Ports:
# - description: Major ports are exposed to host computer
# - zookeeper: 2181
# kafka1: 9091
# kafka2: 9092
# kafka3: 9093
# kafka4: 9094
# kafka5: 9095
# Tips:
# - You can bring up only part of the cluster with the command below.
#   $ docker-compose up -d kafka1 kafka2 kafka3
version: '3.3'

# One ZooKeeper node plus five Kafka brokers (kafka1..kafka5).
# Each broker listens on, and publishes to the host, its own port 9091..9095.
# NOTE(review): 192.168.1.102 appears to be the author's Docker host address;
# replace it with an address that your Kafka clients can reach.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    healthcheck:
      test: echo stat | nc localhost 2181
      interval: 10s
      timeout: 10s
      retries: 3
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.102:9091
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=1
      - BOOTSTRAP_SERVERS=192.168.1.102:9091,192.168.1.102:9092,192.168.1.102:9093,192.168.1.102:9094,192.168.1.102:9095
      - ZOOKEEPER=zookeeper:2181

  kafka2:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.102:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=2
      - BOOTSTRAP_SERVERS=192.168.1.102:9091,192.168.1.102:9092,192.168.1.102:9093,192.168.1.102:9094,192.168.1.102:9095
      - ZOOKEEPER=zookeeper:2181

  kafka3:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.102:9093
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=3
      - BOOTSTRAP_SERVERS=192.168.1.102:9091,192.168.1.102:9092,192.168.1.102:9093,192.168.1.102:9094,192.168.1.102:9095
      - ZOOKEEPER=zookeeper:2181

  kafka4:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.102:9094
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=4
      - BOOTSTRAP_SERVERS=192.168.1.102:9091,192.168.1.102:9092,192.168.1.102:9093,192.168.1.102:9094,192.168.1.102:9095
      - ZOOKEEPER=zookeeper:2181

  kafka5:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9095:9095"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.102:9095
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9095
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=5
      - BOOTSTRAP_SERVERS=192.168.1.102:9091,192.168.1.102:9092,192.168.1.102:9093,192.168.1.102:9094,192.168.1.102:9095
      - ZOOKEEPER=zookeeper:2181
Kafka 生产/消费
个人喜欢用 C/C++ 操作 Kafka,需要先安装 Kafka 客户端库 librdkafka 和 cppkafka
# Install the librdkafka development package (Debian/Ubuntu).
sudo apt install librdkafka-dev
# Fetch the cppkafka sources and build them out-of-tree in ./build.
git clone https://github.com/mfontanini/cppkafka.git
cd cppkafka
# -p keeps the script re-runnable when build/ already exists.
mkdir -p build && cd build
cmake -DCMAKE_BUILD_TYPE=RELEASE ..
# Bound parallelism to the CPU count: a bare `make -j` places no limit on the
# number of concurrent jobs and can exhaust memory.
make -j"$(nproc)"
sudo make install
cmake使用方式
# Locate the installed CppKafka package; REQUIRED makes configuration fail if it is not found.
find_package(CppKafka REQUIRED)
# Link your target against the imported CppKafka::cppkafka target.
# Replace <YourLibrary> with the name of your own library/executable target.
target_link_libraries(<YourLibrary> CppKafka::cppkafka)
生产
#include <stdexcept>
#include <iostream>
#include <boost/program_options.hpp>
#include "cppkafka/utils/buffered_producer.h"
#include "cppkafka/configuration.h"
using std::string;
using std::exception;
using std::getline;
using std::cin;
using std::cout;
using std::endl;
using cppkafka::BufferedProducer;
using cppkafka::Configuration;
using cppkafka::Topic;
using cppkafka::MessageBuilder;
using cppkafka::Message;
namespace po = boost::program_options;
int main(int argc, char* argv[]) {
string brokers;
string topic_name;
int partition_value = -1;
po::options_description options("Options");
options.add_options()
("help,h", "produce this help message")
("brokers,b", po::value<string>(&brokers)->required(),
"the kafka broker list")
("topic,t", po::value<string>(&topic_name)->required(),
"the topic in which to write to")
("partition,p", po::value<int>(&partition_value),
"the partition to write into (unassigned if not provided)")
;
po::variables_map vm;
try {
po::store(po::command_line_parser(argc, argv).options(options).run(), vm);
po::notify(vm);
}
catch (exception& ex) {
cout << "Error parsing options: " << ex.what() << endl;
cout << endl;
cout << options << endl;
return 1;
}
// Create a message builder for this topic
MessageBuilder builder(topic_name);
// Get the partition we want to write to. If no partition is provided, this will be
// an unassigned one
if (partition_value != -1) {
builder.partition(partition_value);
}
// Construct the configuration
Configuration config = {
{ "metadata.broker.list", brokers }
};
// Create the producer
BufferedProducer<string> producer(config);
// Set a produce success callback
producer.set_produce_success_callback([](const Message& msg) {
cout << "Successfully produced message with payload " << msg.get_payload() << endl;
});
// Set a produce failure callback
producer.set_produce_failure_callback([](const Message& msg) {
cout << "Failed to produce message with payload " << msg.get_payload() << endl;
// Return false so we stop trying to produce this message
return false;
});
cout << "Producing messages into topic " << topic_name << endl;
// Now read lines and write them into kafka
string line;
while (getline(cin, line)) {
// Set the payload on this builder
builder.payload(line);
// Add the message we've built to the buffered producer
producer.add_message(builder);
// Now flush so we:
// * emit the buffered message
// * poll the producer so we dispatch on delivery report callbacks and
// therefore get the produce failure/success callbacks
producer.flush();
}
}
消费
#include <stdexcept>
#include <iostream>
#include <csignal>
#include <boost/program_options.hpp>
#include "cppkafka/consumer.h"
#include "cppkafka/configuration.h"
using std::string;
using std::exception;
using std::cout;
using std::endl;
using cppkafka::Consumer;
using cppkafka::Configuration;
using cppkafka::Message;
using cppkafka::TopicPartitionList;
namespace po = boost::program_options;
bool running = true;
int main(int argc, char* argv[]) {
string brokers;
string topic_name;
string group_id;
po::options_description options("Options");
options.add_options()
("help,h", "produce this help message")
("brokers,b", po::value<string>(&brokers)->required(),
"the kafka broker list")
("topic,t", po::value<string>(&topic_name)->required(),
"the topic in which to write to")
("group-id,g", po::value<string>(&group_id)->required(),
"the consumer group id")
;
po::variables_map vm;
try {
po::store(po::command_line_parser(argc, argv).options(options).run(), vm);
po::notify(vm);
}
catch (exception& ex) {
cout << "Error parsing options: " << ex.what() << endl;
cout << endl;
cout << options << endl;
return 1;
}
// Stop processing on SIGINT
signal(SIGINT, [](int) { running = false; });
// Construct the configuration
Configuration config = {
{ "metadata.broker.list", brokers },
{ "group.id", group_id },
// Disable auto commit
{ "enable.auto.commit", false }
};
// Create the consumer
Consumer consumer(config);
// Print the assigned partitions on assignment
consumer.set_assignment_callback([](const TopicPartitionList& partitions) {
cout << "Got assigned: " << partitions << endl;
});
// Print the revoked partitions on revocation
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
cout << "Got revoked: " << partitions << endl;
});
// Subscribe to the topic
consumer.subscribe({ topic_name });
cout << "Consuming messages from topic " << topic_name << endl;
// Now read lines and write them into kafka
while (running) {
// Try to consume a message
Message msg = consumer.poll();
if (msg) {
// If we managed to get a message
if (msg.get_error()) {
// Ignore EOF notifications from rdkafka
if (!msg.is_eof()) {
cout << "[+] Received error notification: " << msg.get_error() << endl;
}
}
else {
// Print the key (if any)
if (msg.get_key()) {
cout << msg.get_key() << " -> ";
}
// Print the payload
cout << msg.get_payload() << endl;
// Now commit the message
consumer.commit(msg);
}
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)