1 Star 0 Fork 0

ixgo/ixUtils

Create your Gitee Account
Explore and code with more than 13.5 million developers,Free private repositories !:)
Sign up
文件
Clone or Download
concurrent.go 8.55 KB
Copy Edit Raw Blame History
lixu authored 2025-09-19 17:12 +08:00 . Initial commit
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
package ixUtils
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
)
// Sentinel errors returned by the synchronization helpers in this file.
var (
	// ErrTimeout is returned when an operation's timeout elapses before the
	// operation completes.
	ErrTimeout = errors.New("operation timed out")

	// ErrCancelled is returned when the context attached to an operation is
	// done before the operation completes.
	ErrCancelled = errors.New("operation cancelled")
)
// WaitGroup wraps sync.WaitGroup and lets Wait give up early, either after a
// timeout or when a context is done.
type WaitGroup struct {
	wg      sync.WaitGroup
	timeout time.Duration   // maximum time Wait blocks; non-positive means no deadline
	ctx     context.Context // optional; nil means Wait cannot be cancelled
}

// NewWaitGroup returns a WaitGroup whose Wait is bounded by timeout and by
// ctx. A nil ctx is replaced with context.Background(). A non-positive
// timeout disables the deadline.
func NewWaitGroup(ctx context.Context, timeout time.Duration) *WaitGroup {
	if ctx == nil {
		ctx = context.Background()
	}
	return &WaitGroup{
		ctx:     ctx,
		timeout: timeout,
	}
}

// Add adds delta to the underlying sync.WaitGroup counter. It is a no-op on a
// nil receiver.
func (wg *WaitGroup) Add(delta int) {
	if wg == nil {
		return
	}
	wg.wg.Add(delta)
}

// Done decrements the underlying sync.WaitGroup counter by one. It is a no-op
// on a nil receiver.
func (wg *WaitGroup) Done() {
	if wg == nil {
		return
	}
	wg.wg.Done()
}

// Wait blocks until the counter reaches zero, the timeout elapses, or the
// context is done, whichever happens first.
//
// It returns nil when all tasks finished, ErrTimeout when the timeout elapsed,
// ErrCancelled when the context was done, and a plain error when the receiver
// is nil.
//
// sync.WaitGroup.Wait cannot be interrupted, so when Wait returns early the
// helper goroutine started here keeps running until the counter reaches zero.
func (wg *WaitGroup) Wait() error {
	if wg == nil {
		return errors.New("wait group is nil")
	}

	done := make(chan struct{})
	go func() {
		wg.wg.Wait()
		close(done)
	}()

	// A nil channel never becomes ready in a select, so leaving these unset
	// disables the corresponding case. This keeps a zero-value WaitGroup
	// (nil ctx, zero timeout) usable instead of panicking or timing out
	// immediately.
	var cancelled <-chan struct{}
	if wg.ctx != nil {
		cancelled = wg.ctx.Done()
	}
	var expired <-chan time.Time
	if wg.timeout > 0 {
		timer := time.NewTimer(wg.timeout)
		defer timer.Stop()
		expired = timer.C
	}

	select {
	case <-done:
		return nil
	case <-expired:
		return ErrTimeout
	case <-cancelled:
		return ErrCancelled
	}
}
// Pool is a fixed-size goroutine pool that runs submitted tasks.
type Pool struct {
	workers int
	tasks   chan func()
	wg      sync.WaitGroup
	closed  bool // set by Stop; guarded by mu
	started bool // set by Start; guarded by mu
	mu      sync.RWMutex
}

// NewPool returns a pool with the given number of workers. A non-positive
// workers value is replaced with 1. The task queue is buffered to twice the
// worker count. Workers are not started until Start is called.
func NewPool(workers int) *Pool {
	if workers <= 0 {
		workers = 1
	}
	return &Pool{
		workers: workers,
		tasks:   make(chan func(), workers*2),
	}
}

