# Zinx **Repository Path**: WHU_Liyunfan/zinx ## Basic Information - **Project Name**: Zinx - **Description**: 通过Zinx框架学习基于Golang的长连接并发服务器框架开发 - **Primary Language**: Go - **License**: Unlicense - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-03-18 - **Last Updated**: 2022-03-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 介绍 Zinx是一款TCP长连接轻量级框架,其核心是通过实现一个工作池对连接进行复用,采用消息任务队列对任务进行缓冲,在实现过程中也包括一些处理粘包问题的手段,线程之间通信的实现等。设计模式:面向接口编程。 # Zinx架构 ![输入图片说明](src/images/imgszinx%E6%9E%B6%E6%9E%84.png) # 自底向上实现核心功能 ## 消息封装 message包的结构包括消息ID,数据,数据长度,使用TLV( Type-Len-Value )封包格式来解决TCP粘包问题, 所以Zinx此时应该提供一个统一的拆包和封包的方法。在发包之前打包成有head和body的两部分的包,在收到数据的时候分两次进行读取,先读取固定长度的head部分,得到后续Data的长度,再根据DataLen读取之后的body。这样就能够解决粘包的问题了。 ```go package znet import ( "bytes" "encoding/binary" "errors" "zinx/src/utils" "zinx/src/ziface" ) // DataPack 封包拆包类实例,暂时不需要成员 type DataPack struct{} // NewDataPack 实例化方法 func NewDataPack() *DataPack { return &DataPack{} } // GetHeadLen 获取包头长度方法 func (dp *DataPack) GetHeadLen() uint32 { //Id uint32 + DataLen uint32 return 8 } // Pack 封包方法(压缩数据) func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) { //创建一个用于存放bytes字节的缓冲 dataBuff := bytes.NewBuffer([]byte{}) //写dataLen,按照LittleEndian序将msg中的信息写入到dataBuff字节缓冲区中 if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil { return nil, err } if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil { return nil, err } if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil { return nil, err } return dataBuff.Bytes(), nil } // Unpack 拆包方法,解压数据 func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) { //创建一个从输入读取二进制数据的ioReader dataBuff := bytes.NewReader(binaryData) //只解压head的信息,得到dataLen和msgID,怎么做到的? msg := &Message{} if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil { //把dataBuff的数据读到msg中去,因为DataLen为uint32,所以读了8字节的数据就会停 return nil, err } if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil { return nil, err } //判断dataLen的长度是否超出我们允许的最大包长度 if (utils.GlobalObject.MaxPacketSize > 0) && (msg.DataLen > utils.GlobalObject.MaxPacketSize) { return nil, errors.New("too large msg data received") } //只需要head的数据拆包出来就行,然后通过head的长度再从conn读取一次数据 return msg, nil } /*需要注意的是Unpack方法,我们进行拆包的时候是分两次过程的,第二次是依赖第一次的dataLen结果, 所以Unpack只能解压出包头head的内容,得到msgId 和 dataLen。 之后调用者再根据dataLen继续从io流中读取body中的数据。*/ ``` ## 多路由模式 基于map的数据结构实现即可 ## 读写分离 开启两个协程,分别用于处理读数据和写数据的请求,并且通过有缓冲的Channel进行通信,实现同步。 ![输入图片说明](src/images/imgs%E8%AF%BB%E5%86%99%E5%88%86%E7%A6%BB.png) ```go package znet import ( "errors" "fmt" "io" "net" "sync" "zinx/src/utils" "zinx/src/ziface" ) type Connection struct { //当前Conn属于哪个Server TcpServer ziface.IServer Conn *net.TCPConn ConnID uint32 isClosed bool MsgHandler ziface.IMsgHandle //消息管理者 //告知该连接已经退出/停止的channel ExitBuffChan chan bool //无缓冲管道,用于读写的两个协程之间的消息通信 msgChan chan []byte msgBuffChan chan []byte //连接属性 property map[string]interface{} //保护连接属性修改的读写锁 propertyLock sync.RWMutex } func (c *Connection) SetProperty(key string, value interface{}) { c.propertyLock.Lock() //加写锁 defer c.propertyLock.Unlock() c.property[key] = value } func (c *Connection) GetProperty(key string) (interface{}, error) { c.propertyLock.RLock() defer c.propertyLock.RUnlock() if value, ok := c.property[key]; ok { return value, nil } else { return nil, errors.New("no property found") } } func (c *Connection) RemoveProperty(key string) { c.propertyLock.Lock() defer c.propertyLock.Unlock() delete(c.property, key) } // NewConnection 创建一个连接的结构体的函数 func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { c := &Connection{ TcpServer: server, Conn: conn, ConnID: connID, isClosed: false, MsgHandler: msgHandler, //记得要把路由也注册到connection中 ExitBuffChan: make(chan bool, 1), //channel的缓冲区大小为1 msgChan: make(chan []byte), msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), property: make(map[string]interface{}), } //将新创建的conn添加到连接管理中 c.TcpServer.GetConnMgr().Add(c) //避免TcpServer空指针 return c } // StartWriter 写消息的协程,用户将数据发送到客户端 func (c *Connection) StartWriter() { fmt.Println("[write goroutine is running]") defer fmt.Println(c.RemoteAddr().String(), "[conn writer exit...]") for { select { case data, _ := <-c.msgChan: //有数据要写给客户端 if _, err := c.Conn.Write(data); err != nil { fmt.Println("send data error", err) return } case <-c.ExitBuffChan: //如果从其中能够取出true,说明conn已经关闭 return case data, ok := <-c.msgBuffChan: if ok { if _, err := c.Conn.Write(data); err != nil { fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit") return } } else { break } } } } func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { if c.isClosed == true { return errors.New("connection closed when send buff msg") } //将data封包,并且发送 dp := NewDataPack() msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id = ", msgId) return errors.New("Pack error msg ") } //写回客户端 c.msgBuffChan <- msg return nil } // SendMsg 直接从服务端将msg数据发送数据给远程的tcp客户端 func (c *Connection) SendMsg(msgId uint32, data []byte) error { if c.isClosed == true { return errors.New("connection closed when send msg") } //将data封包 dp := NewDataPack() msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id =", msgId) return errors.New("pack error msg") } //将封包的msg发送给channel给writer读取 c.msgChan <- msg return nil } //StartReader 处理conn读数据的协程 //添加:在Connection调用注册的Router处理业务 func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running...") defer fmt.Println(c.RemoteAddr().String(), "conn reader exit...") defer c.Stop() for { //创建拆包解包的对象 dp := NewDataPack() //读取客户端的msg head headData := make([]byte, dp.GetHeadLen()) if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { fmt.Println("read msg head error,", err) c.ExitBuffChan <- true //通知其他goroutine退出 continue } //拆包,得到msgID和DataLen放在msg中 msg, err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error,", err) c.ExitBuffChan <- true continue } //根据dataLen读取data到msg.data中 var data []byte if msg.GetDataLen() > 0 { data := make([]byte, msg.GetDataLen()) if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { fmt.Println("read msg data error", err) c.ExitBuffChan <- true continue } } msg.SetData(data) //根据上述msg结果得到当前客户端请求的Request数据,进行实例化 req := Request{ conn: c, msg: msg, } if utils.GlobalObject.WorkerPoolSize > 0 { //如果配置了工作池机制,就把任务交给他们去处理 c.MsgHandler.SendMsgToTaskQueue(&req) } else { go c.MsgHandler.DoMsgHandler(&req) } } } func (c *Connection) Start() { //开启读写协程 go c.StartReader() go c.StartWriter() //执行钩子方法 c.TcpServer.CallOnConnStart(c) } func (c *Connection) Stop() { fmt.Printf("conn stop...connID = %d\n", c.ConnID) if c.isClosed == true { return } c.isClosed = true //执行钩子方法 c.TcpServer.CallOnConnStop(c) //关闭连接 err := c.Conn.Close() if err != nil { fmt.Println("Conn close error", err) } //通知从缓冲队列读取数据的业务,该连接已经被关闭了 c.ExitBuffChan <- true //将该连接从连接管理器中删除 c.TcpServer.GetConnMgr().Remove(c) //通过connection调用连接管理器 close(c.ExitBuffChan) //关闭所有的通道 close(c.msgBuffChan) } func (c *Connection) GetTCPConnection() *net.TCPConn { return c.Conn } func (c *Connection) GetConnID() uint32 { return c.ConnID } func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() } ``` ## 消息队列与任务池机制 通过worker的数量来限定处理业务的固定goroutine数量,而不是无限制的开辟Goroutine,用消息队列来缓冲worker工作的数据。 每个消息任务队列都对应一个worker。 ![输入图片说明](src/images/imgs%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97.png) 核心代码: ```go package znet import ( "fmt" "strconv" "zinx/src/utils" "zinx/src/ziface" ) type MsgHandle struct { Apis map[uint32]ziface.IRouter //存放msgid对应的处理方法路由 WorkerPoolSize uint32 //业务worker池的size TaskQueue []chan ziface.IRequest //worker负责取任务的消息队列 } func NewMsgHandle() *MsgHandle { return &MsgHandle{ Apis: make(map[uint32]ziface.IRouter), WorkerPoolSize: utils.GlobalObject.WorkerPoolSize, //1个worker对应一个消息queue,即该channel的缓冲长度 //用来缓冲提供worker调用的Request请求信息, //worker会从对应的队列中获取客户端的请求数据并且处理掉。 TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize), } } // SendMsgToTaskQueue 将消息交给TaskQueue,由worker进行处理 func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { //根据ConnID来分配当前的连接该由哪个worker负责处理 //轮询的平均分配法则 workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID) //将请求消息发送给对应的任务队列 mh.TaskQueue[workerID] <- request } // StartOneWorker 启动一个worker工作流程 func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { fmt.Printf("worker id = %d,is started\n", workerID) //阻塞等待队列中的消息 for { select { case request := <-taskQueue: mh.DoMsgHandler(request) } } } // StartWorkerPool 启动worker工作池 func (mh *MsgHandle) StartWorkerPool() { //遍历需要启动的worker数量,依次启动 for i := 0; i < int(mh.WorkerPoolSize); i++ { //1个worker被启动,给当前worker对应的任务队列开辟空间 mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) //启动当前worker,阻塞等待对应的消息队列是否有消息传递进来 go mh.StartOneWorker(i, mh.TaskQueue[i]) } } // DoMsgHandler 以非阻塞方式调用处理方法处理消息 func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { handler, ok := mh.Apis[request.GetMsgID()] if !ok { fmt.Printf("api msgId = %d,is not FOUND\n", request.GetMsgID()) return } //执行对应的处理方法 handler.PreHandle(request) handler.Handle(request) handler.PostHandle(request) } // AddRouter 添加msgId对应的路由器处理者 func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) { //1.判断当前msg绑定的api处理方法是否已经存在 if _, ok := mh.Apis[msgId]; ok { panic("repeated api,msgId = " + strconv.Itoa(int(msgId))) } //2.添加msg与api的绑定关系 mh.Apis[msgId] = router fmt.Println("add api msgId =", msgId) } ```