9 Star 76 Fork 11

龙之言 / phpkafka

Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or download
producer.md 3.07 KB
Copy Edit Web IDE Raw Blame History
宇润 authored 2021-02-01 16:29 . Support producer partition strategy

生产者

生产者配置

类名:longlang\phpkafka\Producer\ProducerConfig

支持构造方法传入数组赋值

配置参数

参数名 说明 默认值
connectTimeout 连接超时时间(单位:秒,支持小数),为-1则不限制 -1
sendTimeout 发送超时时间(单位:秒,支持小数),为-1则不限制 -1
recvTimeout 接收超时时间(单位:秒,支持小数),为-1则不限制 -1
clientId Kafka 客户端标识 null
maxWriteAttempts 最大写入尝试次数 3
client 使用哪个 Kafka 客户端类,默认为null时根据场景自动识别 null
socket 使用哪个 Kafka Socket 类,默认为null时根据场景自动识别 null
brokers 手动配置 brokers 列表,如果要使用手动配置,请把updateBrokers设为true。格式:'127.0.0.1:9092,127.0.0.1:9093'['127.0.0.1:9092','127.0.0.1:9093'] null
bootstrapServers 别名bootstrapServer,引导服务器,如果配置了该值,会自动连接该服务器,并自动更新 brokers。格式:'127.0.0.1:9092,127.0.0.1:9093'['127.0.0.1:9092','127.0.0.1:9093'] null
updateBrokers 是否自动更新 brokers true
acks 生产者要求领导者,在确认请求完成之前已收到的确认数值。允许的值:0表示无确认,1表示仅领导者,-1表示完整的ISR。 0
producerId 生产者 ID -1
producerEpoch 生产者 Epoch -1
partitionLeaderEpoch 分区 Leader Epoch -1
autoCreateTopic 自动创建主题 true
exceptionCallback 遇到无法在recv()协程抛出的异常时,调用此回调。格式:function(\Exception $e){} null
partitioner 分区策略 默认策略:\longlang\phpkafka\Producer\Partitioner\DefaultPartitioner

默认分区策略:

如果指定了分区,则使用指定的分区;

如果没有指定分区,但指定了 key,会根据 key 的哈希值(crc32)选择分区;

如果没有指定分区,也没有指定 key,会使用轮询策略。

发送单个消息

代码示例:

use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$value = (string) microtime(true);
$key = uniqid('', true);
$producer->send('test', $value, $key);

批量发送消息

代码示例:

use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$partition0 = 0;
$partition1 = 1;
$producer->sendBatch([
    new ProduceMessage($topic, 'v1', 'k1', $partition0),
    new ProduceMessage($topic, 'v2', 'k2', $partition1),
]);

Comment ( 0 )

Sign in for post a comment

PHP
1
https://gitee.com/longzhiyan/phpkafka.git
git@gitee.com:longzhiyan/phpkafka.git
longzhiyan
phpkafka
phpkafka
master

Search