diff --git a/src/process.go b/src/process.go index 9e9931746801913c64ee4559e384bc9cb85c1dc0..c0ae7ce2330916424368884b815148e150d4ee38 100644 --- a/src/process.go +++ b/src/process.go @@ -20,6 +20,7 @@ import ( "gitee.com/go-libs/runtime" "github.com/google/uuid" "sync" + "sync/atomic" "time" ) @@ -75,23 +76,24 @@ type ( provider Provider parent Process - children map[string]Process - mu *sync.RWMutex + childMapper map[string]Process + childRunning int32 + mu *sync.Mutex - cancel context.CancelFunc - ctx context.Context - retryCount int32 - started, quit, prevented bool - startedTime time.Time + cancel context.CancelFunc + ctx context.Context + restarting, stopping bool + started, quited, prevented bool + startedTime time.Time } ) // New creates a new process instance. func New(name string, provider Provider) Process { return (&process{ - children: make(map[string]Process), - mu: new(sync.RWMutex), - name: name, uid: uuid.NewString(), + childMapper: make(map[string]Process), + mu: new(sync.Mutex), + name: name, uid: uuid.NewString(), provider: provider, }).init() } @@ -120,7 +122,7 @@ func (o *process) Add(children ...Process) Process { // Update child process mapping. o.mu.Lock() - o.children[child.GetName()] = child + o.childMapper[child.GetName()] = child o.mu.Unlock() } @@ -165,7 +167,7 @@ func (o *process) Del(children ...Process) Process { // Remove from children. o.mu.Lock() - delete(o.children, child.GetName()) + delete(o.childMapper, child.GetName()) o.mu.Unlock() } } @@ -174,14 +176,14 @@ func (o *process) Del(children ...Process) Process { func (o *process) GetChild(name string) (p Process, exists bool) { o.mu.Lock() - p, exists = o.children[name] + p, exists = o.childMapper[name] o.mu.Unlock() return } func (o *process) GetChildren() (children map[string]Process) { o.mu.Lock() - children = o.children + children = o.childMapper o.mu.Unlock() return } @@ -206,7 +208,7 @@ func (o *process) Restart() { defer o.mu.Unlock() if o.ctx != nil && o.ctx.Err() == nil { - o.quit = false + o.quited, o.restarting = false, true o.cancel() } } @@ -227,6 +229,12 @@ func (o *process) Start(ctx context.Context) (err error) { // Lock process state. o.mu.Lock() + // Return if the process state in starting or stopping. + if o.stopping || o.restarting { + o.mu.Unlock() + return + } + // Return if started already. if o.started { o.mu.Unlock() @@ -235,7 +243,7 @@ func (o *process) Start(ctx context.Context) (err error) { } // Update process state to avoid concurrency operations. - o.started, o.quit = true, false + o.started, o.quited, o.restarting, o.stopping = true, false, false, false o.startedTime = time.Now() o.mu.Unlock() runtime.GetCounter().ProcessCounter().Add(o.uid, o.name, o.startedTime) @@ -247,14 +255,18 @@ func (o *process) Start(ctx context.Context) (err error) { // Revert process state. o.mu.Lock() parent, prevented := o.parent, o.prevented - o.ctx, o.cancel, o.parent = nil, nil, nil - o.started, o.quit = false, true o.mu.Unlock() // Notify the parent process to delete the current process. if parent != nil && !prevented { parent.Del(o) } + + // Revert process state as initial. + o.mu.Lock() + o.ctx, o.cancel, o.parent = nil, nil, nil + o.started, o.quited = false, true + o.mu.Unlock() }() // Block until current or parent process stopped. @@ -265,14 +277,14 @@ func (o *process) Start(ctx context.Context) (err error) { } // Return for stop signal. - if o.quit { + if o.quited { return } // Build process context. o.mu.Lock() o.ctx, o.cancel = context.WithCancel(ctx) - o.quit = true + o.quited = true o.mu.Unlock() // Invoke before hook of the provider, Return if error returned. @@ -318,7 +330,7 @@ func (o *process) Stop() { defer o.mu.Unlock() if o.ctx != nil && o.ctx.Err() == nil { - o.quit = true + o.quited, o.stopping = true, true o.cancel() } } @@ -388,23 +400,33 @@ func (o *process) init() *process { } func (o *process) startChildren(ctx context.Context) { - for _, c := range o.GetChildren() { + o.mu.Lock() + defer o.mu.Unlock() + + // Iterate all children. + for _, c := range o.childMapper { // Skip started process. if c.Started() { continue } // Start child process in a goroutine. - go func(child Process) { _ = child.Start(ctx) }(c) + atomic.AddInt32(&o.childRunning, 1) + go func(child Process) { + defer atomic.AddInt32(&o.childRunning, -1) + _ = child.Start(ctx) + }(c) } } func (o *process) waitChildrenStopped() { - for _, child := range o.GetChildren() { - if child.Started() { - child.Stop() - time.Sleep(time.Millisecond * 3) - continue + for { + // All child processes are stopped. + if atomic.LoadInt32(&o.childRunning) == 0 { + break } + + // Sleep for a while and recheck. + time.Sleep(time.Millisecond * 10) } }