base-golang/pkg/downloader/downloader.go
2024-07-23 10:23:43 +08:00

265 lines
5.5 KiB
Go

package downloader
import (
"errors"
"git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
"git.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
"git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
"git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http"
"git.bvbej.com/bvbej/base-golang/pkg/downloader/util"
"github.com/google/uuid"
"net/url"
"strings"
"sync"
"time"
)
// Listener is a callback invoked with each Event emitted by a downloader.
type Listener func(event *Event)
// TaskInfo describes a single download task created by the downloader.
type TaskInfo struct {
// ID is a UUID string generated when the task is created.
ID string
// Res is the resource being downloaded.
Res *base.Resource
// Opts are the download options the task was created with.
Opts *base.Options
// Status is the current state of the task.
Status base.Status
// Progress holds the latest timing and byte-count statistics.
Progress *Progress
// fetcher performs the actual transfer for this task.
fetcher fetcher.Fetcher
// timer tracks elapsed download time; it is paused and resumed with the task.
timer *util.Timer
// locker is held by Pause and Continue while they change the task state.
locker *sync.Mutex
}
// Progress holds the download statistics reported for a task.
type Progress struct {
// Used is the download time elapsed so far, in nanoseconds.
Used int64
// Speed is the number of bytes downloaded per second.
Speed int64
// Downloaded is the number of bytes downloaded so far.
Downloaded int64
}
// downloader drives a single download task on top of a protocol fetcher.
type downloader struct {
*controller.DefaultController
// fetchBuilders maps an upper-cased URL scheme to a constructor for the
// fetcher that handles it.
fetchBuilders map[string]func() fetcher.Fetcher
// task is the task set by Create.
task *TaskInfo
// listener, if non-nil, receives every event emitted by the downloader.
listener Listener
// finished is set to true once the fetcher's Wait call has returned.
finished bool
// finishedCh carries the final result of the download.
finishedCh chan error
}
// newDownloader builds a downloader around a fresh controller configured with
// options. f supplies the list of supported protocol names together with the
// constructor used to create a fetcher for any of them; each protocol name is
// registered in upper case so lookups are case-insensitive.
func newDownloader(f func() (protocols []string, builder func() fetcher.Fetcher), options ...controller.Option) *downloader {
	ctl := controller.NewController(options...)
	resultCh := make(chan error, 1)
	registry := make(map[string]func() fetcher.Fetcher)

	schemes, construct := f()
	for i := range schemes {
		registry[strings.ToUpper(schemes[i])] = construct
	}

	return &downloader{
		DefaultController: ctl,
		fetchBuilders:     registry,
		finishedCh:        resultCh,
	}
}
// buildFetcher parses URL, looks up the fetcher constructor registered for its
// scheme (case-insensitively) and returns a new fetcher that has been set up
// with the downloader's controller. It returns the parse error for a malformed
// URL and an "unsupported protocol" error when no constructor is registered.
func (d *downloader) buildFetcher(URL string) (fetcher.Fetcher, error) {
	parsed, err := url.Parse(URL)
	if err != nil {
		return nil, err
	}

	construct, known := d.fetchBuilders[strings.ToUpper(parsed.Scheme)]
	if !known {
		return nil, errors.New("unsupported protocol")
	}

	instance := construct()
	instance.Setup(d.DefaultController)
	return instance, nil
}
// Resolve builds a fetcher for req.URL and asks it to resolve the request into
// a resource description. An error from building the fetcher is returned
// as-is, without contacting the fetcher.
func (d *downloader) Resolve(req *base.Request) (*base.Resource, error) {
	resolver, buildErr := d.buildFetcher(req.URL)
	if buildErr != nil {
		return nil, buildErr
	}
	return resolver.Resolve(req)
}
// Create starts downloading res with the given options and returns as soon as
// the transfer has been started; completion is reported asynchronously.
//
// It builds a fetcher for res.Req.URL, forces opts.Connections to 1 when the
// resource does not support ranges or the requested count is below 1, records
// the new task on d and emits EventKeyStart before starting the fetcher.
// Two goroutines are then launched:
//   - one waits for the fetcher to finish, fills in the final progress and
//     status, emits EventKeyError or EventKeyDone followed by EventKeyFinally,
//     and sends the result on d.finishedCh;
//   - the other emits EventKeyProgress about once per second (skipping samples
//     while the task is paused) until the download has finished.
//
// The returned error is non-nil only when building, creating or starting the
// fetcher fails; in that case nothing is sent on d.finishedCh by this method.
func (d *downloader) Create(res *base.Resource, opts *base.Options) error {
	fetched, err := d.buildFetcher(res.Req.URL)
	if err != nil {
		return err
	}
	if !res.Range || opts.Connections < 1 {
		opts.Connections = 1
	}
	if err = fetched.Create(res, opts); err != nil {
		return err
	}

	task := &TaskInfo{
		ID:       uuid.New().String(),
		Res:      res,
		Opts:     opts,
		Status:   base.DownloadStatusStart,
		Progress: &Progress{},
		fetcher:  fetched,
		timer:    &util.Timer{},
		locker:   new(sync.Mutex),
	}
	d.task = task
	task.timer.Start()
	d.emit(EventKeyStart)
	if err = fetched.Start(); err != nil {
		return err
	}

	// done is closed when the fetcher has finished so the progress reporter
	// can stop promptly instead of polling a shared flag.
	done := make(chan struct{})

	go func() {
		// Use a goroutine-local error: writing to Create's own result variable
		// here would race with its return and could make the caller report the
		// same failure a second time on finishedCh.
		waitErr := fetched.Wait()
		if waitErr != nil {
			d.emit(EventKeyError, waitErr)
			task.Status = base.DownloadStatusError
		} else {
			task.Progress.Used = task.timer.Used()
			if task.Res.TotalSize == 0 {
				task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded()
			}
			// Average speed over whole seconds; clamp to 1s to avoid dividing by zero.
			used := task.Progress.Used / int64(time.Second)
			if used == 0 {
				used = 1
			}
			task.Progress.Speed = task.Res.TotalSize / used
			task.Progress.Downloaded = task.Res.TotalSize
			d.emit(EventKeyDone)
			task.Status = base.DownloadStatusDone
		}
		d.finished = true
		close(done)
		d.emit(EventKeyFinally, waitErr)
		d.finishedCh <- waitErr
	}()

	// Sample the download speed once per second.
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			default:
			}
			// While paused, skip the sample but still wait for the next tick
			// so the loop does not spin.
			if task.Status != base.DownloadStatusPause {
				current := task.fetcher.Progress().TotalDownloaded()
				task.Progress.Used = task.timer.Used()
				task.Progress.Speed = current - task.Progress.Downloaded
				task.Progress.Downloaded = current
				d.emit(EventKeyProgress)
			}
			select {
			case <-done:
				return
			case <-ticker.C:
			}
		}
	}()
	return nil
}
// Pause suspends the current task while holding the task lock: it pauses the
// task timer, then the fetcher. If the fetcher reports an error it is returned
// and neither the event nor the status change happens; otherwise
// EventKeyPause is emitted and the status becomes DownloadStatusPause.
func (d *downloader) Pause() error {
	current := d.task
	current.locker.Lock()
	defer current.locker.Unlock()

	current.timer.Pause()
	if pauseErr := current.fetcher.Pause(); pauseErr != nil {
		return pauseErr
	}

	d.emit(EventKeyPause)
	current.Status = base.DownloadStatusPause
	return nil
}
// Continue resumes the current task while holding the task lock: it resumes
// the task timer, then the fetcher. If the fetcher reports an error it is
// returned and neither the event nor the status change happens; otherwise
// EventKeyContinue is emitted and the status becomes DownloadStatusStart.
func (d *downloader) Continue() error {
	current := d.task
	current.locker.Lock()
	defer current.locker.Unlock()

	current.timer.Continue()
	if resumeErr := current.fetcher.Continue(); resumeErr != nil {
		return resumeErr
	}

	d.emit(EventKeyContinue)
	current.Status = base.DownloadStatusStart
	return nil
}
// Listener installs fn as the callback that receives the downloader's events,
// replacing any previously set callback.
func (d *downloader) Listener(fn Listener) {
d.listener = fn
}
// emit delivers an event with the given key and the current task to the
// registered listener. It does nothing when no listener is set. Only the first
// element of errs, if any, is attached to the event.
func (d *downloader) emit(eventKey EventKey, errs ...error) {
	if d.listener == nil {
		return
	}

	var first error
	if len(errs) > 0 {
		first = errs[0]
	}

	event := &Event{
		Key:  eventKey,
		Task: d.task,
		Err:  first,
	}
	d.listener(event)
}
// Compile-time assertion that *boot implements Boot.
var _ Boot = (*boot)(nil)
// Boot is the builder-style entry point for configuring and starting a download.
type Boot interface {
// URL sets the URL to download and returns the Boot for chaining.
URL(url string) Boot
// Extra sets the extra data attached to the request and returns the Boot for chaining.
Extra(extra any) Boot
// Listener sets the event callback and returns the Boot for chaining.
Listener(listener Listener) Boot
// Create resolves the URL and starts the download. The returned channel
// delivers the final error of the download, or nil on success.
Create(opts *base.Options) <-chan error
}
// boot is the Boot implementation backed by a downloader.
type boot struct {
// url is the download URL set via URL.
url string
// extra is passed through as the Extra field of the resolve request.
extra any
// listener is installed on the downloader when Create is called.
listener Listener
// downloader performs the resolve and download work.
downloader *downloader
}
// resolve asks the downloader to resolve a request built from the configured
// URL and extra data.
func (b *boot) resolve() (*base.Resource, error) {
	request := &base.Request{URL: b.url, Extra: b.extra}
	return b.downloader.Resolve(request)
}
// URL stores the URL to download and returns b so calls can be chained.
func (b *boot) URL(url string) Boot {
b.url = url
return b
}
// Extra stores the extra data to send with the resolve request and returns b
// so calls can be chained.
func (b *boot) Extra(extra any) Boot {
b.extra = extra
return b
}
// Listener stores the event callback to install on the downloader when Create
// is called, and returns b so calls can be chained.
func (b *boot) Listener(listener Listener) Boot {
b.listener = listener
return b
}
// Create resolves the configured URL and, on success, installs the listener
// and starts the download with opts. It always returns the downloader's
// result channel. When resolving or starting fails, that error is sent on the
// channel before returning; the listener is not installed if resolving fails.
func (b *boot) Create(opts *base.Options) <-chan error {
	dl := b.downloader

	res, err := b.resolve()
	if err == nil {
		dl.Listener(b.listener)
		err = dl.Create(res, opts)
	}
	if err != nil {
		dl.finishedCh <- err
	}
	return dl.finishedCh
}
// New returns a Boot backed by a downloader that uses the HTTP fetcher and a
// controller configured with options. Use one instance per file.
func New(options ...controller.Option) Boot {
	dl := newDownloader(http.FetcherBuilder, options...)
	return &boot{downloader: dl}
}