1 Star 0 Fork 0

jack/protoactor-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
pubsub_extensions.go 3.21 KB
一键复制 编辑 原始数据 按行查看 历史
490689386@qq.com 提交于 2025-05-19 14:50 +08:00 . 初始化
package cluster
import (
"gitee.com/wujianhai/protoactor-go/actor"
)
// Publisher creates a new PubSub publisher that publishes messages directly to the TopicActor
func (c *Cluster) Publisher() Publisher {
return NewPublisher(c)
}
// BatchingProducer create a new PubSub batching producer for specified topic, that publishes directly to the topic actor
func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigOption) *BatchingProducer {
return NewBatchingProducer(c.Publisher(), topic, opts...)
}
// SubscribeByPid subscribes to a PubSub topic by subscriber PID
func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error) {
res, err := c.Request(topic, TopicActorKind, &SubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}},
}, opts...)
if err != nil {
return nil, err
}
return res.(*SubscribeResponse), err
}
// SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity
func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error) {
res, err := c.Request(topic, TopicActorKind, &SubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: identity}},
}, opts...)
if err != nil {
return nil, err
}
return res.(*SubscribeResponse), err
}
// SubscribeWithReceive subscribe to a PubSub topic by providing a Receive function, that will be used to spawn a subscriber actor
func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) (*SubscribeResponse, error) {
props := actor.PropsFromFunc(receive)
pid := c.ActorSystem.Root.Spawn(props)
return c.SubscribeByPid(topic, pid, opts...)
}
// UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID
func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error) {
res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}},
}, opts...)
if err != nil {
return nil, err
}
return res.(*UnsubscribeResponse), err
}
// UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity
func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error) {
res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: identity}},
}, opts...)
if err != nil {
return nil, err
}
return res.(*UnsubscribeResponse), err
}
// UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by cluster identity
func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error) {
res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: NewClusterIdentity(identity, kind)}},
}, opts...)
if err != nil {
return nil, err
}
return res.(*UnsubscribeResponse), err
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wujianhai/protoactor-go.git
git@gitee.com:wujianhai/protoactor-go.git
wujianhai
protoactor-go
protoactor-go
5633fe2499dd

搜索帮助