php环境下使用kafka
1.安装PHP 扩展a. 安装librdkafka 库 官网地址:https://github.com/edenhill/librdkafka $git clone https://github.com/edenhill/librdkafka.git$./configure$make$sudo make installb. 安装php-rdkafka 扩展...
·
1.安装PHP 扩展
a. 安装librdkafka 库
官网地址:https://github.com/edenhill/librdkafka
$ git clone https://github.com/edenhill/librdkafka.git
$ ./configure
$ make
$ sudo make install
b. 安装php-rdkafka 扩展
官网地址:https://github.com/arnaud-lb/php-rdkafka
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
#生成configure文件
$ phpize
#编译安装
$ ./configure --with-php-config=/usr/local/php7/bin/php-config
$ make
$ make install
#在php.ini 文件中配置 rdkafka扩展
$ vim /usr/local/php7/etc/php.ini
extension=rdkafka.so
#查看扩展是否生效
$php -m
2.编写producer方法
a.我在model层封装了一个kafka类
<?php
/**
* Created by PhpStorm.
* User: wangan
* Date: 2018/10/17
* Time: 10:06
*/
namespace App\Models;
class Kafka
{
public $broker_list = 'localhost:9092';
public $topic = 'test';
public $partition = 0;
protected $producer = null;
protected $consumer = null;
public function __construct()
{
if($this->broker_list){
}
$rk = new \RdKafka\Producer();
if(empty($rk)){
throw new \Exception('producer error');
}
$rk->setLogLevel(LOG_DEBUG);
if(!$rk->addBrokers($this->broker_list)){
throw new \Exception('addBrokers error');
}
$this->producer = $rk;
}
public function sendMessage($array_message = [])
{
$topic = $this->producer->newTopic($this->topic);
return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($array_message));
}
}
b.在controller中调用该方法
public function testKafkaProducer(){
$kafuka = new Kafka();
$kafuka->sendMessage(['cei shi aaa']);
}
实现效果如下
可以看到,浏览器没调用一次,下方的消费者都会接受到一个消息。
3.编写异步消费者方法
https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
更多推荐
已为社区贡献2条内容
所有评论(0)