From f8d5d12185bc6a8d413ad6aea2caf5b745fdd437 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 18 Dec 2024 13:30:08 +0800 Subject: [PATCH 1/3] update latest and fix fatal error: concurrent map iteration and map write --- export.go | 9 +- examples/simple/main.go => export_test.go | 125 ++++--- src/v2/manager.go | 428 ++++++++++++++++++++++ src/v2/process.go | 72 ++++ 4 files changed, 580 insertions(+), 54 deletions(-) rename examples/simple/main.go => export_test.go (36%) create mode 100644 src/v2/manager.go create mode 100644 src/v2/process.go diff --git a/export.go b/export.go index 3d055bc..6c37451 100644 --- a/export.go +++ b/export.go @@ -16,12 +16,15 @@ // Package process provides a process manager. package process -import "gitee.com/go-libs/process/src" +import ( + "gitee.com/go-libs/process/src" + "gitee.com/go-libs/process/src/v2" +) type ( // Process is an interface used to manage provider that runtime like os // process. - Process = src.Process + Process = v2.Process // Provider is an executor that run int the process. Provider = src.Provider @@ -29,5 +32,5 @@ type ( var ( // New creates a new process. - New = src.New + New = v2.New ) diff --git a/examples/simple/main.go b/export_test.go similarity index 36% rename from examples/simple/main.go rename to export_test.go index bb1ad3e..39038a4 100644 --- a/examples/simple/main.go +++ b/export_test.go @@ -11,36 +11,46 @@ // limitations under the License. // // Author: wsfuyibing <682805@qq.com> -// Date: 2024-07-10 +// Date: 2024-12-17 -package main +package process import ( "context" "fmt" - "gitee.com/go-libs/process" "gitee.com/go-libs/runtime" + "testing" "time" ) -type my struct { +type tp struct { +} + +func (o *tp) Run(ctx context.Context) (err error) { + for { + select { + case <-ctx.Done(): + return + } + } +} + +type t0 struct { name string } -func (o *my) After(_ context.Context) (err error) { - println(o.name, " -> hook.After") +func (o *t0) After(_ context.Context) (err error) { + println(o.name, ".After") return } -func (o *my) Before(_ context.Context) (err error) { - println(o.name, " -> hook.Before") +func (o *t0) Before(_ context.Context) (err error) { + println(o.name, ".Before") return } -func (o *my) Run(ctx context.Context) (err error) { - println(o.name, " -> hook.Run") - counter() - +func (o *t0) Run(ctx context.Context) (err error) { + println(o.name, ".Run") for { select { case <-ctx.Done(): @@ -49,56 +59,69 @@ func (o *my) Run(ctx context.Context) (err error) { } } -func counter() { - c := runtime.GetCounter().ProcessCounter() - fmt.Printf("counter: created=%d, starts=%d, stops=%d, running=%d, fatal=%d\n", - c.GetCreatedCount(), - c.GetStartedCount(), - c.GetStoppedCount(), - c.GetRunningCount(), - c.GetFatalCount(), - ) +func TestNew(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + proc := New("tester", &t0{name: "tester"}) + + go func() { + time.Sleep(time.Second) + cancel() + proc.Stop() + }() + + t.Logf(`proc startup`) + + if err := proc.Start(ctx); err != nil { + t.Logf(`start error: %v`, err) + return + } + + t.Logf(`proc shutdown`) } -func main() { - var ( - ctx = context.Background() - err error - proc = process.New("main", &my{name: "main"}) - ) +func TestNewWithChild(t *testing.T) { + c := context.Background() + d := make(chan bool) + p := New("tester", &tp{}) - // proc.Add( - // process.New("ch 1", &my{name: "ch 1"}).Prevent(true), - // process.New("ch 2", &my{name: "ch 2"}).Prevent(true), - // ) + go func() { + t.Logf(`startup`) + p.Start(c) + t.Logf(`shutdown`) + }() go func() { - for i := 0; i < 3; i++ { - time.Sleep(time.Second * 1) - println("add child...") + time.Sleep(time.Second * 3) + d <- true + }() - if err != nil { - return - } + var last Process - cn := fmt.Sprintf(`ch-%d`, i) - ch := process.New(cn, &my{name: cn}) // .Prevent(true) - proc.Add(ch.Prevent(true)) // .Restart() + time.Sleep(time.Millisecond * 3) + + for i := 0; i < 1000; i++ { + if last != nil { + last.Stop() } - time.Sleep(time.Second * 3) - proc.Restart() + name := fmt.Sprintf(`test-%d`, i) + curr := New(name, &tp{}) + p.Add(curr) - time.Sleep(time.Second * 3) - proc.Stop() + last = curr - }() - - println("startup...") + // p.Del(fmt.Sprintf(`test-%d`, i-1)) + // p.Add(New(fmt.Sprintf(`test-%d`, i), &tp{})) + } - if err = proc.Start(ctx); err != nil { - println("start error:", err.Error()) + for { + select { + case <-d: + p.StopWait() + x := runtime.GetCounter().ProcessCounter() + t.Logf(`counter: created=%d, started=%d, stopped=%d`, x.GetCreatedCount(), x.GetStartedCount(), x.GetStoppedCount()) + return + } } - println("shutdown...") - counter() + } diff --git a/src/v2/manager.go b/src/v2/manager.go new file mode 100644 index 0000000..5711f9c --- /dev/null +++ b/src/v2/manager.go @@ -0,0 +1,428 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-12-17 + +package v2 + +import ( + "context" + "gitee.com/go-libs/process/src" + "gitee.com/go-libs/runtime" + "github.com/google/uuid" + "sync" + "sync/atomic" + "time" +) + +var nilTime = time.Unix(0, 0) + +// This is a process lifecycle manager. +type manager struct { + mu *sync.RWMutex + name, uid string + provider src.Provider + + cancel context.CancelFunc + ctx context.Context + childMapper map[string]Process + childRunning int32 + parent Process + prevented bool + + restarting bool + started, starting, stopped, stopping bool + startTime time.Time +} + +// New creates a new process instance. +func New(name string, provider src.Provider) Process { + return (&manager{ + name: name, uid: uuid.NewString(), + provider: provider, + }).init() +} + +// + Implements. + +func (o *manager) Add(children ...Process) Process { return o.onAdd(children) } +func (o *manager) Del(children ...Process) Process { return o.onDel(children) } +func (o *manager) GetChild(name string) (Process, bool) { return o.getChild(name) } +func (o *manager) GetName() string { return o.name } +func (o *manager) GetUid() string { return o.uid } +func (o *manager) Prevent(yes bool) Process { return o.setPrevent(yes) } +func (o *manager) Restart() { o.onRestart() } +func (o *manager) Start(ctx context.Context) error { return o.onStart(ctx) } +func (o *manager) Started() bool { return o.IsStarted() } +func (o *manager) StartedTime() time.Time { return o.getStartedTime() } +func (o *manager) Starting() bool { return o.IsStarting() } +func (o *manager) Stop() { o.onStop() } +func (o *manager) Stopped() bool { return o.IsStopped() } +func (o *manager) Stopping() bool { return o.IsStopping() } +func (o *manager) StopWait() { o.onWait() } + +// + Access methods. + +func (o *manager) IsStarted() bool { + o.mu.RLock() + defer o.mu.RUnlock() + return o.started +} + +func (o *manager) IsStarting() (yes bool) { + o.mu.RLock() + defer o.mu.RUnlock() + return o.starting +} + +func (o *manager) IsStopped() (yes bool) { + o.mu.RLock() + defer o.mu.RUnlock() + return o.stopped +} + +func (o *manager) IsStopping() (yes bool) { + o.mu.RLock() + defer o.mu.RUnlock() + return o.stopping +} + +// + Getter methods. + +func (o *manager) getChild(name string) (child Process, exists bool) { + o.mu.RLock() + defer o.mu.RUnlock() + child, exists = o.childMapper[name] + return +} + +func (o *manager) getStartedTime() (tm time.Time) { + o.mu.RLock() + defer o.mu.RUnlock() + return o.startTime +} + +// + Event methods. + +func (o *manager) onAdd(children []Process) *manager { + var ( + adds = 0 + ctx context.Context + ) + + // Iterate children and compare with mapping. + o.mu.Lock() + for _, child := range children { + // Skip nil process. + if child == nil { + continue + } + + // Send stop signal to the process with the same name. It's reference + // in mapper will be replaced. + if p, ok := o.childMapper[child.GetName()]; ok { + p.Stop() + } + + // Bind parent to the child then update mapper. + child.(*manager).parent = o + o.childMapper[child.GetName()] = child + + // Add child to mapping. + adds++ + } + + // Read context to start children. + ctx = o.ctx + o.mu.Unlock() + + // Start child process if current process started. + if adds > 0 && ctx != nil && ctx.Err() == nil { + o.onChildrenStart(ctx) + } + return o +} + +func (o *manager) onChildrenStart(ctx context.Context) { + o.mu.RLock() + defer o.mu.RUnlock() + + // Iterate child processes in mapper. Start process in a goroutine if not + // started or stopped. + for _, c := range o.childMapper { + if c.Stopped() { + atomic.AddInt32(&o.childRunning, 1) + go func(child Process) { + defer atomic.AddInt32(&o.childRunning, -1) + _ = child.Start(ctx) + }(c) + } + } +} + +func (o *manager) onChildrenStopped() { + for { + if atomic.LoadInt32(&o.childRunning) == 0 { + return + } + time.Sleep(time.Millisecond) + } +} + +func (o *manager) onDel(children []Process) *manager { + o.mu.Lock() + defer o.mu.Unlock() + + // Iterate children and compare with mapping. + for _, child := range children { + // Skip nil process. + if child == nil { + continue + } + + name := child.GetName() + + // Read process from mapped with name. Only deal with the same uuid. Skip + // to delete if not same. + if p, ok := o.childMapper[name]; ok && p.GetUid() == child.GetUid() { + // Force to stop child process. + child.Stop() + + // Remove from mapping. + delete(o.childMapper, child.GetName()) + } + } + return o +} + +func (o *manager) onHandle(ctx context.Context, hook func(context.Context) error) (err error) { + // Invoked when handler ended. + defer func() { + // Recover and catch runtime panic. + if r := recover(); r != nil { + runtime.GetCounter().ProcessCounter().IncrFatalCount() + + // Set error reason if not specified by handler. + if err == nil { + err = src.ErrHookPanic + } + + // Invoke panic. + o.onPanic(ctx, r) + } + }() + + // Invoke handler. + err = hook(ctx) + return +} + +func (o *manager) onPanic(ctx context.Context, v any) { + // Discard runtime panic. + defer func() { + if r := recover(); r != nil { + runtime.GetCounter().ProcessCounter().IncrFatalCount() + } + }() + + // Invoke handler. + if hook, ok := o.provider.(src.ProviderPanic); ok { + hook.Panic(ctx, v) + } + return +} + +func (o *manager) onRestart() { + o.mu.Lock() + defer o.mu.Unlock() + + // Send signal if the process is running. + if ctx := o.ctx; ctx != nil && ctx.Err() == nil { + o.restarting, o.stopping = true, false + o.cancel() + } +} + +func (o *manager) onStart(ctx context.Context) (err error) { + o.mu.Lock() + + // Return error if the context is nil. + if ctx == nil { + o.mu.Unlock() + err = src.ErrNilContext + return + } + + // Return error if the process provider is nil. + if o.provider == nil { + o.mu.Unlock() + err = src.ErrNilProvider + return + } + + // Return error if middle state check failed. It means that the process not allow + // to call Start() method before stopped. + if o.started || o.starting || o.stopping { + o.mu.Unlock() + return + } + + // Update state to begin startup. + // + // - restarting : [-] + // - started : [-] + // - starting : [+] + // - stopped : [-] + // - stopping : [-] + o.startTime = time.Now() + o.starting, o.stopped = true, false + o.mu.Unlock() + runtime.GetCounter().ProcessCounter().Add(o.uid, o.name, o.startTime) + + // Clean state when the process stopped. + defer func() { + // Lock process state. + o.mu.Lock() + + // Notifier parent process to remove from mapper. + if parent, prevented := o.parent, o.prevented; parent != nil && !prevented { + o.mu.Unlock() + parent.Del(o) + + // Redo lock state. + o.mu.Lock() + } + + // Revert state to initialized. + o.setInitialState() + o.mu.Unlock() + runtime.GetCounter().ProcessCounter().Del(o.uid) + }() + + // Loop until the process stopped. + for { + o.mu.Lock() + + // Return by parent context error. + if ctx.Err() != nil { + o.starting, o.stopping = false, true + o.mu.Unlock() + return + } + + // Return by Stop() method called. + if o.stopping { + o.starting = false + o.mu.Unlock() + return + } + + // Create process context and update state. + o.ctx, o.cancel = context.WithCancel(ctx) + o.started, o.starting = true, false + + // Invoke before hook, return if error occurred. + if hook, ok := o.provider.(src.ProviderBefore); ok { + if err = o.onHandle(o.ctx, hook.Before); err != nil { + o.started, o.stopping = false, true + o.mu.Unlock() + return + } + } + + // Release lock state. + o.mu.Unlock() + + // Start child processes in goroutines. + o.onChildrenStart(o.ctx) + + // Invoke main hook. + err = o.onHandle(o.ctx, o.provider.Run) + + // Cancel the process context to notifier child process to stop. + if o.ctx.Err() == nil { + o.cancel() + } + + // Wait all child process stopped. + o.onChildrenStopped() + + // Invoke after hook of the provider if no error occurred on run hook. + if err == nil { + if hook, ok := o.provider.(src.ProviderAfter); ok { + err = o.onHandle(o.ctx, hook.After) + } + } + + // Update process state when main hook done. + // TODO: it's override restart signal. + o.mu.Lock() + o.ctx, o.cancel = nil, nil + if o.restarting { + o.restarting = false + o.starting = true + } else { + o.stopping = true + } + o.mu.Unlock() + } +} + +func (o *manager) onStop() { + o.mu.Lock() + defer o.mu.Unlock() + + // Send signal if the process is running. + if ctx := o.ctx; ctx != nil && ctx.Err() == nil { + o.restarting, o.stopping = false, true + o.cancel() + } +} + +func (o *manager) onWait() { + // Force to send stop signal. + o.onStop() + + // Return if this process and child processes are stopped. + for { + if o.IsStopped() { + return + } + time.Sleep(time.Millisecond) + } +} + +// + Setter methods. + +func (o *manager) setPrevent(prevented bool) *manager { + o.mu.Lock() + defer o.mu.Unlock() + o.prevented = prevented + return o +} + +func (o *manager) setInitialState() { + o.startTime = nilTime + o.started, o.starting, o.stopping, o.restarting = false, false, false, false + o.stopped = true +} + +// + Constructor + +func (o *manager) init() *manager { + o.childMapper = make(map[string]Process) + o.mu = new(sync.RWMutex) + o.setInitialState() + runtime.GetCounter().ProcessCounter().IncrCreatedCount() + return o +} diff --git a/src/v2/process.go b/src/v2/process.go new file mode 100644 index 0000000..679c930 --- /dev/null +++ b/src/v2/process.go @@ -0,0 +1,72 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-12-18 + +package v2 + +import ( + "context" + "time" +) + +// Process is an interface for managing the lifecycle of virtual processes. It's +// work like a process of os. +type Process interface { + // Add means adding multiple child processes to the current process. + Add(children ...Process) Process + + // Del means deleting multiple child processes from the current + // process. + Del(children ...Process) Process + + // GetChild returns a child process with given name if exists other nil + // returned. + GetChild(name string) (p Process, exists bool) + + // GetName return the process name. + GetName() string + + // GetUid returns the process unique id. + GetUid() string + + // Prevent to remove from parent when current process stopped. + Prevent(prevent bool) Process + + // Restart send a restart signal to current process. + Restart() + + // Start process with bio until stopped. + Start(ctx context.Context) (err error) + + // Started return true if current process is started. + Started() (yes bool) + + // StartedTime return time for Start() method called. + StartedTime() time.Time + + // Starting return true if current process is starting. + Starting() (yes bool) + + // Stop send a stop signal to current process. + Stop() + + // Stopped return true if current process is stopped. + Stopped() bool + + // Stopping return true if current process is stopping. + Stopping() bool + + // StopWait block process until stopped. + StopWait() +} -- Gitee From 22c7521f980a922f3e6b46ba90f85969da01099d Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 18 Dec 2024 13:30:34 +0800 Subject: [PATCH 2/3] update latest and fix fatal error: concurrent map iteration and map write --- src/v2/manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v2/manager.go b/src/v2/manager.go index 5711f9c..8980508 100644 --- a/src/v2/manager.go +++ b/src/v2/manager.go @@ -365,7 +365,6 @@ func (o *manager) onStart(ctx context.Context) (err error) { } // Update process state when main hook done. - // TODO: it's override restart signal. o.mu.Lock() o.ctx, o.cancel = nil, nil if o.restarting { -- Gitee From 27e648acfbd7534019491ca33c434fd9b037303d Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 18 Dec 2024 13:30:46 +0800 Subject: [PATCH 3/3] update latest and fix fatal error: concurrent map iteration and map write --- export_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/export_test.go b/export_test.go index 39038a4..8e438f7 100644 --- a/export_test.go +++ b/export_test.go @@ -86,7 +86,7 @@ func TestNewWithChild(t *testing.T) { go func() { t.Logf(`startup`) - p.Start(c) + _ = p.Start(c) t.Logf(`shutdown`) }() -- Gitee