代码拉取完成,页面将自动刷新
<?php
/**
* @Author: laoweizhen <1149243551@qq.com>,
* @Date: 2022/5/11 23:09,
* @LastEditTime: 2022/5/11 23:09
*/
declare(strict_types=1);
namespace Lwz\HyperfRocketMQ;
use Hyperf\Di\Annotation\AnnotationCollector;
use Lwz\HyperfRocketMQ\Constants\MqConstant;
use Lwz\HyperfRocketMQ\Event\AfterProduce;
use Lwz\HyperfRocketMQ\Library\Model\TopicMessage;
use Lwz\HyperfRocketMQ\Library\MQProducer;
use Lwz\HyperfRocketMQ\Message\ProducerMessageInterface;
class Producer extends Builder
{
public function produce(ProducerMessageInterface $producerMessage): bool
{
$this->injectMessageProperty($producerMessage);
$poolName = $producerMessage->getPoolName();
$config = new Config($poolName);
$result = $this->checkIsProduceSuccess($this->publishMessage($config, $producerMessage));
if ($result) {
// 如果记录生产状态日志,消费成功删除日志
$producerMessage->updateMessageStatus(MqConstant::PRODUCE_STATUS_SENT);
$this->eventDispatcher && $this->eventDispatcher->dispatch(new AfterProduce($producerMessage));
// 记录日志
/*$this->setLogger($producerMessage->getLogGroup());
if ($producerMessage->getSaveProduceLog()) {
$this->logger->info('[消息生成成功]', $producerMessage->getProduceInfo());
}*/
}
return $result;
}
protected function publishMessage(Config $config, ProducerMessageInterface $message): TopicMessage
{
$producer = $this->getProducer($config, $message);
$publishMessage = new TopicMessage($message->payload());
$message->getMessageTag() && $publishMessage->setMessageTag($message->getMessageTag());
if ($timeInMillis = $message->getDeliverTime()) {
$publishMessage->setStartDeliverTime($timeInMillis);
}
return $producer->publishMessage($publishMessage);
}
/**
* 判断是否投递成功
*/
private function checkIsProduceSuccess(TopicMessage $publishRet): bool
{
// 如果返回了 message id ,则视为投递成功(不考虑,MQ存储缓存丢失情况)
return isset($publishRet->messageId) && ! empty($publishRet->messageId);
}
private function getProducer(Config $config, ProducerMessageInterface $producerMessage): MQProducer
{
return $this->getClient($config)->getProducer($config->getInstanceId(), $producerMessage->getTopic());
}
private function injectMessageProperty(ProducerMessageInterface $producerMessage)
{
if (class_exists(AnnotationCollector::class)) {
/** @var \Lwz\HyperfRocketMQ\Annotation\Producer $annotation */
$annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Annotation\Producer::class);
if ($annotation) {
$annotation->topic && $producerMessage->setTopic($annotation->topic);
$annotation->messageTag && $producerMessage->setMessageTag($annotation->messageTag);
$annotation->messageKey && $producerMessage->setMessageKey($annotation->messageKey);
}
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。