宇润 committed on 2021-02-01 16:29: Support producer partition strategy

Producer

Producer configuration

Class longlang\phpkafka\Producer\ProducerConfig

You can pass an array of configuration values to the constructor.

Configuration keys

| Key | Description | Default |
| --- | --- | --- |
| connectTimeout | Connection timeout, in seconds (decimals allowed). -1 means no limit. | -1 |
| sendTimeout | Send timeout, in seconds (decimals allowed). -1 means no limit. | -1 |
| recvTimeout | Receive timeout, in seconds (decimals allowed). -1 means no limit. | -1 |
| clientId | Kafka client ID. | null |
| maxWriteAttempts | Maximum number of write attempts. | 3 |
| client | Kafka client to use. null (the default) means it is detected automatically. | null |
| socket | Kafka socket to use. null (the default) means it is detected automatically. | null |
| brokers | Broker list. If you configure it manually, set updateBrokers to true. Format: '127.0.0.1:9092,127.0.0.1:9093' or ['127.0.0.1:9092','127.0.0.1:9093'] | null |
| bootstrapServers | Alias: bootstrapServer. Servers used for the initial connection. If configured, the client connects to them and updates the broker list from them. Format: '127.0.0.1:9092,127.0.0.1:9093' or ['127.0.0.1:9092','127.0.0.1:9093'] | null |
| updateBrokers | Whether to update the broker list automatically. | true |
| acks | Acknowledgments the producer requires before a request is considered complete. 0 means no acknowledgment, 1 means the leader has acknowledged, -1 means all in-sync replicas (ISR) have acknowledged. | 0 |
| producerId | Producer ID. | -1 |
| producerEpoch | Producer epoch. | -1 |
| partitionLeaderEpoch | Partition leader epoch. | -1 |
| autoCreateTopic | Whether to create the topic automatically. | true |
| exceptionCallback | Called when an exception occurs that cannot be thrown from the recv() coroutine. Format: function(\Exception $e){} | null |
| partitioner | Partitioning strategy. | \longlang\phpkafka\Producer\Partitioner\DefaultPartitioner |
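As a rough illustration of passing an array to the constructor, the sketch below builds a configuration from some of the keys listed above. The timeout values, the client ID, and the logging inside the callback are assumptions chosen for the example, not recommended settings. It needs the phpkafka library to be installed and autoloaded.

```php
<?php
use longlang\phpkafka\Producer\ProducerConfig;

// Array keys are the configuration keys from the table above.
// All values here are illustrative assumptions.
$config = new ProducerConfig([
    'bootstrapServers' => ['127.0.0.1:9092', '127.0.0.1:9093'],
    'updateBrokers'    => true,
    'acks'             => -1,     // wait for all in-sync replicas
    'connectTimeout'   => 3.0,    // seconds
    'sendTimeout'      => 3.0,    // seconds
    'recvTimeout'      => 3.0,    // seconds
    'clientId'         => 'example-producer', // hypothetical client ID
    'exceptionCallback' => function (\Exception $e) {
        // Log exceptions that cannot be thrown from the recv() coroutine.
        error_log('Kafka producer error: ' . $e->getMessage());
    },
]);
```

Keys that are omitted keep the defaults shown in the table.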

Default partitioning strategy:

If partition !== null, the given partition is used.

If partition === null && key !== null, the partition is selected with crc32(key) % (number of partitions).

If partition === null && key === null, the partition is selected by round robin.
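The three rules above can be sketched in plain PHP as follows. This is a standalone illustration, not the code of DefaultPartitioner: the class name PartitionSketch, the method choose(), and its parameters are hypothetical, and the round-robin counter starting at 0 is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative sketch of the default partitioning rules.
 * Hypothetical class; not part of phpkafka.
 */
final class PartitionSketch
{
    /** @var int Round-robin counter (assumed to start at 0) */
    private $counter = 0;

    public function choose(int $partitionCount, ?int $partition, ?string $key): int
    {
        if ($partitionCount < 1) {
            throw new InvalidArgumentException('partitionCount must be >= 1');
        }
        // Rule 1: an explicit partition always wins.
        if ($partition !== null) {
            return $partition;
        }
        // Rule 2: the same key always maps to the same partition.
        // crc32() is non-negative on 64-bit PHP builds.
        if ($key !== null) {
            return crc32($key) % $partitionCount;
        }
        // Rule 3: no partition and no key, so rotate through the partitions.
        return $this->counter++ % $partitionCount;
    }
}
```

With three partitions, repeated calls with neither a partition nor a key cycle through 0, 1, 2 and then start again at 0, while calls with the same key always return the same partition.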

Send a single message

Example

use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
// -1: wait for all in-sync replicas (ISR) to acknowledge the write
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$value = (string) microtime(true);
$key = uniqid('', true);
// No partition is passed, so the partitioner selects one from the key
$producer->send($topic, $value, $key);

Send batch messages

Example

use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

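$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
// -1: wait for all in-sync replicas (ISR) to acknowledge the write
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
// Explicit target partitions; the topic needs at least two partitions
$partition0 = 0;
$partition1 = 1;
// Each ProduceMessage takes a topic, value, key and partition
$producer->sendBatch([
    new ProduceMessage($topic, 'v1', 'k1', $partition0),
    new ProduceMessage($topic, 'v2', 'k2', $partition1),
]);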