2 Star 0 Fork 0

slh92 / plugin-sip

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
tx.go 4.05 KB
一键复制 编辑 原始数据 按行查看 历史
package transaction
import (
"fmt"
"gitee.com/slh1992/plugin-sip/sip"
. "gitee.com/slh1992/plugin-sip/transport"
"gitee.com/slh1992/plugin-sip/utils"
"net/http"
"sync"
"time"
)
// ActiveTX is the package-level transaction collection that GBTx.Close removes
// a transaction from. It is only declared here and therefore starts out nil.
// NOTE(review): it has to be pointed at a GBTxs with a non-nil Txs map and RWM
// before any transaction is closed; that initialization is not visible in this file.
var ActiveTX *GBTxs
// GBTxs is a collection of GB28181 transactions indexed by transaction key.
type GBTxs struct {
// Txs maps a transaction key to its transaction.
Txs map[string]*GBTx
// RWM guards Txs; NewTX, GetTX and rmTX hold it around every map access.
RWM *sync.RWMutex
}
// NewTX creates a transaction for key on conn via NewTransaction and stores it
// in the collection under key, replacing whatever was stored there before.
// The new transaction is returned.
func (txs *GBTxs) NewTX(key string, conn Connection) *GBTx {
	created := NewTransaction(key, conn)

	// The transaction is built before the write lock is taken, so the lock
	// is only held for the map store itself.
	txs.RWM.Lock()
	txs.Txs[key] = created
	txs.RWM.Unlock()

	return created
}
// GetTX looks up the transaction stored under key. It returns nil when the
// collection holds no transaction for that key.
func (txs *GBTxs) GetTX(key string) *GBTx {
	txs.RWM.RLock()
	// A missing key yields the zero value of *GBTx, which is nil.
	found := txs.Txs[key]
	txs.RWM.RUnlock()

	return found
}
// rmTX removes tx from the collection.
//
// The entry is deleted only when the transaction currently stored under
// tx.key is tx itself. NewTX overwrites an existing entry with the same key,
// so an older transaction that is closed later must not evict the newer
// transaction that replaced it.
func (txs *GBTxs) rmTX(tx *GBTx) {
	txs.RWM.Lock()
	defer txs.RWM.Unlock()

	if txs.Txs[tx.key] == tx {
		delete(txs.Txs, tx.key)
	}
}
// GBTx is a single GB28181 transaction: a connection plus the channels used to
// hand responses to a waiting caller and to signal the watch goroutine.
type GBTx struct {
// conn is the connection Respond and Request write encoded messages to.
conn Connection
// key is the transaction key; rmTX uses it to delete the transaction from a collection.
key string
// closed is set to 1 by Close after the transaction has been torn down.
closed int
// resp carries responses from ReceiveResponse to GetResponse; Close closes it.
resp chan *sip.Response
// active carries signals to the watch goroutine: a negative value makes watch
// close the transaction, any other value is consumed without further action.
// Close closes it.
active chan int
// Core is embedded and is not set by NewTransaction.
// NOTE(review): Respond and Request read tx.client, presumably a field promoted
// from Core — confirm, and confirm where Core gets assigned.
*Core
}
// NewTransaction builds a GBTx for key on conn and starts its watch goroutine.
//
// The response channel is buffered for 10 responses and the activity channel
// for a single signal. The embedded Core is not set here.
func NewTransaction(key string, conn Connection) *GBTx {
	tx := &GBTx{
		key:    key,
		conn:   conn,
		resp:   make(chan *sip.Response, 10),
		active: make(chan int, 1),
	}

	// watch closes the transaction on request or after its own timeout.
	go tx.watch()

	return tx
}
// Key returns the transaction's key.
func (tx *GBTx) Key() string {
return tx.key
}
// Conn returns the connection the transaction writes its messages to.
func (tx *GBTx) Conn() Connection {
return tx.conn
}
// watch supervises the transaction; NewTransaction runs it in its own goroutine.
//
// It returns in one of three ways:
//   - a negative value arrives on tx.active: the transaction is closed;
//   - 20 seconds have passed since watch started: the transaction is closed;
//   - tx.active has been closed: nothing is left to do.
//
// Non-negative values on tx.active are consumed and otherwise ignored; they do
// not restart the 20-second timer.
func (tx *GBTx) watch() {
	timer := time.NewTimer(20 * time.Second)
	defer timer.Stop()

	for {
		select {
		case act, ok := <-tx.active:
			if !ok {
				// Close closes tx.active. A closed channel is always ready
				// and yields 0, so without this check the loop would spin
				// at full speed until the timer fires.
				return
			}
			if act < 0 {
				tx.Close()
				return
			}
		case <-timer.C:
			tx.Close()
			return
		}
	}
}
// GetResponse waits up to five seconds for a final response on the transaction.
//
// Responses are taken from tx.resp. Status codes 100 and 101 are treated as
// provisional: the watch goroutine is signalled with 2 and waiting continues
// on the same timer, which is not restarted. Any other status code is treated
// as final: the watch goroutine is signalled with -1, which makes it close the
// transaction, and the response is returned.
//
// It returns nil when the five seconds elapse, or when tx.resp yields nil,
// which is what a receive on the channel closed by Close produces.
//
// NOTE(review): only 100 and 101 count as provisional; every other 1xx code is
// returned as a final response — confirm that this is intended.
func (tx *GBTx) GetResponse() *sip.Response {
	// signal notifies the watch goroutine. Close may close tx.active at any
	// moment, and sending on a closed channel panics; the panic is recovered
	// here, as ReceiveResponse does for its own sends, because a transaction
	// that is already closed needs no further signalling.
	signal := func(state int) {
		defer func() { _ = recover() }()
		tx.active <- state
	}

	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()

	for {
		select {
		case res := <-tx.resp:
			if res == nil {
				return nil
			}
			code := res.GetStatusCode()
			if code == http.StatusContinue || code == http.StatusSwitchingProtocols {
				signal(2)
				continue
			}
			signal(-1)
			return res
		case <-deadline.C:
			return nil
		}
	}
}
// gbTxCloseMu serializes GBTx.Close. Close is called from each transaction's
// watch goroutine and is also exported, so two calls can race on the same
// transaction; without mutual exclusion both could pass the closed check and
// close the same channel twice, which panics.
var gbTxCloseMu sync.Mutex

