Ai
1 Star 1 Fork 1

zhen/hyperf_rocketmq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
Producer.php 3.13 KB
一键复制 编辑 原始数据 按行查看 历史
zhen 提交于 2022-05-25 10:12 +08:00 . 格式化代码
<?php
/**
* @Author: laoweizhen <1149243551@qq.com>,
* @Date: 2022/5/11 23:09,
* @LastEditTime: 2022/5/11 23:09
*/
declare(strict_types=1);
namespace Lwz\HyperfRocketMQ;
use Hyperf\Di\Annotation\AnnotationCollector;
use Lwz\HyperfRocketMQ\Constants\MqConstant;
use Lwz\HyperfRocketMQ\Event\AfterProduce;
use Lwz\HyperfRocketMQ\Library\Model\TopicMessage;
use Lwz\HyperfRocketMQ\Library\MQProducer;
use Lwz\HyperfRocketMQ\Message\ProducerMessageInterface;
class Producer extends Builder
{
public function produce(ProducerMessageInterface $producerMessage): bool
{
$this->injectMessageProperty($producerMessage);
$poolName = $producerMessage->getPoolName();
$config = new Config($poolName);
$result = $this->checkIsProduceSuccess($this->publishMessage($config, $producerMessage));
if ($result) {
// 如果记录生产状态日志,消费成功删除日志
$producerMessage->updateMessageStatus(MqConstant::PRODUCE_STATUS_SENT);
$this->eventDispatcher && $this->eventDispatcher->dispatch(new AfterProduce($producerMessage));
// 记录日志
/*$this->setLogger($producerMessage->getLogGroup());
if ($producerMessage->getSaveProduceLog()) {
$this->logger->info('[消息生成成功]', $producerMessage->getProduceInfo());
}*/
}
return $result;
}
protected function publishMessage(Config $config, ProducerMessageInterface $message): TopicMessage
{
$producer = $this->getProducer($config, $message);
$publishMessage = new TopicMessage($message->payload());
$message->getMessageTag() && $publishMessage->setMessageTag($message->getMessageTag());
if ($timeInMillis = $message->getDeliverTime()) {
$publishMessage->setStartDeliverTime($timeInMillis);
}
return $producer->publishMessage($publishMessage);
}
/**
* 判断是否投递成功
*/
private function checkIsProduceSuccess(TopicMessage $publishRet): bool
{
// 如果返回了 message id ,则视为投递成功(不考虑,MQ存储缓存丢失情况)
return isset($publishRet->messageId) && ! empty($publishRet->messageId);
}
private function getProducer(Config $config, ProducerMessageInterface $producerMessage): MQProducer
{
return $this->getClient($config)->getProducer($config->getInstanceId(), $producerMessage->getTopic());
}
private function injectMessageProperty(ProducerMessageInterface $producerMessage)
{
if (class_exists(AnnotationCollector::class)) {
/** @var \Lwz\HyperfRocketMQ\Annotation\Producer $annotation */
$annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Annotation\Producer::class);
if ($annotation) {
$annotation->topic && $producerMessage->setTopic($annotation->topic);
$annotation->messageTag && $producerMessage->setMessageTag($annotation->messageTag);
$annotation->messageKey && $producerMessage->setMessageKey($annotation->messageKey);
}
}
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/easy_code/hyperf_rocketmq.git
git@gitee.com:easy_code/hyperf_rocketmq.git
easy_code
hyperf_rocketmq
hyperf_rocketmq
master

搜索帮助