Ai
1 Star 0 Fork 0

SasukeBo/go-micro

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
stream.go 1.50 KB
一键复制 编辑 原始数据 按行查看 历史
汪波 提交于 2023-02-23 10:27 +08:00 . fix: 替换包名
// Package stream encapsulates streams within streams
package stream
import (
"context"
"sync"
"gitee.com/sasukebo/go-micro/v4/client"
"gitee.com/sasukebo/go-micro/v4/codec"
"gitee.com/sasukebo/go-micro/v4/metadata"
"gitee.com/sasukebo/go-micro/v4/server"
)
// Stream is the minimal message-stream contract that this package wraps.
type Stream interface {
	// Context returns the context associated with the stream.
	Context() context.Context
	// SendMsg sends msg on the stream.
	SendMsg(msg interface{}) error
	// RecvMsg receives the next message from the stream into msg.
	RecvMsg(msg interface{}) error
	// Close closes the stream.
	Close() error
}
// stream wraps a Stream together with a request and the most recent
// send/receive error.
type stream struct {
	// Stream is the wrapped stream; its methods are promoted onto stream.
	Stream
	// RWMutex guards err.
	sync.RWMutex

	// err holds the last non-nil error returned by SendMsg or RecvMsg.
	err error
	// request is the value handed out by the Request method.
	request *request
}
// request pairs a client.Request with the context its headers are read from.
type request struct {
	// Request is the embedded client request; its methods are promoted
	// unless request declares a method of the same name.
	client.Request

	// context is the context consulted by Header for metadata.
	context context.Context
}
// Codec returns the embedded client request's codec as a codec.Reader.
//
// The value returned by the embedded request's Codec method is type-asserted
// to codec.Reader. The comma-ok form is used so that a nil codec, or one that
// does not implement codec.Reader, yields a nil Reader instead of a run-time
// panic. Callers should check the result for nil before using it.
func (r *request) Codec() codec.Reader {
	reader, ok := r.Request.Codec().(codec.Reader)
	if !ok {
		return nil
	}
	return reader
}
// Header returns the metadata that metadata.FromContext reports for the
// request's context. The "found" flag is discarded, so whatever map
// FromContext yields when no metadata is present is returned unchanged.
func (r *request) Header() map[string]string {
	headers, _ := metadata.FromContext(r.context)
	return headers
}
// Read is a no-op: it always returns a nil body and a nil error.
func (r *request) Read() ([]byte, error) {
	var body []byte // intentionally left nil
	return body, nil
}
// Request returns the request held by the stream as a server.Request.
func (s *stream) Request() server.Request {
	var req server.Request = s.request
	return req
}
// Send forwards v to the wrapped stream's SendMsg. A non-nil error is
// recorded under the write lock, so Error can report it later, and is then
// returned to the caller.
func (s *stream) Send(v interface{}) error {
	sendErr := s.Stream.SendMsg(v)
	if sendErr == nil {
		return nil
	}

	// Remember the failure for later inspection via Error.
	s.Lock()
	s.err = sendErr
	s.Unlock()

	return sendErr
}
// Recv asks the wrapped stream's RecvMsg to receive into v. A non-nil error
// is recorded under the write lock, so Error can report it later, and is then
// returned to the caller.
func (s *stream) Recv(v interface{}) error {
	recvErr := s.Stream.RecvMsg(v)
	if recvErr == nil {
		return nil
	}

	// Remember the failure for later inspection via Error.
	s.Lock()
	s.err = recvErr
	s.Unlock()

	return recvErr
}
// Error returns the error most recently recorded by Send or Recv, or nil if
// none has been recorded. The value is read under the read lock.
func (s *stream) Error() error {
	s.RLock()
	last := s.err
	s.RUnlock()
	return last
}
// New wraps s in a server.Stream.
//
// The returned stream delegates to s and carries a request whose context is
// s.Context() and whose embedded client request is built by
// client.DefaultClient.NewRequest from service, endpoint and req. s must be
// non-nil because its Context method is called here.
func New(service, endpoint string, req interface{}, s Stream) server.Stream {
	// Read the context before building the client request, keeping the same
	// evaluation order as a single composite literal would.
	ctx := s.Context()
	clientReq := client.DefaultClient.NewRequest(service, endpoint, req)

	wrapped := &stream{Stream: s}
	wrapped.request = &request{
		Request: clientReq,
		context: ctx,
	}
	return wrapped
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sasukebo/go-micro.git
git@gitee.com:sasukebo/go-micro.git
sasukebo
go-micro
go-micro
v4.7.1

搜索帮助