// Start launches the worker goroutines. It does nothing on a nil receiver,
// after Stop, or when the workers have already been started, so repeated
// calls never create more than the configured number of workers.
func (p *Pool) Start() {
	if p == nil {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed || p.started {
		return
	}
	p.started = true
	p.wg.Add(p.workers)
	for i := 0; i < p.workers; i++ {
		go p.work()
	}
}

// work runs queued tasks until the task channel is closed and drained.
func (p *Pool) work() {
	defer p.wg.Done()
	for task := range p.tasks {
		if task != nil {
			p.run(task)
		}
	}
}

// run executes one task and contains any panic it raises so that a failing
// task cannot take down its worker.
func (p *Pool) run(task func()) {
	defer func() {
		// The recovered value is intentionally discarded.
		_ = recover()
	}()
	task()
}

// Submit queues task without blocking. It returns an error when the receiver
// or the task is nil, when the pool has been stopped, or when the queue is
// full.
func (p *Pool) Submit(task func()) error {
	if p == nil {
		return errors.New("pool is nil")
	}
	if task == nil {
		return errors.New("task is nil")
	}
	// The read lock keeps Stop from closing the channel between the closed
	// check and the send.
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return errors.New("pool is closed")
	}
	select {
	case p.tasks <- task:
		return nil
	default:
		return errors.New("pool is full")
	}
}

// Stop closes the task queue and waits for the workers to finish the tasks
// that were already queued. It is safe to call more than once; every call
// returns only after the workers have exited.
//
// Stop must not be called from inside a task: it would wait for its own
// worker.
func (p *Pool) Stop() {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.closed {
		p.closed = true
		if p.tasks != nil { // a zero-value Pool has no queue to close
			close(p.tasks)
		}
	}
	// Release the lock before waiting. A running task that calls Submit needs
	// the read lock; waiting while holding the write lock would deadlock.
	p.mu.Unlock()
	p.wg.Wait()
}
// AtomicBool is a boolean that can be read and written atomically. It is
// stored as a uint32 where zero means false and any other value means true.
type AtomicBool struct {
	value uint32
}

// NewAtomicBool returns an AtomicBool holding initial.
func NewAtomicBool(initial bool) *AtomicBool {
	b := new(AtomicBool)
	if initial {
		b.value = 1
	}
	return b
}

// Set atomically stores value. It is a no-op on a nil receiver.
func (b *AtomicBool) Set(value bool) {
	if b == nil {
		return
	}
	if value {
		atomic.StoreUint32(&b.value, 1)
		return
	}
	atomic.StoreUint32(&b.value, 0)
}

// Get atomically loads the current value. A nil receiver reports false.
func (b *AtomicBool) Get() bool {
	return b != nil && atomic.LoadUint32(&b.value) != 0
}
// AtomicInt64 is an int64 that can be read, written and incremented
// atomically.
type AtomicInt64 struct {
	value int64
}

// NewAtomicInt64 returns an AtomicInt64 holding initial.
func NewAtomicInt64(initial int64) *AtomicInt64 {
	n := new(AtomicInt64)
	n.value = initial
	return n
}

// Set atomically stores value. It is a no-op on a nil receiver.
func (i *AtomicInt64) Set(value int64) {
	if i != nil {
		atomic.StoreInt64(&i.value, value)
	}
}

// Get atomically loads the current value. A nil receiver reports 0.
func (i *AtomicInt64) Get() int64 {
	if i != nil {
		return atomic.LoadInt64(&i.value)
	}
	return 0
}

// Add atomically adds delta and returns the new value. A nil receiver
// returns 0.
func (i *AtomicInt64) Add(delta int64) int64 {
	if i != nil {
		return atomic.AddInt64(&i.value, delta)
	}
	return 0
}
// Once runs a function at most once until Reset is called, after which the
// next Do runs again. The zero value is ready to use.
type Once struct {
	done uint32 // 1 once a Do call has completed; written only while mu is held
	mu   sync.Mutex
}

