base-golang/pkg/ants/pool.go
2024-07-31 16:49:14 +08:00

109 lines
1.8 KiB
Go

package ants
import (
"errors"
"fmt"
"gitea.bvbej.com/bvbej/base-golang/pkg/ticker"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"runtime/debug"
"time"
)
var _ GoroutinePool = (*goroutinePool)(nil)
type GoroutinePool interface {
run()
Submit(task func())
Stop()
Size() int
Running() int
Free() int
}
type goroutinePool struct {
pool *ants.Pool
logger *zap.Logger
ticker ticker.Ticker
step int
}
type poolLogger struct {
zap *zap.Logger
}
func (l *poolLogger) Printf(format string, args ...any) {
l.zap.Sugar().Infof(format, args)
}
func NewPool(zapLogger *zap.Logger, step int) (GoroutinePool, error) {
ttl := time.Minute * 5
options := ants.Options{
Nonblocking: true,
ExpiryDuration: ttl,
PanicHandler: func(err any) {
zapLogger.Sugar().Error(
"GoroutinePool panic",
zap.String("error", fmt.Sprintf("%+v", err)),
zap.String("stack", string(debug.Stack())),
)
},
Logger: &poolLogger{zap: zapLogger},
}
antsPool, err := ants.NewPool(step, ants.WithOptions(options))
if err != nil {
return nil, err
}
pool := &goroutinePool{
pool: antsPool,
logger: zapLogger,
ticker: ticker.New(ttl),
step: step,
}
pool.run()
return pool, nil
}
func (p *goroutinePool) run() {
p.ticker.Process(func() {
if p.Free() > p.step {
mul := p.Free() / p.step
p.pool.Tune(p.Size() - p.step*mul)
}
})
}
func (p *goroutinePool) Submit(task func()) {
if p.pool.IsClosed() {
return
}
err := p.pool.Submit(task)
if errors.Is(err, ants.ErrPoolOverload) {
p.pool.Tune(p.Size() + p.step)
p.Submit(task)
}
}
func (p *goroutinePool) Size() int {
return p.pool.Cap()
}
func (p *goroutinePool) Running() int {
return p.pool.Running()
}
func (p *goroutinePool) Free() int {
return p.pool.Free()
}
func (p *goroutinePool) Stop() {
p.ticker.Stop()
p.pool.Release()
}