1 Star 0 Fork 0

qengli/go-sip-ua

Create your Gitee Account
Explore and code with more than 13.5 million developers. Free private repositories!
Sign up
文件
Clone or Download
ua.go 16.45 KB
Copy Edit Raw Blame History
LiQing authored 2022-10-17 21:39 +08:00 . 根据仓库地址改换库名
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
package ua
import (
"context"
"fmt"
"strconv"
"sync"
"gitee.com/qengli/go-sip-ua/pkg/account"
"gitee.com/qengli/go-sip-ua/pkg/auth"
"gitee.com/qengli/go-sip-ua/pkg/session"
"gitee.com/qengli/go-sip-ua/pkg/stack"
"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transaction"
"github.com/ghettovoice/gosip/util"
"gitee.com/qengli/go-sip-ua/pkg/utils"
)
// SessionKey is the key under which invite sessions are stored in and looked
// up from the user agent's session map (UserAgent.iss).
type SessionKey struct {
// CallID is the Call-ID header value of the request or response the key was built from.
CallID sip.CallID
// BranchID is left at its zero value by NewSessionKey (the assignment there is
// commented out), so keys built through that constructor differ only by CallID.
BranchID sip.MaybeString
}
// NewSessionKey builds the SessionKey for the given Call-ID.
//
// branchID is accepted but deliberately not stored: the returned key keeps
// BranchID at its zero value, so session lookups match on Call-ID alone.
func NewSessionKey(callID sip.CallID, branchID sip.MaybeString) SessionKey {
	var key SessionKey
	key.CallID = callID
	return key
}
// UserAgentConfig holds the dependencies a UserAgent is constructed with.
type UserAgentConfig struct {
// SipStack is the stack on which NewUserAgent registers its request handlers
// and through which the user agent sends requests and is shut down.
SipStack *stack.SipStack
}
// InviteSessionHandler is the callback type of UserAgent.InviteStateHandler.
// handleInviteState invokes it after updating the session, passing the session,
// the request and response pointers it was given (either may be nil), and the
// status the session was just set to.
type InviteSessionHandler func(s *session.Session, req *sip.Request, resp *sip.Response, status session.Status)
// RegisterHandler is the callback type of UserAgent.RegisterStateHandler; it
// receives an account.RegisterState.
// NOTE(review): nothing in this file invokes it — presumably called from the
// Register type; confirm there.
type RegisterHandler func(regState account.RegisterState)
// UserAgent connects a SIP stack to a set of invite sessions: it handles
// incoming INVITE/ACK/BYE/CANCEL/UPDATE requests, sends outgoing requests, and
// reports invite-session state changes through InviteStateHandler.
type UserAgent struct {
// InviteStateHandler, when non-nil, is called by handleInviteState after a
// session's stored messages and state have been updated.
InviteStateHandler InviteSessionHandler
// RegisterStateHandler is not invoked anywhere in this file.
// NOTE(review): presumably used by the Register type — confirm there.
RegisterStateHandler RegisterHandler
// config carries the SipStack used for sending requests and registering handlers.
config *UserAgentConfig
iss sync.Map /* invite sessions: SessionKey -> *session.Session */
// log is the logger returned by Log().
log log.Logger
}
// NewUserAgent creates a UserAgent bound to config.SipStack and registers the
// agent's handlers for INVITE, ACK, BYE, CANCEL and UPDATE requests on that
// stack. Both state handlers start out nil; callers assign them afterwards.
//
// config and config.SipStack must be non-nil.
func NewUserAgent(config *UserAgentConfig) *UserAgent {
	// The session map and both handlers are intentionally left at their zero values.
	agent := &UserAgent{
		config: config,
		log:    utils.NewLogrusLogger(log.DebugLevel, "UserAgent", nil),
	}

	// Registration order matches the original: INVITE, ACK, BYE, CANCEL, UPDATE.
	routes := []struct {
		method  sip.RequestMethod
		handler func(sip.Request, sip.ServerTransaction)
	}{
		{sip.INVITE, agent.handleInvite},
		{sip.ACK, agent.handleACK},
		{sip.BYE, agent.handleBye},
		{sip.CANCEL, agent.handleCancel},
		{sip.UPDATE, agent.handleUpdate},
	}

	sipStack := config.SipStack
	for _, route := range routes {
		sipStack.OnRequest(route.method, route.handler)
	}
	return agent
}
// Log returns the logger this user agent writes to.
func (ua *UserAgent) Log() log.Logger {
	logger := ua.log
	return logger
}
// handleInviteState records the latest request, response and transaction on
// the session (skipping any that are nil), sets the session to state, and then
// notifies InviteStateHandler if one is installed.
//
// request and response are pointers to interface values: both a nil pointer
// and a pointer to a nil interface are treated as "nothing to store".
func (ua *UserAgent) handleInviteState(is *session.Session, request *sip.Request, response *sip.Response, state session.Status, tx *sip.Transaction) {
	if request != nil {
		if req := *request; req != nil {
			is.StoreRequest(req)
		}
	}
	if response != nil {
		if resp := *response; resp != nil {
			is.StoreResponse(resp)
		}
	}
	if tx != nil {
		is.StoreTransaction(*tx)
	}

	is.SetState(state)

	if ua.InviteStateHandler == nil {
		return
	}
	// The handler receives the original pointers, not the dereferenced values.
	ua.InviteStateHandler(is, request, response, state)
}
// buildRequest assembles a SIP request with the given method, From, To and
// Contact addresses and a clone of recipient as the request target. Routes are
// only set when the slice is non-empty and the Call-ID only when callID is
// non-nil. A build failure is logged and returned.
func (ua *UserAgent) buildRequest(
	method sip.RequestMethod,
	from *sip.Address,
	to *sip.Address,
	contact *sip.Address,
	recipient sip.SipUri,
	routes []sip.Uri,
	callID *sip.CallID) (*sip.Request, error) {
	rb := sip.NewRequestBuilder()
	rb.SetMethod(method)
	rb.SetFrom(from)
	rb.SetTo(to)
	rb.SetContact(contact)
	rb.SetRecipient(recipient.Clone())

	if len(routes) != 0 {
		rb.SetRoutes(routes)
	}
	if callID != nil {
		rb.SetCallID(callID)
	}

	built, err := rb.Build()
	if err == nil {
		return &built, nil
	}
	ua.Log().Errorf("err => %v", err)
	return nil, err
}
// SendRegister creates a Register for profile and recipient and sends the
// initial REGISTER with the given expires value. It returns the Register on
// success; a send failure is logged and returned with a nil Register.
func (ua *UserAgent) SendRegister(profile *account.Profile, recipient sip.SipUri, expires uint32, userdata interface{}) (*Register, error) {
	reg := NewRegister(ua, profile, recipient, userdata)
	if err := reg.SendRegister(expires); err != nil {
		ua.Log().Errorf("SendRegister failed, err => %v", err)
		return nil, err
	}
	return reg, nil
}
// Invite starts an outgoing INVITE exactly like InviteWithContext, using
// context.TODO() as the context.
func (ua *UserAgent) Invite(profile *account.Profile, target sip.Uri, recipient sip.SipUri, body *string) (*session.Session, error) {
	ctx := context.TODO()
	return ua.InviteWithContext(ctx, profile, target, recipient, body)
}
// InviteWithContext builds and sends an INVITE from profile to target via
// recipient and returns the invite session created for it.
//
// The From address gets a random 8-character tag. When body is non-nil it is
// set as the request body with an "application/sdp" Content-Type. If the
// profile carries AuthInfo, a client authorizer built from it is handed to
// RequestWithContext.
//
// The request is sent with waitForResult=false, so RequestWithContext returns
// as soon as the request has been handed to the stack; responses are processed
// in the background and reported through InviteStateHandler. The session
// returned here is the one RequestWithContext stored under the request's
// Call-ID.
func (ua *UserAgent) InviteWithContext(ctx context.Context, profile *account.Profile, target sip.Uri, recipient sip.SipUri, body *string) (*session.Session, error) {
	from := &sip.Address{
		DisplayName: sip.String{Str: profile.DisplayName},
		Uri:         profile.URI,
		Params:      sip.NewParams().Add("tag", sip.String{Str: util.RandString(8)}),
	}
	contact := profile.Contact()
	to := &sip.Address{
		Uri: target,
	}

	request, err := ua.buildRequest(sip.INVITE, from, to, contact, recipient, profile.Routes, nil)
	if err != nil {
		ua.Log().Errorf("INVITE: err = %v", err)
		return nil, err
	}

	if body != nil {
		(*request).SetBody(*body, true)
		contentType := sip.ContentType("application/sdp")
		(*request).AppendHeader(&contentType)
	}

	// Declare the authorizer with the interface type. A nil *auth.ClientAuthorizer
	// stored in a sip.Authorizer yields a non-nil interface value, which would
	// defeat the "authorizer != nil" check in RequestWithContext and make it call
	// AuthorizeRequest on a nil pointer when a 401/407 arrives.
	var authorizer sip.Authorizer
	if profile.AuthInfo != nil {
		authorizer = auth.NewClientAuthorizer(profile.AuthInfo.AuthUser, profile.AuthInfo.Password)
	}

	resp, err := ua.RequestWithContext(ctx, *request, authorizer, false, 1)
	if err != nil {
		ua.Log().Errorf("INVITE: Request [INVITE] failed, err => %v", err)
		return nil, err
	}
	// With waitForResult=false no response is expected here; a non-nil one is
	// reported as a failure, as before.
	if resp != nil {
		stateCode := resp.StatusCode()
		ua.Log().Debugf("INVITE: resp %d => %s", stateCode, resp.String())
		return nil, fmt.Errorf("Invite session is unsuccessful, code: %d, reason: %s", stateCode, resp.String())
	}

	callID, ok := (*request).CallID()
	if ok {
		branchID := utils.GetBranchID(*request)
		if v, found := ua.iss.Load(NewSessionKey(*callID, branchID)); found {
			return v.(*session.Session), nil
		}
	}

	return nil, fmt.Errorf("invite session not found, unknown errors")
}
// Request sends *req through the configured SIP stack and returns the client
// transaction the stack created for it. req must be non-nil.
func (ua *UserAgent) Request(req *sip.Request) (sip.ClientTransaction, error) {
	sipStack := ua.config.SipStack
	return sipStack.Request(*req)
}
// handleBye answers an incoming BYE with 200 OK and, if a session exists for
// the request's Call-ID, removes it from the session map and reports it as
// session.Terminated.
//
// The response destination is derived from the top Via hop:
//   - host: the "received" parameter when present and non-empty, else the Via host;
//   - port: the Via port when present, else a valid "rport" parameter, else the
//     default port for the request's transport.
//
// The port is resolved even when the Via hop has no parameters, and an
// unparsable or out-of-range rport falls back to the transport default instead
// of leaving the port at 0.
func (ua *UserAgent) handleBye(request sip.Request, tx sip.ServerTransaction) {
	ua.Log().Debugf("handleBye: Request => %s, body => %s", request.Short(), request.Body())
	response := sip.NewResponseFromRequest(request.MessageID(), request, 200, "OK", "")

	if viaHop, ok := request.ViaHop(); ok {
		host := viaHop.Host
		rport := ""
		if viaHop.Params != nil {
			if received, ok := viaHop.Params.Get("received"); ok && received != nil && received.String() != "" {
				host = received.String()
			}
			if v, ok := viaHop.Params.Get("rport"); ok && v != nil {
				rport = v.String()
			}
		}

		// Start from the transport default so every fall-through yields a usable port.
		port := sip.DefaultPort(request.Transport())
		switch {
		case viaHop.Port != nil:
			port = *viaHop.Port
		case rport != "":
			// bitSize 16 rejects values that do not fit a port number.
			if p, err := strconv.ParseUint(rport, 10, 16); err == nil {
				port = sip.Port(p)
			}
		}

		dest := fmt.Sprintf("%v:%v", host, port)
		response.SetDestination(dest)
	}

	if err := tx.Respond(response); err != nil {
		ua.Log().Errorf("handleBye: failed to send response, err => %v", err)
	}

	callID, ok := request.CallID()
	if !ok {
		return
	}
	key := NewSessionKey(*callID, utils.GetBranchID(request))
	if v, found := ua.iss.Load(key); found {
		is := v.(*session.Session)
		ua.iss.Delete(key)
		var srvTx sip.Transaction = tx.(sip.Transaction)
		ua.handleInviteState(is, &request, &response, session.Terminated, &srvTx)
	}
}
// handleCancel answers an incoming CANCEL with 200 OK. If a session is stored
// for the request's Call-ID it is removed from the session map, set to
// session.Canceled and reported through handleInviteState.
func (ua *UserAgent) handleCancel(request sip.Request, tx sip.ServerTransaction) {
	ua.Log().Debugf("handleCancel: Request => %s, body => %s", request.Short(), request.Body())
	okResp := sip.NewResponseFromRequest(request.MessageID(), request, 200, "OK", "")
	tx.Respond(okResp)

	callID, hasCallID := request.CallID()
	if !hasCallID {
		return
	}
	key := NewSessionKey(*callID, utils.GetBranchID(request))
	stored, found := ua.iss.Load(key)
	if !found {
		return
	}

	is := stored.(*session.Session)
	ua.iss.Delete(key)
	var srvTx sip.Transaction = tx.(sip.Transaction)
	// The state is set here and again inside handleInviteState, as in the original.
	is.SetState(session.Canceled)
	ua.handleInviteState(is, &request, nil, session.Canceled, &srvTx)
}
// handleACK marks the session stored for the ACK's Call-ID as
// session.Confirmed and reports that through handleInviteState. An ACK with no
// Call-ID or no matching session is only logged. tx is not used.
func (ua *UserAgent) handleACK(request sip.Request, tx sip.ServerTransaction) {
	ua.Log().Debugf("handleACK => %s, body => %s", request.Short(), request.Body())

	callID, hasCallID := request.CallID()
	if !hasCallID {
		return
	}
	key := NewSessionKey(*callID, utils.GetBranchID(request))
	stored, found := ua.iss.Load(key)
	if !found {
		return
	}

	is := stored.(*session.Session)
	// The state is set here and again inside handleInviteState, as in the original.
	is.SetState(session.Confirmed)
	ua.handleInviteState(is, &request, nil, session.Confirmed, nil)
}
// handleInvite processes an incoming INVITE.
//
// For a request carrying a Call-ID the session map is consulted:
//   - To header has a tag and a session exists: treated as a re-INVITE; the
//     session is reported as session.ReInviteReceived.
//   - To header has a tag but no session exists: answered with 481.
//   - No To tag but a session already exists: answered with 482.
//   - No To tag and no session: a new incoming ("UAS") session is created,
//     stored, reported as session.InviteReceived and then moved to
//     session.WaitingForAnswer. The request's Contact address is rewritten to
//     the stack's own network address before the session is created.
//
// An initial INVITE without a Contact header is answered with 400 instead of
// dereferencing the missing header, since such a request can arrive from the
// network.
//
// Regardless of the outcome, two goroutines are started: one waits for a
// CANCEL on the server transaction (removing and cancelling the session, then
// answering the CANCEL with 200), the other waits for an ACK and logs it.
func (ua *UserAgent) handleInvite(request sip.Request, tx sip.ServerTransaction) {
	ua.Log().Debugf("handleInvite => %s, body => %s", request.Short(), request.Body())

	if callID, ok := request.CallID(); ok {
		var srvTx sip.Transaction = tx.(sip.Transaction)
		branchID := utils.GetBranchID(request)
		key := NewSessionKey(*callID, branchID)
		v, found := ua.iss.Load(key)

		toHdr, hasTo := request.To()
		inDialog := hasTo && toHdr.Params != nil && toHdr.Params.Has("tag")

		switch {
		case inDialog && found:
			is := v.(*session.Session)
			is.SetState(session.ReInviteReceived)
			ua.handleInviteState(is, &request, nil, session.ReInviteReceived, &srvTx)
		case inDialog:
			// reinvite for transaction we have no record of; reject it
			response := sip.NewResponseFromRequest(request.MessageID(), request, sip.StatusCode(481), "Call/Transaction does not exist", "")
			tx.Respond(response)
		case found:
			// retransmission; reject it
			response := sip.NewResponseFromRequest(request.MessageID(), request, sip.StatusCode(482), "Loop Detected", "")
			tx.Respond(response)
		default:
			contactHdr, hasContact := request.Contact()
			if !hasContact || contactHdr == nil {
				// A session cannot be built without the peer's Contact; reject
				// the request rather than panic on a nil header.
				ua.Log().Errorf("handleInvite: INVITE %s has no Contact header, rejecting", request.Short())
				response := sip.NewResponseFromRequest(request.MessageID(), request, sip.StatusCode(400), "Bad Request", "")
				tx.Respond(response)
				break
			}
			contactAddr := ua.updateContact2UAAddr(request.Transport(), contactHdr.Address)
			contactHdr.Address = contactAddr

			is := session.NewInviteSession(ua.RequestWithContext, "UAS", contactHdr, request, *callID, srvTx, session.Incoming, ua.Log())
			ua.iss.Store(key, is)
			is.SetState(session.InviteReceived)
			ua.handleInviteState(is, &request, nil, session.InviteReceived, &srvTx)
			is.SetState(session.WaitingForAnswer)
		}
	}

	// Watch for a CANCEL delivered on this INVITE's server transaction.
	go func() {
		cancel := <-tx.Cancels()
		if cancel != nil {
			ua.Log().Debugf("Cancel => %s, body => %s", cancel.Short(), cancel.Body())
			response := sip.NewResponseFromRequest(cancel.MessageID(), cancel, 200, "OK", "")
			if callID, ok := response.CallID(); ok {
				branchID := utils.GetBranchID(cancel)
				if v, found := ua.iss.Load(NewSessionKey(*callID, branchID)); found {
					ua.iss.Delete(NewSessionKey(*callID, branchID))
					is := v.(*session.Session)
					is.SetState(session.Canceled)
					// The original INVITE (not the CANCEL) is what gets stored on the session.
					ua.handleInviteState(is, &request, &response, session.Canceled, nil)
				}
			}

			tx.Respond(response)
		}
	}()

	// Watch for the ACK of this INVITE; it is only logged here.
	go func() {
		ack := <-tx.Acks()
		if ack != nil {
			ua.Log().Debugf("ack => %v", ack)
		}
	}()
}
// handleUpdate logs an incoming UPDATE and answers it with 200 OK; no session
// state is touched.
func (ua *UserAgent) handleUpdate(request sip.Request, tx sip.ServerTransaction) {
	ua.Log().Debugf("handleUpdate: Request => %s", request.Short())
	tx.Respond(sip.NewResponseFromRequest(request.MessageID(), request, 200, "OK", ""))
}
// RequestWithContext sends request through the SIP stack and tracks the
// resulting client transaction.
//
// Parameters:
//   - ctx: when it is done, a pending request that has seen a provisional
//     response is cancelled and the call fails with a 487 request error.
//   - request: the request to send. For an INVITE whose Call-ID has no stored
//     session, a new outgoing ("UAC") session is created, stored and reported
//     as session.InviteSent.
//   - authorizer: when non-nil, used once to answer a 401/407 challenge; the
//     request is then re-sent with attempt+1.
//   - waitForResult: when true the call blocks until a final response or an
//     error; when false the responses are processed in a background goroutine
//     and the call returns (nil, nil) right after the request was sent.
//   - attempt: 1 for the first try; authentication is only retried while
//     attempt < 2.
//
// Session side effects while waiting: provisional responses are reported as
// session.Provisional (plus session.EarlyMedia when they carry a body), a 2xx
// to an INVITE as session.Confirmed, a 2xx to a BYE removes the session and
// reports session.Terminated, and a failure removes the session and reports
// session.Failure. A transaction timeout surfaces as a 408 request error.
func (ua *UserAgent) RequestWithContext(ctx context.Context, request sip.Request, authorizer sip.Authorizer, waitForResult bool, attempt int) (sip.Response, error) {
	s := ua.config.SipStack
	tx, err := s.Request(request)
	if err != nil {
		return nil, err
	}
	var cts sip.Transaction = tx.(sip.Transaction)

	if request.IsInvite() {
		if callID, ok := request.CallID(); ok {
			branchID := utils.GetBranchID(request)
			if _, found := ua.iss.Load(NewSessionKey(*callID, branchID)); !found {
				// Guard against a missing Contact header instead of dereferencing nil.
				if contactHdr, ok := request.Contact(); ok && contactHdr != nil {
					contactAddr := ua.updateContact2UAAddr(request.Transport(), contactHdr.Address)
					contactHdr.Address = contactAddr
					is := session.NewInviteSession(ua.RequestWithContext, "UAC", contactHdr, request, *callID, cts, session.Outgoing, ua.Log())
					ua.iss.Store(NewSessionKey(*callID, branchID), is)
					is.ProvideOffer(request.Body())
					is.SetState(session.InviteSent)
					ua.handleInviteState(is, &request, nil, session.InviteSent, &cts)
				} else {
					ua.Log().Errorf("RequestWithContext: INVITE %s has no Contact header, no session created", request.Short())
				}
			}
		}
	}

	responses := make(chan sip.Response)
	provisionals := make(chan sip.Response)
	errs := make(chan error)

	// Transaction pump: turns transaction events into exactly one final value
	// on responses or errs, plus any number of values on provisionals.
	go func() {
		var lastResponse sip.Response

		previousResponses := make([]sip.Response, 0)
		previousResponsesStatuses := make(map[sip.StatusCode]bool)

		for {
			select {
			case <-ctx.Done():
				if lastResponse != nil && lastResponse.IsProvisional() {
					s.CancelRequest(request, lastResponse)
				}
				if lastResponse != nil {
					lastResponse.SetPrevious(previousResponses)
				}
				errs <- sip.NewRequestError(487, "Request Terminated", request, lastResponse)
				// pull out later possible transaction responses and errors
				go func() {
					for {
						select {
						case <-tx.Done():
							return
						case <-tx.Errors():
						case <-tx.Responses():
						}
					}
				}()
				return
			case err, ok := <-tx.Errors():
				if !ok {
					if lastResponse != nil {
						lastResponse.SetPrevious(previousResponses)
					}
					errs <- sip.NewRequestError(487, "Request Terminated", request, lastResponse)
					return
				}

				if _, isTimeout := err.(*transaction.TxTimeoutError); isTimeout {
					errs <- sip.NewRequestError(408, "Request Timeout", request, lastResponse)
					return
				}
				// Forward every other transaction error. Dropping it here would
				// leave the waiter below blocked forever, because nothing else
				// is ever sent once this goroutine returns.
				errs <- err
				return
			case response, ok := <-tx.Responses():
				if !ok {
					if lastResponse != nil {
						lastResponse.SetPrevious(previousResponses)
					}
					errs <- sip.NewRequestError(487, "Request Terminated", request, lastResponse)
					return
				}

				response = sip.CopyResponse(response)
				lastResponse = response

				if response.IsProvisional() {
					// Keep only the first provisional response per status code;
					// the status must be recorded for the check to take effect.
					if !previousResponsesStatuses[response.StatusCode()] {
						previousResponses = append(previousResponses, response)
						previousResponsesStatuses[response.StatusCode()] = true
					}
					provisionals <- response
					continue
				}

				// success
				if response.IsSuccess() {
					response.SetPrevious(previousResponses)

					if request.IsInvite() {
						s.AckInviteRequest(request, response)
						s.RememberInviteRequest(request)
						// Re-ACK any retransmitted 2xx for this INVITE.
						go func() {
							for response := range tx.Responses() {
								s.AckInviteRequest(request, response)
							}
						}()
					}
					responses <- response
					return
				}

				// unauth request
				needAuth := (response.StatusCode() == 401 || response.StatusCode() == 407) && attempt < 2
				if needAuth && authorizer != nil {
					if err := authorizer.AuthorizeRequest(request, response); err != nil {
						errs <- err
						return
					}
					if response, err := ua.RequestWithContext(ctx, request, authorizer, true, attempt+1); err == nil {
						responses <- response
					} else {
						errs <- err
					}
					return
				}

				// failed request
				if lastResponse != nil {
					lastResponse.SetPrevious(previousResponses)
				}
				errs <- sip.NewRequestError(uint(response.StatusCode()), response.Reason(), request, lastResponse)
				return
			}
		}
	}()

	// waitForResponse consumes the pump's output, updates the matching session
	// and returns the final response or error.
	waitForResponse := func(cts *sip.Transaction) (sip.Response, error) {
		for {
			select {
			case provisional := <-provisionals:
				callID, ok := provisional.CallID()
				if ok {
					branchID := utils.GetBranchID(provisional)
					if v, found := ua.iss.Load(NewSessionKey(*callID, branchID)); found {
						is := v.(*session.Session)
						is.StoreResponse(provisional)
						// handle Ringing or Processing with sdp
						ua.handleInviteState(is, &request, &provisional, session.Provisional, cts)
						if len(provisional.Body()) > 0 {
							is.SetState(session.EarlyMedia)
							ua.handleInviteState(is, &request, &provisional, session.EarlyMedia, cts)
						}
					}
				}
			case err := <-errs:
				if _, isTimeout := err.(*transaction.TxTimeoutError); isTimeout {
					return nil, err
				}

				// errs can carry errors that are not *sip.RequestError (for
				// example from the authorizer or the stack), so the assertion
				// must be checked. For those, fall back to the request being
				// sent and no response.
				failedReq := request
				var failedResp sip.Response
				if reqErr, ok := err.(*sip.RequestError); ok && reqErr != nil {
					if reqErr.Request != nil {
						failedReq = reqErr.Request
					}
					failedResp = reqErr.Response
				}

				if callID, ok := failedReq.CallID(); ok {
					key := NewSessionKey(*callID, utils.GetBranchID(failedReq))
					if v, found := ua.iss.Load(key); found {
						is := v.(*session.Session)
						ua.iss.Delete(key)
						is.SetState(session.Failure)
						ua.handleInviteState(is, &failedReq, &failedResp, session.Failure, nil)
					}
				}
				return nil, err
			case response := <-responses:
				callID, ok := response.CallID()
				if ok {
					branchID := utils.GetBranchID(response)
					if v, found := ua.iss.Load(NewSessionKey(*callID, branchID)); found {
						if request.IsInvite() {
							is := v.(*session.Session)
							is.SetState(session.Confirmed)
							ua.handleInviteState(is, &request, &response, session.Confirmed, nil)
						} else if request.Method() == sip.BYE {
							is := v.(*session.Session)
							ua.iss.Delete(NewSessionKey(*callID, branchID))
							is.SetState(session.Terminated)
							ua.handleInviteState(is, &request, &response, session.Terminated, nil)
						}
					}
				}
				return response, nil
			}
		}
	}

	if !waitForResult {
		go waitForResponse(&cts)
		return nil, nil
	}
	return waitForResponse(&cts)
}
// Shutdown shuts down the SIP stack this user agent was configured with.
func (ua *UserAgent) Shutdown() {
	sipStack := ua.config.SipStack
	sipStack.Shutdown()
}
// updateContact2UAAddr returns a clone of from whose host and port are replaced
// by the stack's network info for the given transport. from itself is not
// modified.
func (ua *UserAgent) updateContact2UAAddr(transport string, from sip.ContactUri) sip.ContactUri {
	local := ua.config.SipStack.GetNetworkInfo(transport)

	rewritten := from.Clone()
	rewritten.SetHost(local.Host)
	rewritten.SetPort(local.Port)
	return rewritten
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/qengli/go-sip-ua.git
git@gitee.com:qengli/go-sip-ua.git
qengli
go-sip-ua
go-sip-ua
v1.1.8

Search