关于golang:Go多线程下载优化滑动窗口并发控制

15次阅读

共计 6267 个字符,预计需要花费 16 分钟才能阅读完成。

应用 p2p 下载的时候有用户问是否核心节点下载实现后,能力开始 p2p 下载,尤其是几 GB 甚至几十 GB 的大文件。
理论中核心节点的文件是一边下载一边共享给其余节点的,从分片的角度看,下载完第一个分片,全节点共享下载过程就开始了。
本文介绍 p2p 核心节点作为初始节点下载文件提速的一些优化改良,同时借此分享下 golang 中的一些并发管制的思考

间接下载

最简略的间接下载,应用默认的 http client 发动申请,将数据流写入 writer 即可,实现相干的 writer 即可实现一边下载一边记录分片信息,从而提供给其余节点下载
社区 Dragonfly 我的项目的下载源文件的办法

// 间接下载文件
// writer 一边拷入文件会一边计算分片,每实现一个分片便能够共享给其余节点下载
func directDownload(url string, headers map[string]string, writer io.Writer) error {req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {return err}
    for k, v := range headers {req.Header.Set(k, v)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil{return err}
    defer resp.Body.Close()
    if _, err := io.Copy(writer, resp.Body); err != nil {return err}
    return nil
}

多线程下载

然而当文件很大,单链接限速时,文件下载较慢,在 http 下载中通常提供了 Ranges:bytes=a- b 来指定本次申请下载 a~b 字节的局部,因而能够利用该性能来做多线程下载
指定线程数,文件大小,实现多线程下载一个文件,这也是 golang 中根底的并发执行子工作,搭配应用 sync.WaitGroup{}来检测所有的子工作是否实现,实现所有的子工作后即可退出

多线程下载在单线程速度无限时提速显著,例如单链接限速 5MB/ s 时,能够通过多线程来减速下载,取得几倍速度,一些开源的百度云下载减速也是多线程减速。

// 文件过大时,每个协程下载的分片仍旧很大,中断后重试的老本略高
func downloadWithMultiThreadsV1(url string, headers map[string]string, filePath string, threads int, size int64) error {pieceSize := size / int64(threads)
    wg := sync.WaitGroup{}
    for i := 0; i < threads; i++ {wg.Add(1)
        go func(pieceNum int64, pieceSize int64, headers map[string]string) {defer wg.Done()
            if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {panic(err)
            }
        }(int64(i), pieceSize, copyHeader(headers))
    }
    wg.Wait()
    return nil
}

// 下载分片写入文件
func downloadWithPiece(url string, headers map[string]string, filePath string, pieceNum int64, pieceSize int64) error {
    startRange := pieceNum * pieceSize
    endRange := (pieceNum+1)*pieceSize - 1
    byteRanges := fmt.Sprintf("bytes=%d-%d", startRange, endRange)
    headers["Range"] = byteRanges
    fw, err := os.OpenFile(filePath, os.O_WRONLY, os.ModePerm)
    if err != nil {return err}
    // 关上文件偏移至该写入的地位
    _, err = fw.Seek(startRange, 0)
    if err != nil {return err}
    err = directDownload(url, headers, fw)
    if err != nil {return err}
    return nil
}

基于小分片多线程下载

大文件在跨 region、跨国传输时,网络稳定容易导致链接断开后,间接开固定线程数去均分文件下载失败后须要重试下载的分片很大,重试老本较高。
因而能够分较小的分片去多线程下载,失败后重试老本也较低,实现断点续传也简略。
一个 1GB 的文件以 16MB 分片去下载,须要 60 次,显然像上一个办法那样间接 for 循环启动去下载会导致协程过多,且源站压力也会大,因而须要管制并发。

间接应用 channel 传递信号来管制并发个数是比拟根底的并发管制伎俩。
常见的并发编程写法可见《Go 语言中文文档》- 并发编程
《Go 语言高级编程》- 常见的并发模式
这样子既实现了多线程下载,在下载单个分片失败后,重试老本也较低。

// 以 16MB 的大小来下载文件
// 同时下载 3 个分片
// 各个分片竞争下载速度,会呈现下载分片 n 的时候速度很慢,而前面始终在下载 n +3、n+ 4 等导致前局部分片迟迟未实现
func downloadWithMultiThreadsV2(url string, headers map[string]string, filePath string, threads int, size int64) error {
    var pieceSize int64 = 16 * 1024 *1024
    totalPieceNum := (size+1) / pieceSize
    maxThreads := make(chan struct{}, threads)
    var pieceNum  int64 = 0
    wg := sync.WaitGroup{}
    for ; pieceNum <= totalPieceNum; pieceNum++{maxThreads <- struct{}{}
        wg.Add(1)
        go func(pieceNum int64, pieceSize int64, headers map[string]string) {defer func() {
                <-maxThreads
                wg.Done()}()
            if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {panic(err)
            }
        }(pieceNum, pieceSize, copyHeader(headers))
    }
    wg.Wait()
    return nil
}

下载文件校验

下载文件经常放心文件是否为原文件,因而下载后须要校验,最罕用的为 md5,然而文件较大时下载实现后,还要再从磁盘读取文件计算 md5,耗时耗力,md5 计算大概在 mac 上单核计算是 750MB/ s 左右,加上从磁盘读取速度就更低了,最好是能一边下一遍校验。
对于单线程下载大文件很容易实现校验,将接管数据流的 writer 与计算 md5 的 writer 组合成一个 writer,就能够边下载边校验 md5 了

// 计算下载的 md5hash
func downloadGetHash(url string, headers map[string]string, fw io.Writer) (string, error) {md5Hash := md5.New()
    writer := io.MultiWriter(fw, md5Hash)
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {return "", err}
    for k, v := range headers {req.Header.Set(k, v)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil{return "", err}
    defer resp.Body.Close()

    if _, err := io.Copy(writer, resp.Body); err != nil {return "", err}
    md5Value := fmt.Sprintf("%x", md5Hash.Sum(nil))
    return md5Value, nil
}

多线程下载校验 md5 的难点

多线程下载时,分片实现程序不统一,无奈像单线程下载那样简略一边下载一边计算,不必反复从磁盘读取。
当然能够从头开始期待下载完一个分片,而后就去校验一个分片,刚下载好去读取进去校验,能够走文件 cache,不会额定占用磁盘 i / o 也是能够承受的。
然而在应用 channel 来管制并发的下载过程,可能呈现还在等第 1 个分片下载实现来校验,然而其余线程曾经下载 1 + n 个分片去了,这时候等第一个分片下载好去校验 md5 时,接下来 2~1+ n 个分片可能曾经不在 cache 里了,得走一遍磁盘读取了。

滑动窗口多线程分片下载

各个分片竞争下载速度,会呈现下载分片 n 的时候速度很慢,而前面始终在下载 n +3、n+ 4 等导致前局部分片迟迟未实现,导致晚期的分片无奈尽快下载实现提供给客户端下载,所以咱们心愿整体是按程序去下载分片的。
需要变成了 x 线程下载时,同时最多在下载 x 个分片,然而 n +1,n+ 2 等实现后还要等第 n 个分片实现能力去下载第 n + x 个分片,其实就是滑动窗口。
滑动窗口来管制并发能够防止后续竞争资源导致先发的工作迟迟不能实现,不会呈现靠前分片远远落后前面的分片,缩小了 cache 淘汰速度

tcp 传输中应用了滑动窗口进步吞吐量,依据网络状况来调整窗口大小,能够参考下文

tcp 滑动窗口简介

tcp 滑动窗口源码简析

咱们在应用多线程下载中应用线程数的窗口大小管制思维就能够了,找了圈 golang 滑动窗口,都是针对网关流量的工夫滑动窗口实现 - -, 只能本人实现了
滑动窗口的实质就是在进行第 n + x 个工作时检测第 n 个工作是否实现,实现了则能够持续,否则期待
思考一下能够应用 map+sync.mux 就能够实现一个简略并发管制的滑动窗口

package lock

import "sync"

type SlideWindowLocker struct {
    threads int
    window map[int]*sync.Mutex
}

func NewSlideWindow(threads int) *SlideWindowLocker  {window := make(map[int]*sync.Mutex)
    for i:=0;i<threads;i++{window[i] = &sync.Mutex{}}
    return &SlideWindowLocker{
        threads: threads,
        window:  window,
    }
}

// 能够获取第 n + x 个锁的前提是第 n 个锁已被解锁
func (sw *SlideWindowLocker) Lock(i int) *sync.Mutex {mux := sw.window[i % sw.threads]
    mux.Lock()
    return mux
}

func (sw *SlideWindowLocker) GetLock(i int) *sync.Mutex {mux := sw.window[i % sw.threads]
    return mux
}

应用滑动窗口来管制并发的多线程下载代码就能够实现了
至此,咱们就实现了一个多线程下载的办法

// 滑动窗口多线程下载文件
// 线程数为 j
// 下载第 n + j 个分片时要求第 n 个分片已实现,否则期待第 n 个分片实现
func downloadWithMultiThreadsV3(url string, headers map[string]string, filePath string, threads int, size int64) error {
    var pieceSize int64 = 16 * 1024 *1024
    totalPieceNum := (size+1) / pieceSize
    var pieceNum  int64 = 0
    wg := sync.WaitGroup{}

    window := NewSlideWindow(threads)
    for ; pieceNum <= totalPieceNum; pieceNum++{mux := window.Lock(int(pieceNum))
        wg.Add(1)
        go func(mux *sync.Mutex, pieceNum int64, pieceSize int64, headers map[string]string) {defer mux.Unlock()
            defer wg.Done()
            if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {panic(err)
            }
        }(mux, pieceNum, pieceSize, copyHeader(headers))
    }
    wg.Wait()
    return nil
}

多线程下载校验 md5 的实现

对于滑动窗口管制并发下载也稍作调整,
每次循环下载以后分片时间接开启多个协程下载之后的分片,应用 mux 管制并发,应用 map 来记录下载实现状况
以后下载实现后立刻将以后分片投入计算 hash

至此实现了多线程下载状况下校验全文 md5,同时又缩小磁盘 i / o 和计算工夫。

例如其余的 sha256 等校验(镜像文件应用该校验)也能够间接减少相应的 writer 来实现,校验文件是否统一

var finishMap = sync.Map{}
// 多线程下载并计算 md5
func downloadWithMultiThreadsV4(url string, headers map[string]string, filePath string, threads int, size int64) error {
    var pieceSize int64 = 16 * 1024 *1024
    totalPieceNum := (size+1) / pieceSize
    var pieceNum  int64 = 0

    md5Hash := md5.New()
    window := NewSlideWindow(threads)
    for ; pieceNum <= totalPieceNum; pieceNum++{
        // 并发下载
        for i := 1; i < threads && int(pieceNum)+i<int(totalPieceNum);i++{go downloadWithPieceWithMux(window.GetLock(int(pieceNum)+i), url, copyHeader(headers), filePath, pieceNum, pieceSize)
        }
        downloadWithPieceWithMux(window.GetLock(int(pieceNum)), url, copyHeader(headers), filePath, pieceNum, pieceSize)
        fo, _ := os.Open(filePath)
        fo.Seek(pieceNum * pieceSize, 0)
        io.CopyN(md5Hash, fo, pieceSize)
    }
    fmt.Sprintf("%x", md5Hash.Sum(nil))
    return nil
}

本处子协程下载分片的谬误间接 panic 仅为了缩小篇幅,理论中调用子协程去执行工作还须要去获取执行后果,获取谬误状况并加以解决。

正文完
 0