Ai
1 Star 1 Fork 1

zhen/hyperf_rocketmq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ConsumerManager.php 3.44 KB
一键复制 编辑 原始数据 按行查看 历史
zhen 提交于 2022-05-25 10:12 +08:00 . 格式化代码
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @see https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Lwz\HyperfRocketMQ;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\ProcessManager;
use Lwz\HyperfRocketMQ\Annotation\Consumer as ConsumerAnnotation;
use Lwz\HyperfRocketMQ\Message\ConsumerMessageInterface;
use Psr\Container\ContainerInterface;
class ConsumerManager
{
private ContainerInterface $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function run()
{
$classes = AnnotationCollector::getClassesByAnnotation(ConsumerAnnotation::class);
/**
* @var string $class
* @var ConsumerAnnotation $annotation
*/
foreach ($classes as $class => $annotation) {
$instance = make($class);
if (! $instance instanceof ConsumerMessageInterface) {
continue;
}
$annotation->poolName && $instance->setPoolName($annotation->poolName);
$annotation->topic && $instance->setTopic($annotation->topic);
$annotation->groupId && $instance->setGroupId($annotation->groupId);
$annotation->messageTag && $instance->setMessageTag($annotation->messageTag);
$annotation->numOfMessage && $instance->setNumOfMessage($annotation->numOfMessage);
$annotation->waitSeconds && $instance->setWaitSeconds($annotation->waitSeconds);
$annotation->enable && $instance->setEnable($instance->isEnable());
$annotation->maxConsumption && $instance->setMaxConsumption($annotation->maxConsumption);
$annotation->openCoroutine && $instance->setOpenCoroutine($annotation->openCoroutine);
property_exists($instance, 'container') && $instance->container = $this->container;
$nums = $annotation->processNums;
$process = $this->createProcess($instance);
$process->nums = (int) $nums;
$process->name = $annotation->name . '-' . $instance->getMessageTag();
ProcessManager::register($process);
}
}
private function createProcess(ConsumerMessageInterface $consumerMessage): AbstractProcess
{
return new class($this->container, $consumerMessage) extends AbstractProcess {
/**
* @var \Hyperf\Amqp\Consumer
*/
private $consumer;
/**
* @var ConsumerMessageInterface
*/
private $consumerMessage;
public function __construct(ContainerInterface $container, ConsumerMessageInterface $consumerMessage)
{
parent::__construct($container);
$this->consumer = $container->get(Consumer::class);
$this->consumerMessage = $consumerMessage;
}
public function handle(): void
{
$this->consumer->consume($this->consumerMessage);
}
public function getConsumerMessage(): ConsumerMessageInterface
{
return $this->consumerMessage;
}
public function isEnable($server): bool
{
return $this->consumerMessage->isEnable();
}
};
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/easy_code/hyperf_rocketmq.git
git@gitee.com:easy_code/hyperf_rocketmq.git
easy_code
hyperf_rocketmq
hyperf_rocketmq
master

搜索帮助