使用laradock配置kafka应用

使用laradock 配置kafka应用

1. 先安装zookeeper

docker-compose up -d zookeeper

2. 安装kafka

# 需要先安装zookeeper,不然启动kafka会报错
docker-compose up -d kafka

3. 修改docker-compose配置

  • 编辑env文件新增
KAFKA_ADVERTISED_HOST=192.168.20.128
  • 修改php-fpm 与 workspace rdkafka扩展
PHP_FPM_INSTALL_RDKAFKA=true

WORKSPACE_INSTALL_RDKAFKA=true
  • 修改docker-compose 文件
### kafka ####################################################
    kafka:
      image: wurstmeister/kafka
      ports:
        - "9092:9092"
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_ADVERTISED_HOST}
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${KAFKA_ADVERTISED_HOST}:9092
        KAFKA_MESSAGE_MAX_BYTES: 2000000
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      volumes:
        - ${DATA_PATH_HOST}/kafka:/kafka
        - /var/run/docker.sock:/var/run/docker.sock
      networks:
        - backend

4. 安装kafka-php 包

compose require nmred/kafka-php

5. 测试

  • producer.php
<?php
require __DIR__ . "/../vendor/autoload.php";
date_default_timezone_set('PRC');
//use Monolog\Logger;
//use Monolog\Handler\StdoutHandler;
// Create the logger
//$logger = new Logger('my_logger');
// Now add some handlers
//$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.20.128:9092');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.333333',
                'key' => 'testkey',
            ],
        ];
    }
);
//$producer->setLogger($logger);
$producer->success(function($result) {
    var_dump($result);
});
$producer->error(function($errorCode) {
    var_dump($errorCode);
});
$producer->send(true);
  • consumer.php
<?php
require __DIR__ . "/../vendor/autoload.php";

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("192.168.20.128:9092");

$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $oMsg->errstr(), "\n";
        break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

6. 该消息队列可以使用策略工厂模式进行封装使用.