4 Star 5 Fork 4

Plato / Service-Box-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
channel.go 6.39 KB
一键复制 编辑 原始数据 按行查看 历史
CloudGuan 提交于 2023-06-21 14:36 . update: 支持rpc身份认证
//MIT License
//Copyright (c) 2021 cloudguan rcloudguan@163.com
//Permission is hereby granted, free of charge, to any person obtaining a copy
//of this software and associated documentation files (the "Software"), to deal
//in the Software without restriction, including without limitation the rights
//to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the Software is
//furnished to do so, subject to the following conditions:
//The above copyright notice and this permission notice shall be included in all
//copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//SOFTWARE.
package sbox
import (
	"errors"
	"io"
	"sync/atomic"
	"time"

	"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
	"gitee.com/dennis-kk/service-box-go/common"
	"gitee.com/dennis-kk/service-box-go/internal/net"
	"gitee.com/dennis-kk/service-box-go/util/slog"
	"github.com/panjf2000/gnet/pkg/ringbuffer"
)
// Sentinel errors used by the channel's heartbeat handling.
var (
	// errNotEnabledHeartBeat is returned by BoxChannel.Heartbeat when the
	// channel has no heartbeat timer attached.
	errNotEnabledHeartBeat = errors.New("not config started rpc proxy heartbeat")
	// errLostHeartBeat reports a lost heartbeat.
	// NOTE(review): not referenced in the visible part of this file; presumably
	// used by the heartbeat timer code elsewhere in the package.
	errLostHeartBeat = errors.New("lost heartbeat")
)
// BoxChannel implements the rpc backend go transport interface.
// Each connection owns exactly one channel.
type BoxChannel struct {
	// transID is the channel id, read and written through GetID / SetID.
	transID uint32
	// globalIndex is the global index attached through SetGlobalIndex.
	// NOTE(review): the original comment was truncated ("global index for");
	// its exact meaning is defined by the protocol package.
	globalIndex protocol.GlobalIndexType
	// status is the lifecycle state, accessed with sync/atomic. The code in
	// this file uses common.ChannelRunning and common.ChannelClose; the
	// original comment also lists a RECONNECT state.
	status int32
	// localHost caches conn.LocalAddr().String() taken at construction time.
	localHost string
	// peerHost caches conn.RemoteAddr().String() taken at construction time.
	peerHost string
	// conn is the underlying connection handle; Clean sets it to nil.
	conn net.IBoxConn
	// rb is the ring buffer that Write fills and Read / Peek consume.
	rb *ringbuffer.RingBuffer
	// cache is a scratch buffer Peek uses to join data that the ring buffer
	// returns as two separate segments.
	cache []byte
	// heartbeat holds heartbeat state; nil means heartbeat is not enabled.
	heartbeat *heartbeatTimer
	// identityId is the identity UUID set through SetIdentityID.
	identityId string
	// identityTag is the identity tag set through SetIdentityTag.
	identityTag string
}
// GlobalIndex reports the global index currently stored on the channel.
// The receiver must not be nil.
func (bch *BoxChannel) GlobalIndex() protocol.GlobalIndexType {
	return bch.globalIndex
}
// SetGlobalIndex stores uuid as the channel's global index.
// The receiver must not be nil.
func (bch *BoxChannel) SetGlobalIndex(uuid protocol.GlobalIndexType) {
	bch.globalIndex = uuid
}
// NewBoxChannel wraps conn in a running channel.
//
// cacheLen is the size handed to ringbuffer.New for the receive buffer.
// The local and remote addresses are captured once, as strings, at
// construction time. It returns nil when conn is nil.
func NewBoxChannel(conn net.IBoxConn, cacheLen int) *BoxChannel {
	if conn == nil {
		return nil
	}

	ch := new(BoxChannel)
	ch.conn = conn
	ch.status = common.ChannelRunning
	ch.localHost = conn.LocalAddr().String()
	ch.peerHost = conn.RemoteAddr().String()
	ch.rb = ringbuffer.New(cacheLen)
	// Initial scratch space for Peek; Peek replaces it when more is needed.
	ch.cache = make([]byte, 16)
	return ch
}
// LocalAddr returns the cached string form of the connection's local address.
// The receiver must not be nil.
func (bch *BoxChannel) LocalAddr() string {
	return bch.localHost
}
// RemoteAddr returns the cached string form of the connection's peer address.
// The receiver must not be nil.
func (bch *BoxChannel) RemoteAddr() string {
	return bch.peerHost
}
// Write appends all of pkg to the channel's ring buffer.
//
// The length parameter is part of the transport interface but is not
// consulted here: the whole of pkg is always written.
//
// Returns:
//   - (len(pkg), nil) on success;
//   - (0, common.ErrChannelInvalid) when the receiver is nil;
//   - (0, common.ErrChannelNotRun) when the channel is not running;
//   - (0, err) when the ring buffer reports an error;
//   - (0, io.ErrShortWrite) when the ring buffer accepted fewer bytes than
//     requested without reporting an error. Previously this case returned a
//     nil error, which made a failed write look like a successful empty one.
func (bch *BoxChannel) Write(pkg []byte, length int) (int, error) {
	if bch == nil {
		return 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0, common.ErrChannelNotRun
	}

	n, err := bch.rb.Write(pkg)
	if err != nil {
		return 0, err
	}
	if n != len(pkg) {
		// Never report a partial write with a nil error.
		return 0, io.ErrShortWrite
	}
	return n, nil
}
// Read moves buffered bytes from the channel's ring buffer into pkg and
// returns how many bytes were copied.
//
// It returns common.ErrChannelInvalid for a nil receiver,
// common.ErrChannelNotRun when the channel is not running, and otherwise
// whatever error the ring buffer reports (with a zero count).
func (bch *BoxChannel) Read(pkg []byte, length int) (int, error) {
	switch {
	case bch == nil:
		return 0, common.ErrChannelInvalid
	case atomic.LoadInt32(&bch.status) != common.ChannelRunning:
		return 0, common.ErrChannelNotRun
	}

	// FIXME (kept from the original, which gave no explanation). Note that
	// length is not consulted: the read size is bounded by pkg alone.
	n, err := bch.rb.Read(pkg)
	if err != nil {
		return 0, err
	}
	return n, nil
}
// Peek returns length buffered bytes without moving the read position.
//
// Returns:
//   - (nil, 0, common.ErrChannelInvalid) when the receiver is nil;
//   - (nil, 0, common.ErrChannelNotRun) when the channel is not running;
//   - (nil, 0, nil) when fewer than length bytes are buffered;
//   - otherwise the peeked bytes, their count, and a nil error.
//
// When the ring buffer hands the data back as two segments, they are joined
// in the channel's scratch buffer, which is reused by later Peek calls, so
// the returned slice must be consumed before the next Peek. The returned
// slice is always exactly as long as the returned count; previously the
// two-segment path could return a longer slice with stale trailing bytes.
func (bch *BoxChannel) Peek(length int) ([]byte, int, error) {
	if bch == nil {
		return nil, 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return nil, 0, common.ErrChannelNotRun
	}

	// Not enough data buffered yet: report "nothing available" without error.
	if bch.rb.Length() < length {
		return nil, 0, nil
	}

	head, tail := bch.rb.Peek(length)
	if len(tail) == 0 {
		// Single segment: hand it back directly.
		return head, len(head), nil
	}

	// Two segments: stitch them together in the scratch buffer, growing it
	// only when the current one is too small.
	total := len(head) + len(tail)
	if total > len(bch.cache) {
		bch.cache = make([]byte, total)
	}
	n := copy(bch.cache, head)
	copy(bch.cache[n:], tail)
	return bch.cache[:total], total, nil
}
// Send writes pkg to the underlying connection.
//
// It returns common.ErrChannelInvalid for a nil receiver,
// common.ErrChannelNotRun when the channel is not running,
// common.ErrChannelConn when no connection is attached, and otherwise the
// result of the connection's Write.
func (bch *BoxChannel) Send(pkg []byte) error {
	switch {
	case bch == nil:
		return common.ErrChannelInvalid
	case atomic.LoadInt32(&bch.status) != common.ChannelRunning:
		return common.ErrChannelNotRun
	case bch.conn == nil:
		return common.ErrChannelConn
	}
	return bch.conn.Write(pkg)
}
// Close actively closes the underlying connection and marks the channel as
// closed.
//
// It is a no-op for a nil receiver or a channel that is not running. Once
// the running check has passed, the status is always set to
// common.ChannelClose on return, whether or not closing the connection
// succeeded. A running channel with no connection attached is only marked
// closed; previously that case dereferenced the nil connection and panicked,
// although Send and IsClose both treat a nil connection as a valid state.
func (bch *BoxChannel) Close() {
	if bch == nil {
		return
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return
	}
	// From here on the channel must end up closed on every return path.
	defer atomic.StoreInt32(&bch.status, common.ChannelClose)

	if bch.conn == nil {
		// Nothing to close; just record the closed state.
		return
	}
	if err := bch.conn.Close(); err != nil {
		slog.Warn("[BoxChannel] %d active close error %v !", bch.transID, err)
		return
	}
	slog.Info("The application tried to close the connection %s id:%d global index %d", bch.peerHost, bch.transID, bch.globalIndex)
}
// Clean releases everything the channel holds and marks it closed.
//
// It drops the connection handle, the scratch buffer and the heartbeat
// state, resets and drops the ring buffer, and finally stores
// common.ChannelClose as the status. It does not close the connection
// itself. A nil receiver is ignored.
func (bch *BoxChannel) Clean() {
	if bch == nil {
		return
	}

	bch.conn, bch.cache = nil, nil
	if rb := bch.rb; rb != nil {
		rb.Reset()
	}
	bch.rb = nil
	bch.heartbeat = nil

	atomic.StoreInt32(&bch.status, common.ChannelClose)
}
// Size reports how many bytes are currently held in the ring buffer.
// It returns 0 for a nil receiver or a channel that is not running.
func (bch *BoxChannel) Size() uint32 {
	if bch == nil || atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0
	}
	buffered := bch.rb.Length()
	return uint32(buffered)
}
// IsClose reports whether the channel can no longer be used: the receiver is
// nil, no connection is attached, or the status is anything other than
// common.ChannelRunning.
func (bch *BoxChannel) IsClose() bool {
	if bch == nil || bch.conn == nil {
		return true
	}
	running := atomic.LoadInt32(&bch.status) == common.ChannelRunning
	return !running
}
// GetID returns the channel id, or 0 when the receiver is nil.
func (bch *BoxChannel) GetID() uint32 {
	if bch != nil {
		return bch.transID
	}
	return 0
}
// SetID stores transID as the channel id. A nil receiver is ignored.
func (bch *BoxChannel) SetID(transID uint32) {
	if bch != nil {
		bch.transID = transID
	}
}
// Heartbeat records that a heartbeat arrived and answers the peer with the
// configured pong packet.
//
// Returns:
//   - common.ErrChannelInvalid when the receiver is nil (previously this
//     panicked, unlike the other error-returning methods of the channel);
//   - errNotEnabledHeartBeat when no heartbeat timer is attached;
//   - otherwise the result of Send for the pong packet.
func (bch *BoxChannel) Heartbeat() error {
	if bch == nil {
		return common.ErrChannelInvalid
	}
	// Read the pointer once so the check and the uses below see the same value.
	hb := bch.heartbeat
	if hb == nil {
		return errNotEnabledHeartBeat
	}

	// Record the arrival time as a Unix timestamp in milliseconds.
	atomic.StoreInt64(&hb.lastHeartbeat, time.Now().UnixNano()/1e6)

	// Reply to the client with the pong packet.
	return bch.Send(hb.pongBuffer)
}
// SetIdentityID stores id as the identity UUID of the channel.
// The receiver must not be nil.
func (bch *BoxChannel) SetIdentityID(id string) {
	bch.identityId = id
}
// SetIdentityTag stores tag as the identity tag of the channel.
// The receiver must not be nil.
func (bch *BoxChannel) SetIdentityTag(tag string) {
	bch.identityTag = tag
}
// IdentityID returns the identity UUID stored on the channel.
// The receiver must not be nil.
func (bch *BoxChannel) IdentityID() string {
	return bch.identityId
}
// IdentityTag returns the identity tag stored on the channel.
// The receiver must not be nil.
func (bch *BoxChannel) IdentityTag() string {
	return bch.identityTag
}
Go
1
https://gitee.com/dennis-kk/service-box-go.git
git@gitee.com:dennis-kk/service-box-go.git
dennis-kk
service-box-go
Service-Box-go
v0.5.17

搜索帮助