// Do calls f if no Do call has completed since construction or the last
// Reset. It does nothing when the receiver or f is nil.
//
// A panic raised by f is recovered and discarded, and the call still counts
// as completed.
func (o *Once) Do(f func()) {
	if o == nil || f == nil {
		return
	}
	if atomic.LoadUint32(&o.done) != 0 {
		return
	}
	o.doSlow(f)
}

// doSlow re-checks the flag under the mutex and runs f if it is still unset.
func (o *Once) doSlow(f func()) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.done != 0 {
		return
	}
	// Deferred calls run last-in first-out: the panic is recovered first, then
	// the flag is set, then the mutex is released.
	defer atomic.StoreUint32(&o.done, 1)
	defer func() {
		// The recovered value is intentionally discarded.
		_ = recover()
	}()
	f()
}

// Reset clears the completion flag so that the next Do runs its function
// again. It is a no-op on a nil receiver.
func (o *Once) Reset() {
	if o == nil {
		return
	}
	o.mu.Lock()
	atomic.StoreUint32(&o.done, 0)
	o.mu.Unlock()
}
// Mutex wraps sync.Mutex and bounds how long Lock may wait.
type Mutex struct {
	mu      sync.Mutex
	timeout time.Duration // maximum wait in Lock; non-positive means wait indefinitely
}

// NewMutex returns a Mutex whose Lock gives up after timeout. A non-positive
// timeout makes Lock wait until the mutex is acquired.
func NewMutex(timeout time.Duration) *Mutex {
	return &Mutex{
		timeout: timeout,
	}
}

// Lock acquires the mutex. It returns nil once the mutex is held by the
// caller, ErrTimeout when the configured timeout elapses first, and a plain
// error when the receiver is nil.
//
// After ErrTimeout the caller does not hold the mutex and must not call
// Unlock.
func (m *Mutex) Lock() error {
	if m == nil {
		return errors.New("mutex is nil")
	}
	if m.timeout <= 0 {
		m.mu.Lock()
		return nil
	}

	granted := make(chan struct{})   // unbuffered: a send succeeds only if the caller is still waiting
	abandoned := make(chan struct{}) // closed when the caller has given up
	go func() {
		m.mu.Lock()
		select {
		case granted <- struct{}{}:
			// Ownership has been handed to the caller.
		case <-abandoned:
			// The caller timed out. Release immediately, otherwise the mutex
			// would stay locked with no owner to unlock it.
			m.mu.Unlock()
		}
	}()

	timer := time.NewTimer(m.timeout)
	defer timer.Stop()
	select {
	case <-granted:
		return nil
	case <-timer.C:
		close(abandoned)
		return ErrTimeout
	}
}

// Unlock releases the mutex. It is a no-op on a nil receiver.
func (m *Mutex) Unlock() {
	if m == nil {
		return
	}
	m.mu.Unlock()
}
// RWMutex wraps sync.RWMutex and bounds how long Lock and RLock may wait.
type RWMutex struct {
	mu      sync.RWMutex
	timeout time.Duration // maximum wait in Lock/RLock; non-positive means wait indefinitely
}

// NewRWMutex returns an RWMutex whose Lock and RLock give up after timeout.
// A non-positive timeout makes them wait until the lock is acquired.
func NewRWMutex(timeout time.Duration) *RWMutex {
	return &RWMutex{
		timeout: timeout,
	}
}

// acquire runs lock and reports nil once the caller owns the lock, or
// ErrTimeout when the configured timeout elapses first. On timeout the
// pending acquisition is undone with unlock as soon as it completes.
func (m *RWMutex) acquire(lock, unlock func()) error {
	if m.timeout <= 0 {
		lock()
		return nil
	}

	granted := make(chan struct{})   // unbuffered: a send succeeds only if the caller is still waiting
	abandoned := make(chan struct{}) // closed when the caller has given up
	go func() {
		lock()
		select {
		case granted <- struct{}{}:
			// Ownership has been handed to the caller.
		case <-abandoned:
			// The caller timed out. Release immediately, otherwise the lock
			// would stay held with no owner to release it.
			unlock()
		}
	}()

	timer := time.NewTimer(m.timeout)
	defer timer.Stop()
	select {
	case <-granted:
		return nil
	case <-timer.C:
		close(abandoned)
		return ErrTimeout
	}
}

