SchuEngine schu-queue 基于ZendFramework3和Kafka的队列服务模块

简介

schu-queue是一个基于ZendFramework3的队列服务,目前支持Kafka集群.使用PHP-RdKafka作为Kafka的驱动.支持HighLevelConsumerLowLevelConsumer.

使用zend-servicemanager和简单的配置文件,可以轻松获得Kafka集群的Producer和Consumer实例.

SchuEngine下的所有模块目前全部使用zend-servicemanager作为统一入口,好处在与zend-servicemanager可以实现灵活的工厂模型,以及对类的复用.

依赖

请看composer.json

安装

$ composer require schuengine/schu-queue

配置文件


'schu_queue' => [
    'adapter' => 'rdkafka',
    'producer' => [
        'log_level' => LOG_DEBUG,
        'metadata.broker.list' => 'PLAINTEXT://localhost:9092',
        'topics' => [
            'test' => [
                'message.timeout.ms' => 3600,
            ],
            'test2' => [
                'message.timeout.ms' => 3600,
                'error.error' => '',
            ],
        ],
    ],
    'consumer' => [
        'group.id' => 'consumer',
        'metadata.broker.list' => 'PLAINTEXT://localhost:9092',
        'topics' => [
            'test' => [
                'consumer.level' => 'low',
            ],
            'test2' => [
                'consumer.level' => 'high',
            ],
        ],
    ],
],

  1. adapter: 适配器.目前只支持RdKafka.

  2. producer: 生产者配置块.

    里面包含:

    1. producer配置: 例如'log_level','metadata.broker.list'.点此查看详细配置字段
    2. topic配置: producer可以包含多个topic,将这些topic配置文件放在producer下的topics字段中.点此查看详细配置字段
  3. consumer: 消费者配置块.和producer模块类似.点此查看详细配置字段

使用

前提:

  1. schu-queue在一个实例中,只能有一个producer和一个consumer.
  2. 每个实例的producerconsumer可以有多个topic供生产和消费.

入口:

在配置好schu-queue后,使用如下方式获得该模块的入口:

$producer = $serviceManager->get('schu-queue-producer');
$consumer = $serviceManager->get('schu-queue-consumer');
$adapter = $serviceManager->get('schu-queue');

如果想获取php-rdkafka中类的实例,然后自己配置生产者消费者模型,也可以使用zend-servicemanager:

$conf = $serviceManager->get('RdKafka\Conf');
$conf->set('metadata.broker.list', 'PLAINTEXT://localhost:9092');
//...

$producer = $serviceManager->build('RdKafka\Producer', [$conf]);
//...

生产:

producer接口如下:

interface ProducerInterface
{
    /**
     * Produce message to message server.
     * 
     * @param  string $topic
     * @param  string $message
     * @return integer
     */
    public function produce(string $topic, string $message);

    /**
     * Get Adapter
     *
     * @return \Schu\Queue\Adapter\MessageServerAdapterInterface
     */
    public function getAdapter();
}

继续上文中zend-servicemanager获得produce后的例子:

$producer->produce('test', 'hello world');

消费:

consumer接口如下:

interface ConsumerInterface
{
    /**
     * Consume a message
     *
     * @param  string $topic
     * @param  array $options
     * @return \RdKafka\message
    */
    public function consume(string $topic, array $options = null);
 
    /**
     * Get adapter
     *
     * @return \Schu\Queue\Adapter\MessageServerAdapterInterface
     */
    public function getAdapter();
}

继而:

$options = [
    'partition' => 0,
    'position'  => RD_KAFKA_OFFSET_STORED,
    'timeout'   => 120*1200,
];

$message = $consumer->consume('test', $options);

这里需要一个特殊的参数$options,这个参数根据adapter参数的变化而有具体要求,在配置文件中'adapter' => 'rdkafka'时,$options有以上三个参数:

  1. 'partition'是指consumer消费的topic的分区号,在配置文件中'consumer.level' => 'high'时,不需要这个参数,因为RdKafkaHighLevel``consumer的分区是自动的.
  2. 'position',是指consumer开始消费的位置, RD_KAFKA_OFFSET_STORED是指从存储的上次消费的位置开始.
  3. 'timeout'是指超时时间,单位为毫秒(ms).

发表新评论