downloader #3
23
pkg/downloader/base/constants.go
Normal file
23
pkg/downloader/base/constants.go
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
package base
|
||||||
|
|
||||||
|
type Status int
|
||||||
|
|
||||||
|
const (
|
||||||
|
DownloadStatusReady Status = iota
|
||||||
|
DownloadStatusStart
|
||||||
|
DownloadStatusPause
|
||||||
|
DownloadStatusError
|
||||||
|
DownloadStatusDone
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
HttpCodeOK = 200
|
||||||
|
HttpCodePartialContent = 206
|
||||||
|
|
||||||
|
HttpHeaderRange = "Range"
|
||||||
|
HttpHeaderContentLength = "Content-Length"
|
||||||
|
HttpHeaderContentRange = "Content-Range"
|
||||||
|
HttpHeaderContentDisposition = "Content-Disposition"
|
||||||
|
|
||||||
|
HttpHeaderRangeFormat = "bytes=%d-%d"
|
||||||
|
)
|
37
pkg/downloader/base/model.go
Normal file
37
pkg/downloader/base/model.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package base
|
||||||
|
|
||||||
|
// Request 下载请求
|
||||||
|
type Request struct {
|
||||||
|
// 下载链接
|
||||||
|
URL string
|
||||||
|
// 附加信息
|
||||||
|
Extra any
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resource 资源信息
|
||||||
|
type Resource struct {
|
||||||
|
Req *Request
|
||||||
|
// 资源总大小
|
||||||
|
TotalSize int64
|
||||||
|
// 是否支持断点下载
|
||||||
|
Range bool
|
||||||
|
// 资源所包含的文件列表
|
||||||
|
Files []*FileInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// FileInfo 文件信息
|
||||||
|
type FileInfo struct {
|
||||||
|
Name string
|
||||||
|
Path string
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Options 下载选项
|
||||||
|
type Options struct {
|
||||||
|
// 保存文件名
|
||||||
|
Name string
|
||||||
|
// 保存目录
|
||||||
|
Path string
|
||||||
|
// 并发连接数
|
||||||
|
Connections int
|
||||||
|
}
|
172
pkg/downloader/controller/controller.go
Normal file
172
pkg/downloader/controller/controller.go
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/net/proxy"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Controller interface {
|
||||||
|
Touch(name string, size int64) (file *os.File, err error)
|
||||||
|
Open(name string) (file *os.File, err error)
|
||||||
|
Write(name string, offset int64, buf []byte) (int, error)
|
||||||
|
Close(name string) error
|
||||||
|
ContextDialer() (proxy.Dialer, error)
|
||||||
|
ContextCookie() http.CookieJar
|
||||||
|
ContextTimeout() time.Duration
|
||||||
|
ContextProxy() func(*http.Request) (*url.URL, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option func(*option)
|
||||||
|
|
||||||
|
type option struct {
|
||||||
|
CookieJar http.CookieJar
|
||||||
|
Timeout time.Duration
|
||||||
|
Dialer proxy.Dialer
|
||||||
|
Proxy func(*http.Request) (*url.URL, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCookie(cookieJar http.CookieJar) Option {
|
||||||
|
return func(opt *option) {
|
||||||
|
opt.CookieJar = cookieJar
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithTimeout(timeout time.Duration) Option {
|
||||||
|
return func(opt *option) {
|
||||||
|
opt.Timeout = timeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDialer(dialer proxy.Dialer) Option {
|
||||||
|
return func(opt *option) {
|
||||||
|
opt.Dialer = dialer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithProxy(fn func(*http.Request) (*url.URL, error)) Option {
|
||||||
|
return func(opt *option) {
|
||||||
|
opt.Proxy = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type DefaultController struct {
|
||||||
|
*option
|
||||||
|
Files map[string]*os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewController(options ...Option) *DefaultController {
|
||||||
|
opt := new(option)
|
||||||
|
for _, f := range options {
|
||||||
|
f(opt)
|
||||||
|
}
|
||||||
|
if opt.Timeout == 0 {
|
||||||
|
opt.Timeout = time.Second * 30
|
||||||
|
}
|
||||||
|
if opt.Dialer == nil {
|
||||||
|
opt.Dialer = proxy.FromEnvironment()
|
||||||
|
}
|
||||||
|
return &DefaultController{
|
||||||
|
Files: make(map[string]*os.File),
|
||||||
|
option: opt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) Touch(name string, size int64) (file *os.File, err error) {
|
||||||
|
file, err = os.Create(name)
|
||||||
|
if size > 0 {
|
||||||
|
err = os.Truncate(name, size)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
c.Files[name] = file
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) Open(name string) (file *os.File, err error) {
|
||||||
|
file, err = os.OpenFile(name, os.O_RDWR, os.ModePerm)
|
||||||
|
if err == nil {
|
||||||
|
c.Files[name] = file
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) Write(name string, offset int64, buf []byte) (int, error) {
|
||||||
|
return c.Files[name].WriteAt(buf, offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) Close(name string) error {
|
||||||
|
err := c.Files[name].Close()
|
||||||
|
delete(c.Files, name)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) ContextDialer() (proxy.Dialer, error) {
|
||||||
|
return &DialerWarp{dialer: c.Dialer}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) ContextCookie() http.CookieJar {
|
||||||
|
return c.CookieJar
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) ContextTimeout() time.Duration {
|
||||||
|
return c.Timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DefaultController) ContextProxy() func(*http.Request) (*url.URL, error) {
|
||||||
|
return c.Proxy
|
||||||
|
}
|
||||||
|
|
||||||
|
type DialerWarp struct {
|
||||||
|
dialer proxy.Dialer
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConnWarp struct {
|
||||||
|
conn net.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) Read(b []byte) (n int, err error) {
|
||||||
|
return c.conn.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) Write(b []byte) (n int, err error) {
|
||||||
|
return c.conn.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) Close() error {
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) LocalAddr() net.Addr {
|
||||||
|
return c.conn.LocalAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) RemoteAddr() net.Addr {
|
||||||
|
return c.conn.RemoteAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) SetDeadline(t time.Time) error {
|
||||||
|
return c.conn.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) SetReadDeadline(t time.Time) error {
|
||||||
|
return c.conn.SetReadDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConnWarp) SetWriteDeadline(t time.Time) error {
|
||||||
|
return c.conn.SetWriteDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DialerWarp) Dial(network, addr string) (c net.Conn, err error) {
|
||||||
|
conn, err := d.dialer.Dial(network, addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &ConnWarp{conn: conn}, nil
|
||||||
|
}
|
52
pkg/downloader/fetcher/fetcher.go
Normal file
52
pkg/downloader/fetcher/fetcher.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package fetcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gitea.bvbej.com/bvbej/base-golang/pkg/downloader/base"
|
||||||
|
"gitea.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Fetcher 对应协议的下载支持
|
||||||
|
type Fetcher interface {
|
||||||
|
// Setup 设置文件相关信息
|
||||||
|
Setup(ctl controller.Controller)
|
||||||
|
// Resolve 解析请求
|
||||||
|
Resolve(req *base.Request) (res *base.Resource, err error)
|
||||||
|
// Create 创建任务
|
||||||
|
Create(res *base.Resource, opts *base.Options) (err error)
|
||||||
|
// Start 开始
|
||||||
|
Start() (err error)
|
||||||
|
// Pause 暂停
|
||||||
|
Pause() (err error)
|
||||||
|
// Continue 继续
|
||||||
|
Continue() (err error)
|
||||||
|
// Progress 获取任务各个文件下载进度
|
||||||
|
Progress() Progress
|
||||||
|
// Wait 该方法会一直阻塞,直到任务下载结束
|
||||||
|
Wait() (err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type DefaultFetcher struct {
|
||||||
|
Ctl controller.Controller
|
||||||
|
DoneCh chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *DefaultFetcher) Setup(ctl controller.Controller) {
|
||||||
|
f.Ctl = ctl
|
||||||
|
f.DoneCh = make(chan error, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *DefaultFetcher) Wait() (err error) {
|
||||||
|
return <-f.DoneCh
|
||||||
|
}
|
||||||
|
|
||||||
|
// Progress 获取任务中各个文件的已下载字节数
|
||||||
|
type Progress []int64
|
||||||
|
|
||||||
|
// TotalDownloaded 获取任务总下载字节数
|
||||||
|
func (p Progress) TotalDownloaded() int64 {
|
||||||
|
total := int64(0)
|
||||||
|
for _, d := range p {
|
||||||
|
total += d
|
||||||
|
}
|
||||||
|
return total
|
||||||
|
}
|
409
pkg/downloader/protocol/http/fetcher.go
Normal file
409
pkg/downloader/protocol/http/fetcher.go
Normal file
@ -0,0 +1,409 @@
|
|||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"gitea.bvbej.com/bvbej/base-golang/pkg/downloader/base"
|
||||||
|
"gitea.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
"io"
|
||||||
|
"mime"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RequestError struct {
|
||||||
|
Code int
|
||||||
|
Msg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRequestError(code int, msg string) *RequestError {
|
||||||
|
return &RequestError{Code: code, Msg: msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (re *RequestError) Error() string {
|
||||||
|
return fmt.Sprintf("http request fail,code:%d", re.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Fetcher struct {
|
||||||
|
*fetcher.DefaultFetcher
|
||||||
|
|
||||||
|
res *base.Resource
|
||||||
|
opts *base.Options
|
||||||
|
status base.Status
|
||||||
|
clients []*http.Response
|
||||||
|
chunks []*Chunk
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
pauseCh chan any
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFetcher() *Fetcher {
|
||||||
|
return &Fetcher{
|
||||||
|
DefaultFetcher: new(fetcher.DefaultFetcher),
|
||||||
|
pauseCh: make(chan any),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var protocols = []string{"HTTP", "HTTPS"}
|
||||||
|
|
||||||
|
func FetcherBuilder() ([]string, func() fetcher.Fetcher) {
|
||||||
|
return protocols, func() fetcher.Fetcher {
|
||||||
|
return NewFetcher()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) {
|
||||||
|
httpReq, err := f.buildRequest(nil, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
client, err := f.buildClient()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// 只访问一个字节,测试资源是否支持Range请求
|
||||||
|
httpReq.Header.Set(base.HttpHeaderRange, fmt.Sprintf(base.HttpHeaderRangeFormat, 0, 0))
|
||||||
|
httpResp, err := client.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// 拿到响应头就关闭,不用加defer
|
||||||
|
_ = httpResp.Body.Close()
|
||||||
|
res := &base.Resource{
|
||||||
|
Req: req,
|
||||||
|
Range: false,
|
||||||
|
Files: []*base.FileInfo{},
|
||||||
|
}
|
||||||
|
if base.HttpCodePartialContent == httpResp.StatusCode {
|
||||||
|
// 返回206响应码表示支持断点下载
|
||||||
|
res.Range = true
|
||||||
|
// 解析资源大小: bytes 0-1000/1001 => 1001
|
||||||
|
contentTotal := path.Base(httpResp.Header.Get(base.HttpHeaderContentRange))
|
||||||
|
if contentTotal != "" {
|
||||||
|
parse, err := strconv.ParseInt(contentTotal, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res.TotalSize = parse
|
||||||
|
}
|
||||||
|
} else if base.HttpCodeOK == httpResp.StatusCode {
|
||||||
|
// 返回200响应码,不支持断点下载,通过Content-Length头获取文件大小,获取不到的话可能是chunked编码
|
||||||
|
contentLength := httpResp.Header.Get(base.HttpHeaderContentLength)
|
||||||
|
if contentLength != "" {
|
||||||
|
parse, err := strconv.ParseInt(contentLength, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res.TotalSize = parse
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, NewRequestError(httpResp.StatusCode, httpResp.Status)
|
||||||
|
}
|
||||||
|
file := &base.FileInfo{
|
||||||
|
Size: res.TotalSize,
|
||||||
|
}
|
||||||
|
contentDisposition := httpResp.Header.Get(base.HttpHeaderContentDisposition)
|
||||||
|
if contentDisposition != "" {
|
||||||
|
_, params, _ := mime.ParseMediaType(contentDisposition)
|
||||||
|
filename := params["filename"]
|
||||||
|
if filename != "" {
|
||||||
|
file.Name = filename
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Get file filename by URL
|
||||||
|
if file.Name == "" && strings.Count(req.URL, "/") > 2 {
|
||||||
|
file.Name = filepath.Base(req.URL)
|
||||||
|
}
|
||||||
|
// unknown file filename
|
||||||
|
if file.Name == "" {
|
||||||
|
file.Name = "unknown"
|
||||||
|
}
|
||||||
|
res.Files = append(res.Files, file)
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) Create(res *base.Resource, opts *base.Options) error {
|
||||||
|
f.res = res
|
||||||
|
f.opts = opts
|
||||||
|
f.status = base.DownloadStatusReady
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) Start() (err error) {
|
||||||
|
// 创建文件
|
||||||
|
name := f.filename()
|
||||||
|
_, err = f.Ctl.Touch(name, f.res.TotalSize)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f.status = base.DownloadStatusStart
|
||||||
|
if f.res.Range {
|
||||||
|
// 每个连接平均需要下载的分块大小
|
||||||
|
chunkSize := f.res.TotalSize / int64(f.opts.Connections)
|
||||||
|
f.chunks = make([]*Chunk, f.opts.Connections)
|
||||||
|
f.clients = make([]*http.Response, f.opts.Connections)
|
||||||
|
for i := 0; i < f.opts.Connections; i++ {
|
||||||
|
var (
|
||||||
|
begin = chunkSize * int64(i)
|
||||||
|
end int64
|
||||||
|
)
|
||||||
|
if i == f.opts.Connections-1 {
|
||||||
|
// 最后一个分块需要保证把文件下载完
|
||||||
|
end = f.res.TotalSize - 1
|
||||||
|
} else {
|
||||||
|
end = begin + chunkSize - 1
|
||||||
|
}
|
||||||
|
chunk := NewChunk(begin, end)
|
||||||
|
f.chunks[i] = chunk
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 只支持单连接下载
|
||||||
|
f.chunks = make([]*Chunk, 1)
|
||||||
|
f.clients = make([]*http.Response, 1)
|
||||||
|
f.chunks[0] = NewChunk(0, 0)
|
||||||
|
}
|
||||||
|
f.fetch()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) Pause() (err error) {
|
||||||
|
if base.DownloadStatusStart != f.status {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.status = base.DownloadStatusPause
|
||||||
|
f.cancel()
|
||||||
|
<-f.pauseCh
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) Continue() (err error) {
|
||||||
|
if base.DownloadStatusStart == f.status || base.DownloadStatusDone == f.status {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.status = base.DownloadStatusStart
|
||||||
|
var name = f.filename()
|
||||||
|
_, err = f.Ctl.Open(name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f.fetch()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) Progress() fetcher.Progress {
|
||||||
|
p := make(fetcher.Progress, 0)
|
||||||
|
if len(f.chunks) > 0 {
|
||||||
|
total := int64(0)
|
||||||
|
for _, chunk := range f.chunks {
|
||||||
|
total += chunk.Downloaded
|
||||||
|
}
|
||||||
|
p = append(p, total)
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) filename() string {
|
||||||
|
// 创建文件
|
||||||
|
var filename = f.opts.Name
|
||||||
|
if filename == "" {
|
||||||
|
filename = f.res.Files[0].Name
|
||||||
|
}
|
||||||
|
return filepath.Join(f.opts.Path, filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) fetch() {
|
||||||
|
f.ctx, f.cancel = context.WithCancel(context.Background())
|
||||||
|
eg, _ := errgroup.WithContext(f.ctx)
|
||||||
|
for i := 0; i < f.opts.Connections; i++ {
|
||||||
|
eg.Go(func() error {
|
||||||
|
return f.fetchChunk(i)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := eg.Wait()
|
||||||
|
// 下载停止,关闭文件句柄
|
||||||
|
_ = f.Ctl.Close(f.filename())
|
||||||
|
if f.status == base.DownloadStatusPause {
|
||||||
|
f.pauseCh <- nil
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
f.status = base.DownloadStatusError
|
||||||
|
} else {
|
||||||
|
f.status = base.DownloadStatusDone
|
||||||
|
}
|
||||||
|
f.DoneCh <- err
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) fetchChunk(index int) (err error) {
|
||||||
|
filename := f.filename()
|
||||||
|
chunk := f.chunks[index]
|
||||||
|
|
||||||
|
httpReq, err := f.buildRequest(f.ctx, f.res.Req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := f.buildClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf = make([]byte, 8192)
|
||||||
|
|
||||||
|
// 重试10次
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
// 如果下载完成直接返回
|
||||||
|
if chunk.Status == base.DownloadStatusDone {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 如果已暂停直接跳出
|
||||||
|
if f.status == base.DownloadStatusPause {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
resp *http.Response
|
||||||
|
retry bool
|
||||||
|
)
|
||||||
|
if f.res.Range {
|
||||||
|
httpReq.Header.Set(base.HttpHeaderRange,
|
||||||
|
fmt.Sprintf(base.HttpHeaderRangeFormat, chunk.Begin+chunk.Downloaded, chunk.End))
|
||||||
|
} else {
|
||||||
|
chunk.Downloaded = 0
|
||||||
|
}
|
||||||
|
err = func() error {
|
||||||
|
resp, err = client.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f.clients[index] = resp
|
||||||
|
if resp.StatusCode != base.HttpCodeOK && resp.StatusCode != base.HttpCodePartialContent {
|
||||||
|
err = NewRequestError(resp.StatusCode, resp.Status)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
//请求失败3s后重试
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 请求成功就重置错误次数,连续失败10次才终止
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
retry, err = func() (bool, error) {
|
||||||
|
defer func() {
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
}()
|
||||||
|
var n int
|
||||||
|
for {
|
||||||
|
n, err = resp.Body.Read(buf)
|
||||||
|
if n > 0 {
|
||||||
|
_, err = f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n])
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
chunk.Downloaded += int64(n)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}()
|
||||||
|
if !retry {
|
||||||
|
// 下载成功,跳出重试
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.status == base.DownloadStatusPause {
|
||||||
|
chunk.Status = base.DownloadStatusPause
|
||||||
|
} else if chunk.Downloaded >= chunk.End-chunk.Begin+1 {
|
||||||
|
chunk.Status = base.DownloadStatusDone
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
chunk.Status = base.DownloadStatusError
|
||||||
|
} else {
|
||||||
|
chunk.Status = base.DownloadStatusDone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) buildClient() (*http.Client, error) {
|
||||||
|
dialer, err := f.Ctl.ContextDialer()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
transport := &http.Transport{
|
||||||
|
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
|
return dialer.Dial(network, addr)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if f.Ctl.ContextProxy() != nil {
|
||||||
|
transport.Proxy = f.Ctl.ContextProxy()
|
||||||
|
}
|
||||||
|
return &http.Client{
|
||||||
|
Jar: f.Ctl.ContextCookie(),
|
||||||
|
Timeout: f.Ctl.ContextTimeout(),
|
||||||
|
Transport: transport,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) {
|
||||||
|
reqUrl, err := url.Parse(req.URL)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
method string
|
||||||
|
body io.Reader
|
||||||
|
)
|
||||||
|
headers := make(map[string][]string)
|
||||||
|
if req.Extra == nil {
|
||||||
|
method = http.MethodGet
|
||||||
|
} else {
|
||||||
|
extra := req.Extra.(Extra)
|
||||||
|
if extra.Method != "" {
|
||||||
|
method = extra.Method
|
||||||
|
} else {
|
||||||
|
method = http.MethodGet
|
||||||
|
}
|
||||||
|
if len(extra.Header) > 0 {
|
||||||
|
for k, v := range extra.Header {
|
||||||
|
headers[k] = []string{v}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if extra.Body != "" {
|
||||||
|
body = io.NopCloser(bytes.NewBufferString(extra.Body))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx != nil {
|
||||||
|
httpReq, err = http.NewRequestWithContext(ctx, method, reqUrl.String(), body)
|
||||||
|
} else {
|
||||||
|
httpReq, err = http.NewRequest(method, reqUrl.String(), body)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
httpReq.Header = headers
|
||||||
|
return httpReq, nil
|
||||||
|
}
|
24
pkg/downloader/protocol/http/model.go
Normal file
24
pkg/downloader/protocol/http/model.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package http
|
||||||
|
|
||||||
|
import "gitea.bvbej.com/bvbej/base-golang/pkg/downloader/base"
|
||||||
|
|
||||||
|
type Chunk struct {
|
||||||
|
Status base.Status
|
||||||
|
Begin int64
|
||||||
|
End int64
|
||||||
|
Downloaded int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewChunk(begin int64, end int64) *Chunk {
|
||||||
|
return &Chunk{
|
||||||
|
Status: base.DownloadStatusReady,
|
||||||
|
Begin: begin,
|
||||||
|
End: end,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Extra struct {
|
||||||
|
Method string
|
||||||
|
Header map[string]string
|
||||||
|
Body string
|
||||||
|
}
|
25
pkg/downloader/util/timer.go
Normal file
25
pkg/downloader/util/timer.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package util
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Timer 计时器
|
||||||
|
type Timer struct {
|
||||||
|
t int64
|
||||||
|
used int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Timer) Start() {
|
||||||
|
t.t = time.Now().UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Timer) Pause() {
|
||||||
|
t.used += time.Now().UnixNano() - t.t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Timer) Continue() {
|
||||||
|
t.t = time.Now().UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Timer) Used() int64 {
|
||||||
|
return (time.Now().UnixNano() - t.t) + t.used
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user