// Lock acquires the write lock. It returns nil once the lock is held,
// ErrTimeout when the timeout elapses first, and a plain error when the
// receiver is nil. After ErrTimeout the caller must not call Unlock.
func (m *RWMutex) Lock() error {
	if m == nil {
		return errors.New("rwmutex is nil")
	}
	return m.acquire(m.mu.Lock, m.mu.Unlock)
}

// Unlock releases the write lock. It is a no-op on a nil receiver.
func (m *RWMutex) Unlock() {
	if m == nil {
		return
	}
	m.mu.Unlock()
}

// RLock acquires a read lock. It returns nil once the lock is held,
// ErrTimeout when the timeout elapses first, and a plain error when the
// receiver is nil. After ErrTimeout the caller must not call RUnlock.
func (m *RWMutex) RLock() error {
	if m == nil {
		return errors.New("rwmutex is nil")
	}
	return m.acquire(m.mu.RLock, m.mu.RUnlock)
}

// RUnlock releases a read lock. It is a no-op on a nil receiver.
func (m *RWMutex) RUnlock() {
	if m == nil {
		return
	}
	m.mu.RUnlock()
}
// Semaphore is a counting semaphore backed by a buffered channel. Each
// element in the channel represents one held permit.
type Semaphore struct {
	ch chan struct{}
}

// NewSemaphore returns a semaphore with size permits. A non-positive size is
// replaced with 1.
func NewSemaphore(size int) *Semaphore {
	if size <= 0 {
		size = 1
	}
	return &Semaphore{
		ch: make(chan struct{}, size),
	}
}

// Acquire takes one permit, blocking until one is available. It returns an
// error only when the receiver is nil.
func (s *Semaphore) Acquire() error {
	if s == nil {
		return errors.New("semaphore is nil")
	}
	s.ch <- struct{}{}
	return nil
}

// Release returns one permit without blocking. It returns an error when the
// receiver is nil or when no permit is currently held.
func (s *Semaphore) Release() error {
	if s == nil {
		return errors.New("semaphore is nil")
	}
	select {
	case <-s.ch:
		return nil
	default:
		return errors.New("no semaphore to release")
	}
}

// TryAcquire takes one permit, waiting at most timeout for one to become
// available. A non-positive timeout makes a single non-blocking attempt. It
// returns ErrTimeout when no permit could be taken in time and a plain error
// when the receiver is nil.
func (s *Semaphore) TryAcquire(timeout time.Duration) error {
	if s == nil {
		return errors.New("semaphore is nil")
	}

	// Fast path: a free permit always wins, whatever the timeout, and no
	// timer is allocated.
	select {
	case s.ch <- struct{}{}:
		return nil
	default:
	}
	if timeout <= 0 {
		return ErrTimeout
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case s.ch <- struct{}{}:
		return nil
	case <-timer.C:
		return ErrTimeout
	}
}

// Count reports how many permits are currently held. A nil receiver
// reports 0.
func (s *Semaphore) Count() int {
	if s == nil {
		return 0
	}
	return len(s.ch)
}

// Size reports the total number of permits. A nil receiver reports 0.
func (s *Semaphore) Size() int {
	if s == nil {
		return 0
	}
	return cap(s.ch)
}
// barrierGeneration describes one cycle of a Barrier: the span between the
// first arrival and the moment the cycle ends (tripped, broken or reset).
type barrierGeneration struct {
	done chan struct{} // closed when the cycle ends
	err  error         // outcome of the cycle; written under Barrier.mu before done is closed
}

