安装node环境:
wget https://nodejs.org/dist/v6.10.3/node-v6.10.3-linux-x64.tar.xz
tar xf node-v6.10.3-linux-x64.tar.xz -C /usr/local
mv /usr/local/node-v6.10.3-linux-x64 /usr/local/node
rm -f node-v6.10.3-linux-x64.tar.xz
echo 'PATH=$PATH:/usr/local/node/bin' >> /etc/profile
source /etc/profile
node -v
npm -v
npm install -g nodemon
定义目录:
mkdir -p /home/node/kafka/test
安装node-kafka库:
cd /home/node/
npm install kafka-node
定义producer.js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
Client = kafka.Client;
var client = new Client('localhost:2181', 'producer-test');
var producer = new Producer(client);
var topic = 'test01';
var payloads = [
{ topic: topic, messages: 'this is test message' },
];
producer.on('ready', function() {
producer.createTopics([topic], function(err, data) {
producer.send(payloads, function(err, data) {
console.log(err || data);
process.exit();
})
})
})
producer.on('error', function(err) {
console.log('error', err);
})
定义consumer.js
var async = require('async'),
kafka = require('kafka-node'),
ConsumerGroup = kafka.ConsumerGroup;
var topic = 'test01';
var options = {
host: 'localhost:2181',
groupId: 'group-test',
sessionTimeout: 15000,
autoCommit: true,
};
var c1 = new ConsumerGroup(Object.assign({id: 'c1'}, options), topic);
c1.on('message', onMessage);
c1.on('error', onError);
var c2 = new ConsumerGroup(Object.assign({id: 'c2'}, options), topic);
c2.on('message', onMessage);
c2.on('error', onError);
var c3 = new ConsumerGroup(Object.assign({id: 'c3'}, options), topic);
c3.on('message', onMessage);
c3.on('error', onError);
function onMessage(message) {
console.log(this.client.clientId);
console.log(message);
}
function onError(err) {
console.log(err);
}
process.once('SIGINT', function() {
async.each([c1, c2, c3], function(c, cb) {
c.close(true, cb);
})
})
测试运行:
cd /home/node/kafka/test/
nodemon -w consumer.js consumer.js
node producer.js
参考:
https://www.npmjs.com/package/kafka-node
所有评论(0)