109 lines
1.8 KiB
Go
109 lines
1.8 KiB
Go
package ants
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"gitea.bvbej.com/bvbej/base-golang/pkg/ticker"
|
|
"github.com/panjf2000/ants/v2"
|
|
"go.uber.org/zap"
|
|
"runtime/debug"
|
|
"time"
|
|
)
|
|
|
|
// Compile-time assertion that *goroutinePool implements GoroutinePool.
var _ GoroutinePool = (*goroutinePool)(nil)
|
|
|
|
// GoroutinePool is the contract exposed by this package's goroutine pool:
// task submission, shutdown, and capacity introspection.
//
// Because the method set includes the unexported run method, only types
// declared inside this package can satisfy the interface.
type GoroutinePool interface {
	// Submit hands task to the pool for execution.
	Submit(task func())
	// Stop shuts the pool down.
	Stop()

	// Size reports the pool's current capacity.
	Size() int
	// Running reports the running-worker count of the backing pool.
	Running() int
	// Free reports the free-worker count of the backing pool.
	Free() int

	// run registers the periodic capacity-tuning callback.
	run()
}
|
|
|
|
type goroutinePool struct {
|
|
pool *ants.Pool
|
|
logger *zap.Logger
|
|
ticker ticker.Ticker
|
|
step int
|
|
}
|
|
|
|
type poolLogger struct {
|
|
zap *zap.Logger
|
|
}
|
|
|
|
func (l *poolLogger) Printf(format string, args ...any) {
|
|
l.zap.Sugar().Infof(format, args)
|
|
}
|
|
|
|
func NewPool(zapLogger *zap.Logger, step int) (GoroutinePool, error) {
|
|
ttl := time.Minute * 5
|
|
|
|
options := ants.Options{
|
|
Nonblocking: true,
|
|
ExpiryDuration: ttl,
|
|
PanicHandler: func(err any) {
|
|
zapLogger.Sugar().Error(
|
|
"GoroutinePool panic",
|
|
zap.String("error", fmt.Sprintf("%+v", err)),
|
|
zap.String("stack", string(debug.Stack())),
|
|
)
|
|
},
|
|
Logger: &poolLogger{zap: zapLogger},
|
|
}
|
|
|
|
antsPool, err := ants.NewPool(step, ants.WithOptions(options))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pool := &goroutinePool{
|
|
pool: antsPool,
|
|
logger: zapLogger,
|
|
ticker: ticker.New(ttl),
|
|
step: step,
|
|
}
|
|
pool.run()
|
|
|
|
return pool, nil
|
|
}
|
|
|
|
func (p *goroutinePool) run() {
|
|
p.ticker.Process(func() {
|
|
if p.Free() > p.step {
|
|
mul := p.Free() / p.step
|
|
p.pool.Tune(p.Size() - p.step*mul)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (p *goroutinePool) Submit(task func()) {
|
|
if p.pool.IsClosed() {
|
|
return
|
|
}
|
|
err := p.pool.Submit(task)
|
|
if errors.Is(err, ants.ErrPoolOverload) {
|
|
p.pool.Tune(p.Size() + p.step)
|
|
p.Submit(task)
|
|
}
|
|
}
|
|
|
|
func (p *goroutinePool) Size() int {
|
|
return p.pool.Cap()
|
|
}
|
|
|
|
func (p *goroutinePool) Running() int {
|
|
return p.pool.Running()
|
|
}
|
|
|
|
func (p *goroutinePool) Free() int {
|
|
return p.pool.Free()
|
|
}
|
|
|
|
func (p *goroutinePool) Stop() {
|
|
p.ticker.Stop()
|
|
p.pool.Release()
|
|
}
|