// Barrier is a reusable synchronization point at which a fixed number of
// goroutines (parties) wait for each other.
type Barrier struct {
	count   int // arrivals in the current cycle
	parties int // arrivals needed to trip the barrier
	mu      sync.Mutex
	phase   int  // number of completed cycles since construction or the last Reset
	broken  bool // set when a waiter timed out; cleared by Reset
	timeout time.Duration
	gen     *barrierGeneration // current cycle; replaced whenever a cycle ends
}

// NewBarrier returns a barrier for the given number of parties. A
// non-positive parties value is replaced with 1. Each waiter gives up after
// timeout; a non-positive timeout means waiters wait indefinitely.
func NewBarrier(parties int, timeout time.Duration) *Barrier {
	if parties <= 0 {
		parties = 1
	}
	return &Barrier{
		parties: parties,
		timeout: timeout,
		gen:     &barrierGeneration{done: make(chan struct{})},
	}
}

// endGeneration finishes the current cycle with the given outcome, wakes all
// of its waiters and starts a fresh cycle. The caller must hold b.mu.
func (b *Barrier) endGeneration(err error) {
	if b.gen != nil {
		b.gen.err = err
		close(b.gen.done)
	}
	b.gen = &barrierGeneration{done: make(chan struct{})}
	b.count = 0
}

// Await blocks until all parties have called Await on this barrier.
//
// On success it returns the phase number the caller arrived in (0 for the
// first cycle) and a nil error. It returns -1 and an error when the receiver
// is nil, when the barrier is already broken, when the barrier is broken or
// reset while the caller waits, or when the caller's own wait times out. In
// the last case the error is ErrTimeout, the barrier becomes broken and all
// other waiters are released with an error.
func (b *Barrier) Await() (int, error) {
	if b == nil {
		return -1, errors.New("barrier is nil")
	}

	b.mu.Lock()
	if b.broken {
		b.mu.Unlock()
		return -1, errors.New("barrier is broken")
	}
	if b.gen == nil { // zero-value Barrier
		b.gen = &barrierGeneration{done: make(chan struct{})}
	}
	gen, phase, timeout := b.gen, b.phase, b.timeout
	b.count++
	if b.count >= b.parties {
		// Last arrival: trip the barrier and release everyone.
		b.phase++
		b.endGeneration(nil)
		b.mu.Unlock()
		return phase, nil
	}
	b.mu.Unlock()

	// A nil channel never becomes ready, which disables the timeout case.
	var expired <-chan time.Time
	if timeout > 0 {
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		expired = timer.C
	}

	select {
	case <-gen.done:
	case <-expired:
		b.mu.Lock()
		if b.gen == gen {
			// The cycle is still open, so this waiter breaks the barrier.
			b.broken = true
			b.endGeneration(errors.New("barrier is broken"))
			b.mu.Unlock()
			return -1, ErrTimeout
		}
		// The cycle ended while the timer fired; report its real outcome.
		b.mu.Unlock()
	}

	if gen.err != nil {
		return -1, gen.err
	}
	return phase, nil
}

// Reset returns the barrier to its initial state: no arrivals, phase 0 and
// not broken. Goroutines currently waiting in Await are released with an
// error. It is a no-op on a nil receiver.
func (b *Barrier) Reset() {
	if b == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.count > 0 {
		b.endGeneration(errors.New("barrier was reset"))
	}
	b.count = 0
	b.phase = 0
	b.broken = false
}

// IsBroken reports whether the barrier is broken. A nil receiver reports
// false.
func (b *Barrier) IsBroken() bool {
	if b == nil {
		return false
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.broken
}

// GetNumberWaiting reports how many parties have arrived in the current
// cycle. A nil receiver reports 0.
func (b *Barrier) GetNumberWaiting() int {
	if b == nil {
		return 0
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.count
}

// GetParties reports the number of parties required to trip the barrier. A
// nil receiver reports 0.
func (b *Barrier) GetParties() int {
	if b == nil {
		return 0
	}
	return b.parties
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ixgo/utils.git
git@gitee.com:ixgo/utils.git
ixgo
utils
ixUtils
v1.0.3

Search