Ai
1 Star 0 Fork 0

李文建/protoactor-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
endpoint_reader.go 2.13 KB
一键复制 编辑 原始数据 按行查看 历史
Potter Dai 提交于 2019-02-01 15:09 +08:00 . Double quote comments.
package remote
import (
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/log"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// endpointReader is the receiver type for the Connect and Receive handlers
// defined in this file.
type endpointReader struct {
	// suspended, while true, makes Connect reject requests and makes Receive
	// stop reading batches from its stream. It is changed through suspend.
	suspended bool
}
// Connect answers a connection request from a remote endpoint.
//
// When the reader is not suspended it replies with a ConnectResponse whose
// DefaultSerializerId is DefaultSerializerID. While suspended it returns a nil
// response and a gRPC status error with code Canceled. Neither ctx nor req is
// inspected.
func (s *endpointReader) Connect(ctx context.Context, req *ConnectRequest) (*ConnectResponse, error) {
	if !s.suspended {
		response := &ConnectResponse{DefaultSerializerId: DefaultSerializerID}
		return response, nil
	}
	return nil, status.Error(codes.Canceled, "Suspended")
}
// Receive reads message batches from stream and hands every envelope to the
// local actor it addresses, looping until the stream fails.
//
// While the reader is suspended no batch is read: the loop re-checks the flag
// every 500ms and returns the stream context's error as soon as that context
// is done, so a handler whose connection has gone away does not poll forever.
//
// Target and type indexes inside an envelope are supplied by the remote peer.
// They are validated against the lengths of the current batch before use; an
// out-of-range index ends the stream with a codes.InvalidArgument status
// instead of panicking or resolving to a PID left over from an earlier batch.
//
// Receive never returns nil. It returns the error from stream.Recv, from
// Deserialize, from the stream context, or the validation error above.
func (s *endpointReader) Receive(stream Remoting_ReceiveServer) error {
	// Scratch PID table, reused across batches and grown only when needed.
	targets := make([]*actor.PID, 100)
	for {
		if s.suspended {
			select {
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-time.After(time.Millisecond * 500):
			}
			continue
		}

		batch, err := stream.Recv()
		if err != nil {
			plog.Debug("EndpointReader failed to read", log.Error(err))
			return err
		}

		// only grow pid lookup if needed
		if len(batch.TargetNames) > len(targets) {
			targets = make([]*actor.PID, len(batch.TargetNames))
		}
		for i, name := range batch.TargetNames {
			targets[i] = actor.NewLocalPID(name)
		}

		for _, envelope := range batch.Envelopes {
			// Bound by the current batch, not len(targets): slots past
			// len(batch.TargetNames) are nil or belong to a previous batch.
			targetIdx := int(envelope.Target)
			if targetIdx < 0 || targetIdx >= len(batch.TargetNames) {
				err := status.Errorf(codes.InvalidArgument,
					"envelope target index %d out of range (batch has %d targets)",
					targetIdx, len(batch.TargetNames))
				plog.Debug("EndpointReader received invalid batch", log.Error(err))
				return err
			}
			typeIdx := int(envelope.TypeId)
			if typeIdx < 0 || typeIdx >= len(batch.TypeNames) {
				err := status.Errorf(codes.InvalidArgument,
					"envelope type index %d out of range (batch has %d type names)",
					typeIdx, len(batch.TypeNames))
				plog.Debug("EndpointReader received invalid batch", log.Error(err))
				return err
			}

			pid := targets[targetIdx]
			message, err := Deserialize(envelope.MessageData, batch.TypeNames[typeIdx], envelope.SerializerId)
			if err != nil {
				plog.Debug("EndpointReader failed to deserialize", log.Error(err))
				return err
			}

			// if message is system message send it as sysmsg instead of usermsg
			sender := envelope.Sender
			switch msg := message.(type) {
			case *actor.Terminated:
				rt := &remoteTerminate{
					Watchee: msg.Who,
					Watcher: pid,
				}
				endpointManager.remoteTerminate(rt)
			case actor.SystemMessage:
				// The found/not-found result is discarded on purpose here.
				// NOTE(review): presumably GetLocal hands back a dead-letter
				// process for unknown ids, so ref is never nil — confirm.
				ref, _ := actor.ProcessRegistry.GetLocal(pid.Id)
				ref.SendSystemMessage(pid, msg)
			default:
				// A nil map is passed as the header when the envelope has none.
				var header map[string]string
				if envelope.MessageHeader != nil {
					header = envelope.MessageHeader.HeaderData
				}
				localEnvelope := &actor.MessageEnvelope{
					Header:  header,
					Message: message,
					Sender:  sender,
				}
				rootContext.Send(pid, localEnvelope)
			}
		}
	}
}
// suspend sets the reader's suspended flag to toSuspend: true suspends the
// reader, false resumes it.
//
// NOTE(review): this is a plain, unsynchronized write to a field that Connect
// and Receive read; confirm callers cannot race with active handlers.
func (s *endpointReader) suspend(toSuspend bool) {
	s.suspended = toSuspend
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/lwj8507/protoactor-go.git
git@gitee.com:lwj8507/protoactor-go.git
lwj8507
protoactor-go
protoactor-go
v0.0.1

搜索帮助