1 Star 0 Fork 0

ZJOOPS / gosip

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
client_tx.go 21.06 KB
Copy Edit Raw Blame History
zhangjun authored 2023-08-25 10:37 . -fix 修复取消事务的bug
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962
package transaction
import (
"fmt"
"sync"
"time"
"github.com/discoviking/fsm"
"gitee.com/zhjun2512/gosip/log"
"gitee.com/zhjun2512/gosip/sip"
"gitee.com/zhjun2512/gosip/timing"
)
// ClientTx is the client-side transaction contract: everything a Tx offers,
// plus access to the responses received for the request and the ability to
// cancel it.
type ClientTx interface {
Tx
// Responses returns the receive-only channel on which the transaction
// passes up the responses it has accepted.
Responses() <-chan sip.Response
// Cancel feeds the cancel input into the transaction state machine and
// returns the error reported by the state machine, if any.
Cancel() error
}
// clientTx is the ClientTx implementation. It embeds commonTx and adds the
// response channel and the timers driving the client state machine.
type clientTx struct {
commonTx
// responses carries responses passed up to the transaction user; it is
// closed when the transaction is deleted.
responses chan sip.Response
timer_a_time time.Duration // Current duration of timer A.
// timer_a drives request retransmissions; it is only started for
// unreliable transports.
timer_a timing.Timer
// timer_b is the transaction timeout timer.
timer_b timing.Timer
timer_d_time time.Duration // Current duration of timer D.
// timer_d is started once a final response has been handled and feeds
// client_input_timer_d into the state machine when it fires.
timer_d timing.Timer
// timer_m is started when an INVITE transaction is accepted and feeds
// client_input_timer_m into the state machine when it fires.
timer_m timing.Timer
// reliable caches whether the transport used by the origin request is
// reliable, as reported by the transport layer.
reliable bool
// mu guards the timers, their durations and the lastResp/lastErr fields.
mu sync.RWMutex
// closeOnce makes sure the transaction channels are closed only once.
closeOnce sync.Once
}
// NewClientTx creates a client transaction for the given origin request.
//
// The request is first passed through prepareClientRequest so that it carries
// a Via hop with a branch parameter, then the transaction key is derived from
// it. An error is returned when the key cannot be built. The transaction is
// not started here; call Init to send the request and arm the timers.
func NewClientTx(origin sip.Request, tpl sip.Transport, logger log.Logger) (ClientTx, error) {
	origin = prepareClientRequest(origin)

	key, err := MakeClientTxKey(origin)
	if err != nil {
		return nil, err
	}

	tx := &clientTx{}
	tx.key = key
	tx.tpl = tpl

	// Both data channels are buffered (64 slots) so that bursts of
	// retransmitted responses or errors do not immediately block the FSM.
	tx.responses = make(chan sip.Response, 64)
	tx.errs = make(chan error, 64)
	tx.done = make(chan bool)

	// The pointer is rendered once and attached to both the logger and the
	// stored request so log lines can be correlated with this transaction.
	ptr := fmt.Sprintf("%p", tx)

	loggerFields := origin.Fields().WithFields(log.Fields{
		"transaction_ptr": ptr,
		"transaction_key": tx.key,
	})
	tx.log = logger.WithPrefix("transaction.ClientTx").WithFields(loggerFields)

	tagged := origin.WithFields(log.Fields{
		"transaction_ptr": ptr,
		"transaction_key": tx.key,
	})
	tx.origin = tagged.(sip.Request)

	tx.reliable = tx.tpl.IsReliable(origin.Transport())

	return tx, nil
}
// prepareClientRequest makes sure the request has a top Via hop carrying a
// "branch" parameter and returns the same request.
//
// When the request has no Via hop, a new "SIP/2.0" hop with a freshly
// generated branch is prepended. When a hop exists, its parameter set is
// created if missing and a branch is added only if none is present.
func prepareClientRequest(origin sip.Request) sip.Request {
	hop, found := origin.ViaHop()
	if !found {
		hop = &sip.ViaHop{
			ProtocolName:    "SIP",
			ProtocolVersion: "2.0",
			Params: sip.NewParams().
				Add("branch", sip.String{Str: sip.GenerateBranch()}),
		}
		origin.PrependHeader(sip.ViaHeader{hop})

		return origin
	}

	if hop.Params == nil {
		hop.Params = sip.NewParams()
	}
	if hasBranch := hop.Params.Has("branch"); !hasBranch {
		hop.Params.Add("branch", sip.String{Str: sip.GenerateBranch()})
	}

	return origin
}
// Init starts the transaction: it builds the state machine, sends the origin
// request and arms the timers.
//
// If the initial send fails, the error is stored as the last error, the
// transport-error input is fed into the state machine and the send error is
// returned. Otherwise timer A (retransmission) is started for unreliable
// transports only, the timer D duration is chosen according to transport
// reliability, and timer B (timeout) is always started. The value returned on
// the success path is whatever last error is recorded at that moment.
func (tx *clientTx) Init() error {
	tx.initFSM()

	if sendErr := tx.tpl.Send(tx.Origin()); sendErr != nil {
		tx.mu.Lock()
		tx.lastErr = sendErr
		tx.mu.Unlock()

		tx.fsmMu.RLock()
		if spinErr := tx.fsm.Spin(client_input_transport_err); spinErr != nil {
			tx.Log().Errorf("spin FSM to client_input_transport_err failed: %s", spinErr)
		}
		tx.fsmMu.RUnlock()

		return sendErr
	}

	if !tx.reliable {
		// RFC 3261 - 17.1.1.2.
		// Over an unreliable transport the client transaction MUST start
		// timer A with a value of T1; over a reliable transport it SHOULD NOT
		// start it, because timer A only controls request retransmissions.
		tx.Log().Tracef("timer_a set to %v", Timer_A)

		onTimerA := func() {
			select {
			case <-tx.done:
				// Transaction already finished, nothing to retransmit.
			default:
				tx.Log().Trace("timer_a fired")

				tx.fsmMu.RLock()
				if spinErr := tx.fsm.Spin(client_input_timer_a); spinErr != nil {
					tx.Log().Errorf("spin FSM to client_input_timer_a failed: %s", spinErr)
				}
				tx.fsmMu.RUnlock()
			}
		}

		tx.mu.Lock()
		tx.timer_a_time = Timer_A
		tx.timer_a = timing.AfterFunc(tx.timer_a_time, onTimerA)
		// Timer D keeps its full duration for unreliable transports.
		tx.timer_d_time = Timer_D
		tx.mu.Unlock()
	} else {
		// Reliable transport: no retransmissions and no wait in Completed.
		tx.mu.Lock()
		tx.timer_d_time = 0
		tx.mu.Unlock()
	}

	// Timer B - transaction timeout.
	tx.Log().Tracef("timer_b set to %v", Timer_B)

	onTimerB := func() {
		select {
		case <-tx.done:
			// Transaction already finished, the timeout is irrelevant.
		default:
			tx.Log().Trace("timer_b fired")

			tx.fsmMu.RLock()
			if spinErr := tx.fsm.Spin(client_input_timer_b); spinErr != nil {
				tx.Log().Errorf("spin FSM to client_input_timer_b failed: %s", spinErr)
			}
			tx.fsmMu.RUnlock()
		}
	}

	tx.mu.Lock()
	tx.timer_b = timing.AfterFunc(Timer_B, onTimerB)
	tx.mu.Unlock()

	tx.mu.RLock()
	lastErr := tx.lastErr
	tx.mu.RUnlock()

	return lastErr
}
// Receive feeds an incoming message into the transaction.
//
// Only responses are accepted; any other message yields an
// *sip.UnexpectedMessageError. A response flagged as a cancel maps to the
// "canceled" input and is not stored. Every other response is remembered as
// the last response and classified as 1xx, 2xx or 300+ before the state
// machine is spun. The error returned is the one reported by the state
// machine.
func (tx *clientTx) Receive(msg sip.Message) error {
	res, isResponse := msg.(sip.Response)
	if !isResponse {
		return &sip.UnexpectedMessageError{
			Err: fmt.Errorf("%s recevied unexpected %s", tx, msg.Short()),
			Msg: msg.String(),
		}
	}

	// Tag the response with the identifier of the request it answers.
	res = res.WithFields(log.Fields{
		"request_id": tx.origin.MessageID(),
	}).(sip.Response)

	var input fsm.Input
	if !res.IsCancel() {
		tx.mu.Lock()
		tx.lastResp = res
		tx.mu.Unlock()

		if res.IsProvisional() {
			input = client_input_1xx
		} else if res.IsSuccess() {
			input = client_input_2xx
		} else {
			input = client_input_300_plus
		}
	} else {
		input = client_input_canceled
	}

	tx.fsmMu.RLock()
	defer tx.fsmMu.RUnlock()

	return tx.fsm.Spin(input)
}
// Responses exposes the transaction's response channel as receive-only.
// The channel is closed when the transaction is deleted.
func (tx *clientTx) Responses() <-chan sip.Response {
	var out <-chan sip.Response = tx.responses

	return out
}
// Cancel feeds the cancel input into the state machine while holding the FSM
// read lock and returns the state machine's error, if any. What actually
// happens depends on the current state and on the kind of transaction.
func (tx *clientTx) Cancel() error {
	tx.fsmMu.RLock()
	defer tx.fsmMu.RUnlock()

	spinErr := tx.fsm.Spin(client_input_cancel)

	return spinErr
}
// Terminate tears the transaction down immediately, bypassing the state
// machine. It does nothing when the transaction is already done.
func (tx *clientTx) Terminate() {
	select {
	case <-tx.done:
		// Already terminated.
	default:
		tx.delete()
	}
}
// cancel sends a CANCEL request for the origin request. It is a no-op for
// anything other than INVITE.
//
// When sending fails the failure is logged together with the INVITE, the last
// response seen (if any) and the CANCEL itself, the error is stored as the
// last error, and the transport-error input is fed into the state machine
// from a separate goroutine.
func (tx *clientTx) cancel() {
	if isInvite := tx.Origin().IsInvite(); !isInvite {
		return
	}

	tx.mu.RLock()
	prevResp := tx.lastResp
	tx.mu.RUnlock()

	cancelReq := sip.NewCancelRequest("", tx.Origin(), log.Fields{
		"sent_at": time.Now(),
	})

	sendErr := tx.tpl.Send(cancelReq)
	if sendErr == nil {
		return
	}

	var prevRespShort string
	if prevResp != nil {
		prevRespShort = prevResp.Short()
	}

	tx.Log().WithFields(map[string]interface{}{
		"invite_request":  tx.Origin().Short(),
		"invite_response": prevRespShort,
		"cancel_request":  cancelReq.Short(),
	}).Errorf("send CANCEL request failed: %s", sendErr)

	tx.mu.Lock()
	tx.lastErr = sendErr
	tx.mu.Unlock()

	// Spun asynchronously: cancel itself runs as part of an FSM action.
	go func() {
		tx.fsmMu.RLock()
		if spinErr := tx.fsm.Spin(client_input_transport_err); spinErr != nil {
			tx.Log().Errorf("spin FSM to client_input_transport_err failed: %s", spinErr)
		}
		tx.fsmMu.RUnlock()
	}()
}
// ack builds an ACK for the last response received and sends it.
//
// When sending fails the failure is logged together with the INVITE, the
// response being acknowledged and the ACK, the error is stored as the last
// error, and the transport-error input is fed into the state machine from a
// separate goroutine.
func (tx *clientTx) ack() {
	tx.mu.RLock()
	resp := tx.lastResp
	tx.mu.RUnlock()

	ackReq := sip.NewAckRequest("", tx.Origin(), resp, "", log.Fields{
		"sent_at": time.Now(),
	})

	sendErr := tx.tpl.Send(ackReq)
	if sendErr == nil {
		return
	}

	tx.Log().WithFields(log.Fields{
		"invite_request":  tx.Origin().Short(),
		"invite_response": resp.Short(),
		"ack_request":     ackReq.Short(),
	}).Errorf("send ACK request failed: %s", sendErr)

	tx.mu.Lock()
	tx.lastErr = sendErr
	tx.mu.Unlock()

	// Spun asynchronously: ack itself runs as part of an FSM action.
	go func() {
		tx.fsmMu.RLock()
		if spinErr := tx.fsm.Spin(client_input_transport_err); spinErr != nil {
			tx.Log().Errorf("spin FSM to client_input_transport_err failed: %s", spinErr)
		}
		tx.fsmMu.RUnlock()
	}()
}
// FSM states of a client transaction. The values are used as state indexes
// when the INVITE and non-INVITE state machines are defined.
const (
// client_state_calling is the initial state; the non-INVITE machine uses it
// as its "Trying" state.
client_state_calling = iota
// client_state_proceeding is entered on a provisional (1xx) response.
client_state_proceeding
// client_state_completed is entered on a final response (300+ for INVITE,
// any final response for non-INVITE) and is left when timer D fires.
client_state_completed
// client_state_accepted is used by the INVITE machine only; it is entered on
// a 2xx response and is left when timer M fires.
client_state_accepted
// client_state_terminated is the final state; the delete input handled here
// tears the transaction down.
client_state_terminated
)
// FSM inputs of a client transaction.
const (
// Response inputs, chosen by Receive from the class of the response.
client_input_1xx fsm.Input = iota
client_input_2xx
client_input_300_plus
// Timer inputs, spun by the callback of the corresponding timer.
client_input_timer_a
client_input_timer_b
client_input_timer_d
client_input_timer_m
// client_input_transport_err is spun when sending a message fails.
client_input_transport_err
// client_input_delete is returned by actions that want the transaction torn
// down once the terminated state has been entered.
client_input_delete
// client_input_cancel is spun by Cancel.
client_input_cancel
// client_input_canceled is chosen by Receive for a response flagged as a
// cancel.
client_input_canceled
)
// initFSM builds the state machine matching the origin request's method:
// the INVITE machine for INVITE requests, the non-INVITE machine otherwise.
func (tx *clientTx) initFSM() {
	if !tx.Origin().IsInvite() {
		tx.initNonInviteFSM()
		return
	}

	tx.initInviteFSM()
}
// initInviteFSM defines the INVITE client transaction state machine
// (Calling, Proceeding, Completed, Accepted, Terminated) and installs it on
// the transaction. If the definition is rejected the error is logged and the
// transaction is left without a state machine.
func (tx *clientTx) initInviteFSM() {
	tx.Log().Debug("initialising INVITE transaction FSM")

	// Outcomes that swallow an input without leaving the current state.
	var (
		ignoredInCompleted  = fsm.Outcome{client_state_completed, fsm.NO_ACTION}
		ignoredInAccepted   = fsm.Outcome{client_state_accepted, fsm.NO_ACTION}
		ignoredInTerminated = fsm.Outcome{client_state_terminated, fsm.NO_ACTION}
	)

	// In Accepted a transport error is reported to the user, but the delete
	// input returned by act_trans_err is dropped so the state is kept.
	reportTransportErr := func() fsm.Input {
		tx.act_trans_err()
		return fsm.NO_INPUT
	}

	// Calling: request sent, no response yet.
	calling := fsm.State{
		Index: client_state_calling,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           {client_state_proceeding, tx.act_invite_proceeding},
			client_input_2xx:           {client_state_accepted, tx.act_passup_accept},
			client_input_300_plus:      {client_state_completed, tx.act_invite_final},
			client_input_cancel:        {client_state_calling, tx.act_cancel},
			client_input_canceled:      {client_state_calling, tx.act_invite_canceled},
			client_input_timer_a:       {client_state_calling, tx.act_invite_resend},
			client_input_timer_b:       {client_state_terminated, tx.act_timeout},
			client_input_transport_err: {client_state_terminated, tx.act_trans_err},
		},
	}

	// Proceeding: at least one provisional response was received.
	proceeding := fsm.State{
		Index: client_state_proceeding,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           {client_state_proceeding, tx.act_passup},
			client_input_2xx:           {client_state_accepted, tx.act_passup_accept},
			client_input_300_plus:      {client_state_completed, tx.act_invite_final},
			client_input_cancel:        {client_state_proceeding, tx.act_cancel_timeout},
			client_input_canceled:      {client_state_proceeding, tx.act_invite_canceled},
			client_input_timer_a:       {client_state_proceeding, fsm.NO_ACTION},
			client_input_timer_b:       {client_state_terminated, tx.act_timeout},
			client_input_transport_err: {client_state_terminated, tx.act_trans_err},
		},
	}

	// Completed: a 300+ final response was received and acknowledged.
	completed := fsm.State{
		Index: client_state_completed,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           ignoredInCompleted,
			client_input_2xx:           ignoredInCompleted,
			client_input_300_plus:      {client_state_completed, tx.act_ack},
			client_input_cancel:        ignoredInCompleted,
			client_input_canceled:      ignoredInCompleted,
			client_input_transport_err: {client_state_terminated, tx.act_trans_err},
			client_input_timer_a:       ignoredInCompleted,
			client_input_timer_b:       ignoredInCompleted,
			client_input_timer_d:       {client_state_terminated, tx.act_delete},
		},
	}

	// Accepted: a 2xx response was received.
	accepted := fsm.State{
		Index: client_state_accepted,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           ignoredInAccepted,
			client_input_2xx:           {client_state_accepted, tx.act_passup},
			client_input_300_plus:      ignoredInAccepted,
			client_input_cancel:        ignoredInAccepted,
			client_input_canceled:      ignoredInAccepted,
			client_input_transport_err: {client_state_accepted, reportTransportErr},
			client_input_timer_a:       ignoredInAccepted,
			client_input_timer_b:       ignoredInAccepted,
			client_input_timer_m:       {client_state_terminated, tx.act_delete},
		},
	}

	// Terminated: only the delete input still does something.
	terminated := fsm.State{
		Index: client_state_terminated,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           ignoredInTerminated,
			client_input_2xx:           ignoredInTerminated,
			client_input_300_plus:      ignoredInTerminated,
			client_input_cancel:        ignoredInTerminated,
			client_input_canceled:      ignoredInTerminated,
			client_input_timer_a:       ignoredInTerminated,
			client_input_timer_b:       ignoredInTerminated,
			client_input_timer_d:       ignoredInTerminated,
			client_input_timer_m:       ignoredInTerminated,
			client_input_delete:        {client_state_terminated, tx.act_delete},
			client_input_transport_err: ignoredInTerminated,
		},
	}

	machine, err := fsm.Define(calling, proceeding, completed, accepted, terminated)
	if err != nil {
		tx.Log().Errorf("define INVITE transaction FSM failed: %s", err)
		return
	}

	tx.fsmMu.Lock()
	tx.fsm = machine
	tx.fsmMu.Unlock()
}
// initNonInviteFSM defines the non-INVITE client transaction state machine
// (Trying, Proceeding, Completed, Terminated) and installs it on the
// transaction. If the definition is rejected the error is logged and the
// transaction is left without a state machine.
func (tx *clientTx) initNonInviteFSM() {
	tx.Log().Debug("initialising non-INVITE transaction FSM")

	// Outcomes that swallow an input without leaving the current state.
	var (
		ignoredInCompleted  = fsm.Outcome{client_state_completed, fsm.NO_ACTION}
		ignoredInTerminated = fsm.Outcome{client_state_terminated, fsm.NO_ACTION}
	)

	// "Trying": request sent, no response yet. Shares the calling index.
	trying := fsm.State{
		Index: client_state_calling,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           {client_state_proceeding, tx.act_passup},
			client_input_2xx:           {client_state_completed, tx.act_non_invite_final},
			client_input_300_plus:      {client_state_completed, tx.act_non_invite_final},
			client_input_timer_a:       {client_state_calling, tx.act_non_invite_resend},
			client_input_timer_b:       {client_state_terminated, tx.act_timeout},
			client_input_transport_err: {client_state_terminated, tx.act_trans_err},
			client_input_cancel:        {client_state_calling, tx.act_nonInviteCancel},
		},
	}

	// Proceeding: at least one provisional response was received.
	proceeding := fsm.State{
		Index: client_state_proceeding,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:           {client_state_proceeding, tx.act_passup},
			client_input_2xx:           {client_state_completed, tx.act_non_invite_final},
			client_input_300_plus:      {client_state_completed, tx.act_non_invite_final},
			client_input_timer_a:       {client_state_proceeding, tx.act_non_invite_resend},
			client_input_timer_b:       {client_state_terminated, tx.act_timeout},
			client_input_transport_err: {client_state_terminated, tx.act_trans_err},
			client_input_cancel:        {client_state_proceeding, fsm.NO_ACTION},
		},
	}

	// Completed: a final response was received; waiting for timer D.
	completed := fsm.State{
		Index: client_state_completed,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:      ignoredInCompleted,
			client_input_2xx:      ignoredInCompleted,
			client_input_300_plus: ignoredInCompleted,
			client_input_timer_a:  ignoredInCompleted,
			client_input_timer_b:  ignoredInCompleted,
			client_input_timer_d:  {client_state_terminated, tx.act_delete},
			client_input_cancel:   ignoredInCompleted,
		},
	}

	// Terminated: only the delete input still does something.
	terminated := fsm.State{
		Index: client_state_terminated,
		Outcomes: map[fsm.Input]fsm.Outcome{
			client_input_1xx:      ignoredInTerminated,
			client_input_2xx:      ignoredInTerminated,
			client_input_300_plus: ignoredInTerminated,
			client_input_timer_a:  ignoredInTerminated,
			client_input_timer_b:  ignoredInTerminated,
			client_input_timer_d:  ignoredInTerminated,
			client_input_delete:   {client_state_terminated, tx.act_delete},
			client_input_cancel:   ignoredInTerminated,
		},
	}

	machine, err := fsm.Define(trying, proceeding, completed, terminated)
	if err != nil {
		tx.Log().Errorf("define non-INVITE transaction FSM failed: %s", err)
		return
	}

	tx.fsmMu.Lock()
	tx.fsm = machine
	tx.fsmMu.Unlock()
}
// resend retransmits the origin request unless the transaction is already
// done.
//
// The outcome of the send, nil included, overwrites the stored last error.
// On failure the transport-error input is fed into the state machine from a
// separate goroutine.
func (tx *clientTx) resend() {
	select {
	case <-tx.done:
		return
	default:
	}

	tx.Log().Debug("resend origin request")

	sendErr := tx.tpl.Send(tx.Origin())

	tx.mu.Lock()
	tx.lastErr = sendErr
	tx.mu.Unlock()

	if sendErr == nil {
		return
	}

	// Spun asynchronously: resend itself runs as part of an FSM action.
	go func() {
		tx.fsmMu.RLock()
		if spinErr := tx.fsm.Spin(client_input_transport_err); spinErr != nil {
			tx.Log().Errorf("spin FSM to client_input_transport_err failed: %s", spinErr)
		}
		tx.fsmMu.RUnlock()
	}()
}
// passUp delivers the last received response to the transaction user through
// the responses channel. Nothing is sent when no response has been recorded
// or when the transaction is already done.
//
// The send blocks while the channel buffer is full and is abandoned as soon
// as the transaction is done.
func (tx *clientTx) passUp() {
	// delete() closes tx.responses right after tx.done. A response arriving
	// after the transaction was torn down would otherwise let the select below
	// pick the send on the closed channel and panic. The error senders use the
	// same guard.
	defer func() { recover() }()

	tx.mu.RLock()
	lastResp := tx.lastResp
	tx.mu.RUnlock()

	if lastResp == nil {
		return
	}

	// Deterministic fast path: never attempt the send once done is closed.
	select {
	case <-tx.done:
		return
	default:
	}

	select {
	case <-tx.done:
	case tx.responses <- lastResp:
	}
}
// transportErr reports the stored last error to the transaction user as a
// *TxTransportError on the errs channel. The send is abandoned once the
// transaction is done.
func (tx *clientTx) transportErr() {
	// todo bloody patch
	// Swallows the panic raised when errs has already been closed.
	defer func() { recover() }()

	tx.mu.RLock()
	cause := tx.lastErr
	tx.mu.RUnlock()

	var report error = &TxTransportError{
		fmt.Errorf("transaction failed to send %s: %w", tx.origin.Short(), cause),
		tx.Key(),
		fmt.Sprintf("%p", tx),
	}

	select {
	case tx.errs <- report:
	case <-tx.done:
	}
}
// timeoutErr reports a *TxTimeoutError to the transaction user on the errs
// channel. The send is abandoned once the transaction is done.
func (tx *clientTx) timeoutErr() {
	// todo bloody patch
	// Swallows the panic raised when errs has already been closed.
	defer func() { recover() }()

	report := &TxTimeoutError{
		fmt.Errorf("transaction timed out"),
		tx.Key(),
		fmt.Sprintf("%p", tx),
	}

	select {
	case tx.errs <- report:
	case <-tx.done:
	}
}
// cancelErr reports the cancellation of the transaction to the transaction
// user on the errs channel. The report reuses the *TxTimeoutError type with a
// "transaction cancel" message. The send is abandoned once the transaction is
// done.
func (tx *clientTx) cancelErr() {
	// todo bloody patch
	// Swallows the panic raised when errs has already been closed.
	defer func() { recover() }()

	report := &TxTimeoutError{
		fmt.Errorf("transaction cancel"),
		tx.Key(),
		fmt.Sprintf("%p", tx),
	}

	select {
	case tx.errs <- report:
	case <-tx.done:
	}
}
// delete tears the transaction down: it closes the done, responses and errs
// channels exactly once and stops every timer the transaction may still have
// running. Calling it on a transaction that is already done is a no-op.
func (tx *clientTx) delete() {
	select {
	case <-tx.done:
		return
	default:
	}

	// todo bloody patch
	// Swallows panics raised during teardown (e.g. closing a closed channel).
	defer func() { recover() }()

	tx.closeOnce.Do(func() {
		tx.mu.Lock()
		// done is closed first so that pending senders blocked in a select on
		// done are released.
		close(tx.done)
		close(tx.responses)
		close(tx.errs)
		tx.mu.Unlock()

		tx.Log().Debug("transaction done")
	})

	// Brief pause kept from the original implementation before the timers
	// are stopped.
	time.Sleep(time.Microsecond)

	tx.mu.Lock()
	// Deferred so the mutex is released even if stopping a timer panics and
	// the panic is recovered above.
	defer tx.mu.Unlock()

	if tx.timer_a != nil {
		tx.timer_a.Stop()
		tx.timer_a = nil
	}
	if tx.timer_b != nil {
		tx.timer_b.Stop()
		tx.timer_b = nil
	}
	if tx.timer_d != nil {
		tx.timer_d.Stop()
		tx.timer_d = nil
	}
	// Timer M is armed when an INVITE is accepted; stop it as well so a
	// transaction terminated early does not keep a pending timer around.
	if tx.timer_m != nil {
		tx.timer_m.Stop()
		tx.timer_m = nil
	}
}
// Define actions
// act_invite_resend handles a timer A expiry for an INVITE transaction: the
// timer A interval is doubled, the timer is re-armed and the origin request
// is retransmitted. Always returns fsm.NO_INPUT.
func (tx *clientTx) act_invite_resend() fsm.Input {
	tx.Log().Debug("act_invite_resend")

	tx.mu.Lock()
	if tx.timer_a == nil {
		// Timer A has been stopped and cleared (for example by delete) while
		// this expiry was already in flight. Retransmissions are over, and
		// resetting a nil timer would panic.
		tx.mu.Unlock()
		return fsm.NO_INPUT
	}
	tx.timer_a_time *= 2
	tx.timer_a.Reset(tx.timer_a_time)
	tx.mu.Unlock()

	tx.resend()

	return fsm.NO_INPUT
}
// act_invite_canceled handles the "canceled" input of an INVITE transaction.
// It currently only logs the event and always returns fsm.NO_INPUT.
func (tx *clientTx) act_invite_canceled() fsm.Input {
	// Intentionally empty apart from the trace below.
	tx.Log().Debug("act_invite_canceled")

	var next fsm.Input = fsm.NO_INPUT

	return next
}
// act_non_invite_resend handles a timer A expiry for a non-INVITE
// transaction: the timer A interval is doubled and capped at T2, the timer is
// re-armed and the origin request is retransmitted. Always returns
// fsm.NO_INPUT.
func (tx *clientTx) act_non_invite_resend() fsm.Input {
	tx.Log().Debug("act_non_invite_resend")

	tx.mu.Lock()
	if tx.timer_a == nil {
		// Timer A has been stopped and cleared (act_passup does so on a
		// provisional response, delete on teardown) while this expiry was
		// already in flight. Retransmissions are over, and resetting a nil
		// timer would panic.
		tx.mu.Unlock()
		return fsm.NO_INPUT
	}
	tx.timer_a_time *= 2
	// For non-INVITE, cap timer A at T2 seconds.
	if tx.timer_a_time > T2 {
		tx.timer_a_time = T2
	}
	tx.timer_a.Reset(tx.timer_a_time)
	tx.mu.Unlock()

	tx.resend()

	return fsm.NO_INPUT
}
// act_passup passes the last response up to the transaction user and then
// stops and clears timer A. Always returns fsm.NO_INPUT.
func (tx *clientTx) act_passup() fsm.Input {
	tx.Log().Debug("act_passup")

	tx.passUp()

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	tx.mu.Unlock()

	return fsm.NO_INPUT
}
// act_invite_proceeding handles the first provisional response of an INVITE
// transaction: the response is passed up, then timers A and B are stopped and
// cleared. Always returns fsm.NO_INPUT.
func (tx *clientTx) act_invite_proceeding() fsm.Input {
	tx.Log().Debug("act_invite_proceeding")

	tx.passUp()

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	if t := tx.timer_b; t != nil {
		t.Stop()
		tx.timer_b = nil
	}
	tx.mu.Unlock()

	return fsm.NO_INPUT
}
// act_invite_final handles a 300+ final response of an INVITE transaction:
// the response is acknowledged and passed up, timers A and B are stopped and
// cleared, and timer D is started with the configured timer D duration.
// Always returns fsm.NO_INPUT.
func (tx *clientTx) act_invite_final() fsm.Input {
	tx.Log().Debug("act_invite_final")

	tx.ack()
	tx.passUp()

	// Fired when timer D expires; ignored once the transaction is done.
	onTimerD := func() {
		select {
		case <-tx.done:
		default:
			tx.Log().Trace("timer_d fired")

			tx.fsmMu.RLock()
			if spinErr := tx.fsm.Spin(client_input_timer_d); spinErr != nil {
				tx.Log().Errorf("spin FSM to client_input_timer_d failed: %s", spinErr)
			}
			tx.fsmMu.RUnlock()
		}
	}

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	if t := tx.timer_b; t != nil {
		t.Stop()
		tx.timer_b = nil
	}
	tx.Log().Tracef("timer_d set to %v", tx.timer_d_time)
	tx.timer_d = timing.AfterFunc(tx.timer_d_time, onTimerD)
	tx.mu.Unlock()

	return fsm.NO_INPUT
}
// act_non_invite_final handles a final response of a non-INVITE transaction:
// the response is passed up, timers A and B are stopped and cleared, and
// timer D is started with the configured timer D duration. Always returns
// fsm.NO_INPUT.
func (tx *clientTx) act_non_invite_final() fsm.Input {
	tx.Log().Debug("act_non_invite_final")

	tx.passUp()

	// Fired when timer D expires; ignored once the transaction is done.
	onTimerD := func() {
		select {
		case <-tx.done:
		default:
			tx.Log().Trace("timer_d fired")

			tx.fsmMu.RLock()
			if spinErr := tx.fsm.Spin(client_input_timer_d); spinErr != nil {
				tx.Log().Errorf("spin FSM to client_input_timer_d failed: %s", spinErr)
			}
			tx.fsmMu.RUnlock()
		}
	}

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	if t := tx.timer_b; t != nil {
		t.Stop()
		tx.timer_b = nil
	}
	tx.Log().Tracef("timer_d set to %v", tx.timer_d_time)
	tx.timer_d = timing.AfterFunc(tx.timer_d_time, onTimerD)
	tx.mu.Unlock()

	return fsm.NO_INPUT
}
// act_cancel handles a cancel request while no response has been received
// yet by sending a CANCEL for the origin request. Always returns
// fsm.NO_INPUT.
func (tx *clientTx) act_cancel() fsm.Input {
	tx.Log().Debug("act_cancel")

	tx.cancel()

	var next fsm.Input = fsm.NO_INPUT

	return next
}
// act_cancel_timeout handles a cancel request in the Proceeding state of an
// INVITE transaction: a CANCEL is sent and timer B is (re)started, so the
// transaction times out if no final response follows. Always returns
// fsm.NO_INPUT.
func (tx *clientTx) act_cancel_timeout() fsm.Input {
	// Log the action's own name; it used to log "act_cancel", which made it
	// indistinguishable from act_cancel in traces.
	tx.Log().Debug("act_cancel_timeout")

	tx.cancel()

	tx.Log().Tracef("timer_b set to %v", Timer_B)

	tx.mu.Lock()
	// Timer B is normally cleared on entering Proceeding; stop any leftover
	// instance before replacing it.
	if tx.timer_b != nil {
		tx.timer_b.Stop()
	}
	tx.timer_b = timing.AfterFunc(Timer_B, func() {
		select {
		case <-tx.done:
			return
		default:
		}

		tx.Log().Trace("timer_b fired")

		tx.fsmMu.RLock()
		if err := tx.fsm.Spin(client_input_timer_b); err != nil {
			tx.Log().Errorf("spin FSM to client_input_timer_b failed: %s", err)
		}
		tx.fsmMu.RUnlock()
	})
	tx.mu.Unlock()

	return fsm.NO_INPUT
}
// act_ack acknowledges a retransmitted final response received in the
// Completed state by sending the ACK again. Always returns fsm.NO_INPUT.
func (tx *clientTx) act_ack() fsm.Input {
	tx.Log().Debug("act_ack")

	tx.ack()

	var next fsm.Input = fsm.NO_INPUT

	return next
}
// act_trans_err reports a transport error to the transaction user, stops and
// clears timer A and asks the state machine to delete the transaction by
// returning client_input_delete.
func (tx *clientTx) act_trans_err() fsm.Input {
	tx.Log().Debug("act_trans_err")

	tx.transportErr()

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	tx.mu.Unlock()

	return client_input_delete
}
// act_timeout reports a timeout to the transaction user, stops and clears
// timer A and asks the state machine to delete the transaction by returning
// client_input_delete.
func (tx *clientTx) act_timeout() fsm.Input {
	tx.Log().Debug("act_timeout")

	tx.timeoutErr()

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	tx.mu.Unlock()

	return client_input_delete
}
// act_nonInviteCancel handles a cancel request for a non-INVITE transaction
// that has not received any response: the cancellation is reported to the
// transaction user, timer A is stopped and cleared, and client_input_delete
// is returned as the follow-up input.
func (tx *clientTx) act_nonInviteCancel() fsm.Input {
	tx.Log().Debug("act_nonInviteCancel")

	tx.cancelErr()

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	tx.mu.Unlock()

	return client_input_delete
}
// act_passup_delete passes the last response up to the transaction user,
// stops and clears timer A and asks the state machine to delete the
// transaction by returning client_input_delete.
func (tx *clientTx) act_passup_delete() fsm.Input {
	tx.Log().Debug("act_passup_delete")

	tx.passUp()

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	tx.mu.Unlock()

	return client_input_delete
}
// act_passup_accept handles a 2xx response of an INVITE transaction: the
// response is passed up and acknowledged, timers A and B are stopped and
// cleared, and timer M is started. Always returns fsm.NO_INPUT.
func (tx *clientTx) act_passup_accept() fsm.Input {
	tx.Log().Debug("act_passup_accept")

	tx.passUp()
	tx.ack()

	// Fired when timer M expires; ignored once the transaction is done.
	onTimerM := func() {
		select {
		case <-tx.done:
		default:
			tx.Log().Trace("timer_m fired")

			tx.fsmMu.RLock()
			if spinErr := tx.fsm.Spin(client_input_timer_m); spinErr != nil {
				tx.Log().Errorf("spin FSM to client_input_timer_m failed: %s", spinErr)
			}
			tx.fsmMu.RUnlock()
		}
	}

	tx.mu.Lock()
	if t := tx.timer_a; t != nil {
		t.Stop()
		tx.timer_a = nil
	}
	if t := tx.timer_b; t != nil {
		t.Stop()
		tx.timer_b = nil
	}
	tx.Log().Tracef("timer_m set to %v", Timer_M)
	tx.timer_m = timing.AfterFunc(Timer_M, onTimerM)
	tx.mu.Unlock()

	return fsm.NO_INPUT
}
// act_delete tears the transaction down once the state machine has reached
// its terminated state. Always returns fsm.NO_INPUT.
func (tx *clientTx) act_delete() fsm.Input {
	tx.Log().Debug("act_delete")

	tx.delete()

	var next fsm.Input = fsm.NO_INPUT

	return next
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/zhjun2512/gosip.git
git@gitee.com:zhjun2512/gosip.git
zhjun2512
gosip
gosip
v0.0.5

Search

344bd9b3 5694891 D2dac590 5694891