// Close tears the transaction down: it removes it from ActiveTX, closes the
// response and activity channels, and marks it closed. Calling Close again,
// or from several goroutines at once, is a no-op after the first call.
//
// A nil ActiveTX is tolerated; the channels are still closed in that case.
func (tx *GBTx) Close() {
	gbTxCloseMu.Lock()
	defer gbTxCloseMu.Unlock()

	if tx.closed == 1 {
		return
	}

	if ActiveTX != nil {
		ActiveTX.rmTX(tx)
	}
	if tx.resp != nil {
		close(tx.resp)
	}
	if tx.active != nil {
		close(tx.active)
	}
	tx.closed = 1
}
// ReceiveResponse hands msg to the transaction: it is queued on the response
// channel and the watch goroutine is then signalled with 1.
//
// Either send panics when Close has already closed the channels. The panic is
// recovered and reported on standard output instead of propagating.
func (tx *GBTx) ReceiveResponse(msg *sip.Response) {
	defer func() {
		if recover() == nil {
			return
		}
		fmt.Println("send to closed channel, txkey:", tx.key, "message: \n", msg)
	}()

	tx.resp <- msg
	tx.active <- 1
}
// Respond encodes res and writes it to the transaction's connection.
//
// When tx.client is set the payload goes out with Write; otherwise it is sent
// with WriteTo to res.DestAdd. The error of that write is returned.
//
// NOTE(review): the second result of sip.Encode is discarded, so an encoding
// failure is not reported to the caller — confirm whether it should be.
func (tx *GBTx) Respond(res *sip.Response) error {
	payload, _ := sip.Encode(res.Message)

	if tx.client {
		_, err := tx.conn.Write(payload)
		return err
	}

	_, err := tx.conn.WriteTo(payload, res.DestAdd)
	return err
}
// Request encodes req and writes it to the transaction's connection.
//
// When tx.client is set the payload goes out with Write; otherwise it is sent
// with WriteTo to req.DestAdd. The error of that write is returned.
//
// NOTE(review): the second result of sip.Encode is discarded, so an encoding
// failure is not reported to the caller — confirm whether it should be.
func (tx *GBTx) Request(req *sip.Request) error {
	payload, _ := sip.Encode(req.Message)

	if tx.client {
		_, err := tx.conn.Write(payload)
		return err
	}

	_, err := tx.conn.WriteTo(payload, req.DestAdd)
	return err
}
// GetTXKey picks the transaction key for msg: the message's CallID when it is
// non-empty, otherwise the result of utils.RandString(10).
func GetTXKey(msg *sip.Message) (key string) {
	if msg.CallID != "" {
		return msg.CallID
	}
	return utils.RandString(10)
}
// SipResponse waits for the transaction's response via GetResponse and turns
// the outcome into a (response, error) pair:
//   - no response: nil and a "response timeout" error;
//   - a status code other than 200: the response and a "response fail" error
//     carrying the status code and reason;
//   - status code 200: the response and a nil error.
func (tx *GBTx) SipResponse() (*sip.Response, error) {
	switch resp := tx.GetResponse(); {
	case resp == nil:
		return nil, utils.NewError(nil, "response timeout", "tx key:", tx.Key())
	case resp.GetStatusCode() != http.StatusOK:
		return resp, utils.NewError(nil, "response fail", resp.GetStatusCode(), resp.GetReason(), "tx key:", tx.Key())
	default:
		return resp, nil
	}
}
// SipRequestForResponse sends req with Request and, when the send succeeds,
// waits for the outcome with SipResponse. A send error is returned as is,
// together with a nil response.
func (tx *GBTx) SipRequestForResponse(req *sip.Request) (response *sip.Response, err error) {
	if err = tx.Request(req); err != nil {
		return nil, err
	}
	return tx.SipResponse()
}
Go
1
https://gitee.com/slh1992/plugin-sip.git
git@gitee.com:slh1992/plugin-sip.git
slh1992
plugin-sip
plugin-sip
v1.3.9

搜索帮助

53164aa7 5694891 3bd8fe86 5694891