使用laradock配置kafka应用
2021-09-21
3 min read
使用laradock 配置kafka应用
1. 先安装zookeeper
docker-compose up -d zookeeper
2. 安装kafka
# 需要先安装zookeeper,不然启动kafka会报错
docker-compose up -d kafka
3. 修改docker-compose配置
- 编辑 .env 文件,新增以下配置(IP 请替换为自己宿主机的实际地址)
KAFKA_ADVERTISED_HOST=192.168.20.128
- 启用 php-fpm 与 workspace 的 rdkafka 扩展(修改后需执行 docker-compose build php-fpm workspace 重新构建镜像)
PHP_FPM_INSTALL_RDKAFKA=true
WORKSPACE_INSTALL_RDKAFKA=true
- 修改docker-compose 文件
### kafka ####################################################
# Place this service under the top-level `services:` key of laradock's
# docker-compose.yml. ${KAFKA_ADVERTISED_HOST} and ${DATA_PATH_HOST} are
# read from the .env file.
    kafka:
      image: wurstmeister/kafka
      ports:
        - "9092:9092"
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_ADVERTISED_HOST}
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${KAFKA_ADVERTISED_HOST}:9092
        KAFKA_MESSAGE_MAX_BYTES: 2000000
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      volumes:
        - ${DATA_PATH_HOST}/kafka:/kafka
        - /var/run/docker.sock:/var/run/docker.sock
      networks:
        - backend
4. 安装kafka-php 包
composer require nmred/kafka-php
5. 测试
- producer.php
<?php
use Kafka\Producer;
use Kafka\ProducerConfig;

require __DIR__ . "/../vendor/autoload.php";

date_default_timezone_set('PRC');

// Optional: to trace what the client is doing, build a Monolog logger with a
// handler of your choice and hand it to $producer->setLogger() further down.

// Address of the Kafka broker this demo publishes to.
$brokerList = '192.168.20.128:9092';

// Producer settings, applied to the ProducerConfig instance returned by getInstance().
$producerConfig = ProducerConfig::getInstance();
$producerConfig->setMetadataRefreshIntervalMs(10000);
$producerConfig->setMetadataBrokerList($brokerList);
$producerConfig->setBrokerVersion('1.0.0');
$producerConfig->setRequiredAck(1);
$producerConfig->setIsAsyn(false);
$producerConfig->setProduceInterval(500);

// Supplies the batch of messages to publish: a single keyed message on topic "test".
$messageProvider = function () {
    $message = [
        'topic' => 'test',
        'value' => 'test....message.333333',
        'key' => 'testkey',
    ];

    return [$message];
};

// Dumps whatever the producer reports on a successful send.
$onSuccess = function ($result) {
    var_dump($result);
};

// Dumps the error code the producer reports on a failed send.
$onError = function ($errorCode) {
    var_dump($errorCode);
};

$producer = new Producer($messageProvider);
$producer->success($onSuccess);
$producer->error($onError);
$producer->send(true);
- consumer.php
<?php
require __DIR__ . "/../vendor/autoload.php";

// Low-level php-rdkafka consumer: prints every new message that arrives on
// partition 0 of the "test" topic until a real error occurs.
$objRdKafka = new RdKafka\Consumer();
// NOTE(review): newer php-rdkafka releases appear to favour the `log_level`
// Conf property over setLogLevel() — confirm against the installed version.
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("192.168.20.128:9092");
$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart(partition, offset)
 *
 * partition: the partition to read. This demo reads partition 0, which
 *            assumes the "test" topic has a single partition.
 * offset:    where to start reading. Possible values:
 *   RD_KAFKA_OFFSET_BEGINNING : start from the oldest message still available
 *   RD_KAFKA_OFFSET_END       : start at the end, i.e. only messages produced from now on
 *   RD_KAFKA_OFFSET_STORED    : resume from the offset saved in the offset store
 *                               (not the same as RD_KAFKA_OFFSET_END)
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // First argument is the partition, second is the timeout in milliseconds.
    $oMsg = $oObjTopic->consume(0, 1000);

    // NULL means nothing arrived within the timeout. Some librdkafka versions
    // report "reached end of partition" as a message carrying the
    // PARTITION_EOF code instead; that is not a failure, so keep polling
    // rather than terminating the consumer.
    if (!$oMsg || $oMsg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        usleep(10000);
        continue;
    }

    // Any other error code is treated as fatal for this demo.
    if ($oMsg->err) {
        echo $oMsg->errstr(), "\n";
        break;
    }

    echo $oMsg->payload, "\n";
}

// Stop consuming from the partition once the loop has ended.
$oObjTopic->consumeStop(0);