265 lines
5.5 KiB
Go
265 lines
5.5 KiB
Go
package downloader
|
|
|
|
import (
|
|
"errors"
|
|
"git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
|
|
"git.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
|
|
"git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
|
|
"git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http"
|
|
"git.bvbej.com/bvbej/base-golang/pkg/downloader/util"
|
|
"github.com/google/uuid"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Listener is the callback type a downloader invokes with every Event it
// emits.
type Listener func(ev *Event)
|
|
|
|
type TaskInfo struct {
|
|
ID string
|
|
Res *base.Resource
|
|
Opts *base.Options
|
|
Status base.Status
|
|
Progress *Progress
|
|
|
|
fetcher fetcher.Fetcher
|
|
timer *util.Timer
|
|
locker *sync.Mutex
|
|
}
|
|
|
|
// Progress is a snapshot of download statistics for a task.
type Progress struct {
	// Used is the elapsed download time, in nanoseconds.
	Used int64
	// Speed is the number of bytes downloaded per second.
	Speed int64
	// Downloaded is the number of bytes downloaded so far.
	Downloaded int64
}
|
|
|
|
type downloader struct {
|
|
*controller.DefaultController
|
|
fetchBuilders map[string]func() fetcher.Fetcher
|
|
task *TaskInfo
|
|
listener Listener
|
|
finished bool
|
|
finishedCh chan error
|
|
}
|
|
|
|
func newDownloader(f func() (protocols []string, builder func() fetcher.Fetcher), options ...controller.Option) *downloader {
|
|
d := &downloader{
|
|
DefaultController: controller.NewController(options...),
|
|
finishedCh: make(chan error, 1),
|
|
}
|
|
|
|
d.fetchBuilders = make(map[string]func() fetcher.Fetcher)
|
|
protocols, builder := f()
|
|
for _, p := range protocols {
|
|
d.fetchBuilders[strings.ToUpper(p)] = builder
|
|
}
|
|
|
|
return d
|
|
}
|
|
|
|
func (d *downloader) buildFetcher(URL string) (fetcher.Fetcher, error) {
|
|
parseURL, err := url.Parse(URL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if fetchBuilder, ok := d.fetchBuilders[strings.ToUpper(parseURL.Scheme)]; ok {
|
|
fetched := fetchBuilder()
|
|
fetched.Setup(d.DefaultController)
|
|
return fetched, nil
|
|
}
|
|
return nil, errors.New("unsupported protocol")
|
|
}
|
|
|
|
func (d *downloader) Resolve(req *base.Request) (*base.Resource, error) {
|
|
fetched, err := d.buildFetcher(req.URL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return fetched.Resolve(req)
|
|
}
|
|
|
|
func (d *downloader) Create(res *base.Resource, opts *base.Options) (err error) {
|
|
fetched, err := d.buildFetcher(res.Req.URL)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !res.Range || opts.Connections < 1 {
|
|
opts.Connections = 1
|
|
}
|
|
err = fetched.Create(res, opts)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
task := &TaskInfo{
|
|
ID: uuid.New().String(),
|
|
Res: res,
|
|
Opts: opts,
|
|
Status: base.DownloadStatusStart,
|
|
Progress: &Progress{},
|
|
fetcher: fetched,
|
|
timer: &util.Timer{},
|
|
locker: new(sync.Mutex),
|
|
}
|
|
d.task = task
|
|
task.timer.Start()
|
|
d.emit(EventKeyStart)
|
|
err = fetched.Start()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
err = fetched.Wait()
|
|
if err != nil {
|
|
d.emit(EventKeyError, err)
|
|
task.Status = base.DownloadStatusError
|
|
} else {
|
|
task.Progress.Used = task.timer.Used()
|
|
if task.Res.TotalSize == 0 {
|
|
task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded()
|
|
}
|
|
used := task.Progress.Used / int64(time.Second)
|
|
if used == 0 {
|
|
used = 1
|
|
}
|
|
task.Progress.Speed = task.Res.TotalSize / used
|
|
task.Progress.Downloaded = task.Res.TotalSize
|
|
d.emit(EventKeyDone)
|
|
task.Status = base.DownloadStatusDone
|
|
}
|
|
d.finished = true
|
|
d.emit(EventKeyFinally, err)
|
|
d.finishedCh <- err
|
|
}()
|
|
|
|
// 每秒统计一次下载速度
|
|
go func() {
|
|
for !d.finished {
|
|
if d.task.Status == base.DownloadStatusPause {
|
|
continue
|
|
}
|
|
|
|
current := d.task.fetcher.Progress().TotalDownloaded()
|
|
d.task.Progress.Used = d.task.timer.Used()
|
|
d.task.Progress.Speed = current - d.task.Progress.Downloaded
|
|
d.task.Progress.Downloaded = current
|
|
d.emit(EventKeyProgress)
|
|
|
|
time.Sleep(time.Second)
|
|
}
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
func (d *downloader) Pause() error {
|
|
d.task.locker.Lock()
|
|
defer d.task.locker.Unlock()
|
|
d.task.timer.Pause()
|
|
err := d.task.fetcher.Pause()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.emit(EventKeyPause)
|
|
d.task.Status = base.DownloadStatusPause
|
|
return nil
|
|
}
|
|
|
|
func (d *downloader) Continue() error {
|
|
d.task.locker.Lock()
|
|
defer d.task.locker.Unlock()
|
|
d.task.timer.Continue()
|
|
err := d.task.fetcher.Continue()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.emit(EventKeyContinue)
|
|
d.task.Status = base.DownloadStatusStart
|
|
return nil
|
|
}
|
|
|
|
func (d *downloader) Listener(fn Listener) {
|
|
d.listener = fn
|
|
}
|
|
|
|
func (d *downloader) emit(eventKey EventKey, errs ...error) {
|
|
if d.listener != nil {
|
|
var err error
|
|
if len(errs) > 0 {
|
|
err = errs[0]
|
|
}
|
|
d.listener(&Event{
|
|
Key: eventKey,
|
|
Task: d.task,
|
|
Err: err,
|
|
})
|
|
}
|
|
}
|
|
|
|
var _ Boot = (*boot)(nil)
|
|
|
|
type Boot interface {
|
|
URL(url string) Boot
|
|
Extra(extra any) Boot
|
|
Listener(listener Listener) Boot
|
|
Create(opts *base.Options) <-chan error
|
|
}
|
|
|
|
type boot struct {
|
|
url string
|
|
extra any
|
|
listener Listener
|
|
downloader *downloader
|
|
}
|
|
|
|
func (b *boot) resolve() (*base.Resource, error) {
|
|
return b.downloader.Resolve(&base.Request{
|
|
URL: b.url,
|
|
Extra: b.extra,
|
|
})
|
|
}
|
|
|
|
func (b *boot) URL(url string) Boot {
|
|
b.url = url
|
|
return b
|
|
}
|
|
|
|
func (b *boot) Extra(extra any) Boot {
|
|
b.extra = extra
|
|
return b
|
|
}
|
|
|
|
func (b *boot) Listener(listener Listener) Boot {
|
|
b.listener = listener
|
|
return b
|
|
}
|
|
|
|
func (b *boot) Create(opts *base.Options) <-chan error {
|
|
res, err := b.resolve()
|
|
if err != nil {
|
|
b.downloader.finishedCh <- err
|
|
return b.downloader.finishedCh
|
|
}
|
|
b.downloader.Listener(b.listener)
|
|
|
|
err = b.downloader.Create(res, opts)
|
|
if err != nil {
|
|
b.downloader.finishedCh <- err
|
|
return b.downloader.finishedCh
|
|
}
|
|
|
|
return b.downloader.finishedCh
|
|
}
|
|
|
|
// New 一个文件对应一个实例
|
|
func New(options ...controller.Option) Boot {
|
|
return &boot{
|
|
downloader: newDownloader(http.FetcherBuilder, options...),
|
|
}
|
|
}
|