From 17727b109e5c9e38ff7733a3e55326889473ebe1 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 10 Jul 2024 13:43:06 +0800 Subject: [PATCH 1/2] init --- doc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc.go b/doc.go index 2eb732f..2323d2b 100644 --- a/doc.go +++ b/doc.go @@ -9,6 +9,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 // Package process // provides a process manager. -- Gitee From c07f45a9df5459ad8debd6f52db2032f6429fd41 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Sat, 13 Jul 2024 14:56:18 +0800 Subject: [PATCH 2/2] init --- examples/simple/main.go | 76 ++++++++ go.mod | 2 + go.sum | 2 + init.go | 30 +++ src/error.go | 23 +++ src/process.go | 413 ++++++++++++++++++++++++++++++++++++++++ src/provider.go | 40 ++++ 7 files changed, 586 insertions(+) create mode 100644 examples/simple/main.go create mode 100644 go.sum create mode 100644 init.go create mode 100644 src/error.go create mode 100644 src/process.go create mode 100644 src/provider.go diff --git a/examples/simple/main.go b/examples/simple/main.go new file mode 100644 index 0000000..496e48c --- /dev/null +++ b/examples/simple/main.go @@ -0,0 +1,76 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 + +package main + +import ( + "context" + "fmt" + "gitee.com/go-libs/process" + "time" +) + +type my struct { + name string +} + +func (o *my) After(ctx context.Context) (err error) { println(o.name, ":", "done"); return nil } + +func (o *my) Run(ctx context.Context) (err error) { + println(o.name, ":", "run") + for { + select { + case <-ctx.Done(): + return + } + } +} + +func main() { + var ( + ctx = context.Background() + err error + proc = process.New("main", &my{name: "main"}) + ) + + // proc.Add( + // process.New("ch 1", &my{name: "ch 1"}).Prevent(true), + // process.New("ch 2", &my{name: "ch 2"}).Prevent(true), + // ) + + go func() { + for i := 0; i < 3; i++ { + time.Sleep(time.Second * 3) + + if err != nil { + return + } + + println("----") + + cn := fmt.Sprintf(`ch-%d`, i) + ch := process.New(cn, &my{name: cn}).Prevent(true) + proc.Add(ch) // .Restart() + } + + time.Sleep(time.Second * 3) + println("----") + proc.Stop() + }() + + if err = proc.Start(ctx); err != nil { + println("start error:", err.Error()) + } +} diff --git a/go.mod b/go.mod index 5865d6a..2d7372c 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module gitee.com/go-libs/process go 1.18 + +require github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7790d7c --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/init.go b/init.go new file mode 100644 index 0000000..c2e46be --- /dev/null +++ b/init.go @@ -0,0 +1,30 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 + +package process + +import "gitee.com/go-libs/process/src" + +type ( + // Process + // is an interface for executor that work like os process. + Process = src.Process + + // Provider + // is an interface for executor of the process. + Provider = src.Provider +) + +var New = src.New diff --git a/src/error.go b/src/error.go new file mode 100644 index 0000000..aa31fbf --- /dev/null +++ b/src/error.go @@ -0,0 +1,23 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 + +package src + +import "fmt" + +var ( + ErrNilContext = fmt.Errorf(`context is nil`) + ErrStartedAlready = fmt.Errorf(`started already`) +) diff --git a/src/process.go b/src/process.go new file mode 100644 index 0000000..0038631 --- /dev/null +++ b/src/process.go @@ -0,0 +1,413 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 + +package src + +import ( + "context" + "fmt" + "github.com/google/uuid" + "sync" + "time" +) + +type ( + // Process + // is an interface for executor that work like os process. + Process interface { + // Add + // child processes to current process. + // + // .Add(p1, p2, p3) + Add(children ...Process) Process + + // Del + // delete child processes from current process. + // + // .Del(p1, p2, p3) + Del(children ...Process) Process + + // GetChild + // returns child process with given name. + GetChild(name string) (p Process, exists bool) + + // GetChildren + // returns child processes mapping. + GetChildren() (children map[string]Process) + + // GetName + // returns process name. + GetName() string + + // GetUid + // returns process uid. + GetUid() string + + // Prevent + // to remove from parent when current process stopped. + Prevent(prevent bool) Process + + // Restart + // send a restart signal to current process. + Restart() + + // Start + // process with bio until stopped. + Start(ctx context.Context) (err error) + + // Started + // return true if current process is started or stopping. + Started() (yes bool) + + // Stop + // send a stop signal to current process. + Stop() + + // Stopped + // return true if current process is stopped or not started. + Stopped() bool + } + + process struct { + mu *sync.RWMutex + name, uid string + provider Provider + + cancel context.CancelFunc + ctx context.Context + children map[string]Process + parent Process + started, quited, prevent bool + } +) + +// New +// creates new process instance. +func New(name string, provider Provider) Process { + return &process{ + mu: new(sync.RWMutex), + name: name, uid: uuid.NewString(), + provider: provider, + children: make(map[string]Process), + } +} + +// Add +// child processes to current process. +func (o *process) Add(children ...Process) Process { + count := 0 + + // Iterate + // child processes then add to current process. + for _, child := range children { + if child == nil { + continue + } + + // Stop + // last process with the same name. + func() { + o.mu.RLock() + defer o.mu.RUnlock() + + if p, ok := o.children[child.GetName()]; ok { + p.Stop() + } + }() + + // Bind + // parent process on current process. + count++ + child.(*process).parent = o + + // Add + // child to current process. + o.mu.Lock() + o.children[child.GetName()] = child + o.mu.Unlock() + } + + // Recall + // do child if add in started process. + if o.Started() && count > 0 { + o.doChild() + } + return o +} + +// Del +// delete child processes from current process. +func (o *process) Del(children ...Process) Process { + o.mu.Lock() + defer o.mu.Unlock() + + // Iterate + // child processes then remove from current process. + for _, child := range children { + if child == nil { + continue + } + + // Delete + // child from current process with same name and unique id are equals. + if p, ok := o.children[child.GetName()]; ok && child.GetUid() == p.GetUid() { + delete(o.children, child.GetName()) + } + } + return o +} + +// GetChild +// returns child process with given name. +func (o *process) GetChild(name string) (p Process, exists bool) { + o.mu.RLock() + p, exists = o.children[name] + o.mu.RUnlock() + return +} + +// GetChildren +// returns child processes mapping. +func (o *process) GetChildren() (children map[string]Process) { + o.mu.RLock() + children = o.children + o.mu.RUnlock() + return +} + +// GetName +// returns process name. +func (o *process) GetName() string { + return o.name +} + +// GetUid +// returns process unique id. +func (o *process) GetUid() string { + return o.uid +} + +// Prevent +// to remove from parent when current process stopped. +func (o *process) Prevent(prevent bool) Process { + o.mu.Lock() + defer o.mu.Unlock() + o.prevent = prevent + return o +} + +// Restart +// send restart signal to current process. +func (o *process) Restart() { + o.mu.Lock() + defer o.mu.Unlock() + + if o.ctx != nil && o.ctx.Err() == nil { + o.quited = false + o.cancel() + } +} + +// Start +// process with bio until stopped. +func (o *process) Start(ctx context.Context) (err error) { + o.mu.Lock() + + // Return + // error if parent context is nil. + if ctx == nil { + o.mu.Unlock() + err = ErrNilContext + return + } + + // Return + // error if started already. + if o.started { + o.mu.Unlock() + err = ErrStartedAlready + return + } + + // Set + // process states. + o.quited = false + o.started = true + o.mu.Unlock() + + // Called + // when process end. + defer func() { + // Cancel + // current process context if not done. + func() { + o.mu.RLock() + defer o.mu.RUnlock() + if o.ctx != nil && o.ctx.Err() == nil { + o.cancel() + } + }() + + // Unset process states. + o.mu.Lock() + o.ctx, o.cancel = nil, nil + o.started = false + o.mu.Unlock() + + // Remove + // from parent if not prevented. + if !o.prevent && o.parent != nil { + o.parent.Del(o) + } + }() + + // Start + // process providers. + for { + // Return + // by parent context cancelled. + if ctx.Err() != nil { + return + } + + // Return + // by stop signal. + if o.quited { + return + } + + // Reset + // current process context. + o.mu.Lock() + o.ctx, o.cancel = context.WithCancel(ctx) + o.mu.Unlock() + + // Do before + // callback in provider. Continue next loop if error returned. + if v, ok := o.provider.(ProviderBefore); ok { + if o.doCallback(v.Before) != nil { + continue + } + } + + // Call + // children starter. + o.doChild() + + // Call runner + // callback in provider. + if o.doCallback(o.provider.Run) != nil { + } + + // Wait + // all children stopped. + o.doClean() + + // Do after + // callback in provider. + if v, ok := o.provider.(ProviderAfter); ok { + if o.doCallback(v.After) != nil { + } + } + } +} + +// Started +// return true if current process is started or stopping. +func (o *process) Started() (yes bool) { + o.mu.RLock() + yes = o.started + o.mu.RUnlock() + return +} + +// Stop +// send a stop signal to current process. +func (o *process) Stop() { + o.mu.Lock() + defer o.mu.Unlock() + + if o.ctx != nil && o.ctx.Err() == nil { + o.quited = true + o.cancel() + } +} + +// Stopped +// return true if current process is stopped or not started. +func (o *process) Stopped() (yes bool) { + return !o.Started() +} + +// +---------------------------------------------------------------------------+ +// | Access methods | +// +---------------------------------------------------------------------------+ + +// DoCallback +// call provider callback. +func (o *process) doCallback(callback func(ctx context.Context) error) (err error) { + // Catch + // runtime panic into error. + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf(`%v`, r) + } + }() + + // Return + // runtime error. + err = callback(o.ctx) + return +} + +// DoChild +// start child processes in goroutines. +func (o *process) doChild() { + o.mu.RLock() + children := o.children + o.mu.RUnlock() + + // Iterate + // child process. + for _, v := range children { + if v.Started() { + continue + } + + // Start + // child process in goroutine. + go func(ctx context.Context, child Process) { + if err := child.Start(ctx); err != nil { + } + }(o.ctx, v) + } +} + +// DoClean +// wait all child processes stopped. +func (o *process) doClean() { + if func() (started bool) { + for _, child := range o.children { + if child.Started() { + started = true + break + } + } + return + }() { + time.Sleep(time.Millisecond * 10) + o.doClean() + } +} diff --git a/src/provider.go b/src/provider.go new file mode 100644 index 0000000..ff0c9e3 --- /dev/null +++ b/src/provider.go @@ -0,0 +1,40 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 + +package src + +import "context" + +type ( + // Provider + // is an interface for executor of the process. + Provider interface { + // Run + // is a callable function of the provider. + Run(ctx context.Context) (err error) + } + + ProviderAfter interface { + // After + // is an optional callable function of the provider. + After(ctx context.Context) (err error) + } + + ProviderBefore interface { + // Before + // is an optional callable function of the provider. + Before(ctx context.Context) (err error) + } +) -- Gitee