乐趣区

关于javascript:flvjs系列二网速计算与数据缓存

flv.js 源码知识点(二)

在上一篇文章中次要解说了 flv.js 的整顿流程,明天解说其中的网速计算和数据缓存解决。

1 网速计算

在音视频播放的场景中,用户的网速是影响体验的重要因素,播放器在播放的过程中,能够计算单位工夫获取的数据量来掂量网速。flv.js 的实例提供了 statistics_info 事件获取以后的网速。

flvPlayer.on('statistics_info', function(res) {console.log('statistics_info',res);
})

res 构造如下:

{
    currentSegmentIndex: 0,
    decodedFrames: 15,
    droppedFrames: 0,
    hasRedirect: false,
    loaderType: "fetch-stream-loader",
    playerType: "FlvPlayer",
    speed: 395.19075278358656,
    totalSegmentCount: 1,
    url: "https:/example.com/1.flv"
}

其中的 speed 字段就是网速,单位是 KB/s, 上面就看对于网速计算相干的局部。statistics_info 事件中获取网速的整体流程如下图:

  • IOController中管制每次把加载的字节数增加到 SpeedSampler 中,对外提供的 lastSecondKBps 属性是最近有数据一秒的网速。
  • TransmuxingController中管制播放器在加载数据的时候开启定时器获取统计数据,向上触发事件。

外围的计算还是 SpeedSampler 类, lastSecondKBps是 getter 属性获取最近有数据一秒的网速,代码含意参考正文。

 get lastSecondKBps () {
    // 如果够 1s 计算 this._lastSecondBytes
    this.addBytes(0)
    
    // 上 1 秒的_lastSecondBytes 有数据 就间接返回
    // 这个奇妙的是 感觉不是精确的 1s 然而又是精确的 因为如果是超过 1 秒就不持续增加了 1 秒内的就增加进去了。// 如果上一秒有数据则返回
    if (this._lastSecondBytes !== 0) {return this._lastSecondBytes / 1024} else {
      // 如果上一秒的速度是 0,并且间隔上次计算超过了 500ms 则用_intervalBytes 和 durationSeconds 进行计算
      if (this._now() - this._lastCheckpoint >= 500) {
        // if time interval since last checkpoint has exceeded 500ms
        // the speed is nearly accurate
        return this.currentKBps
      } else {
        // We don't know
        return 0
      }
    }
  }

上面是 addBytes 办法,依据本次调用的工夫和上一次计算工夫的差值做不同解决,具体参见代码正文,这种计算的思路是挺奇妙的,开始认为不准切,然而认真思考是能精确计算最近有数据一秒的网速。始终强调是最近有数据一秒的网速而不是上一秒的网速。

addBytes (bytes) {
    // 如果是第一次调用则 记录_firstCheckpoint _lastCheckpoint
    if (this._firstCheckpoint === 0) {this._firstCheckpoint = this._now()
      this._lastCheckpoint = this._firstCheckpoint
      this._intervalBytes += bytes
      this._totalBytes += bytes
    } else if (this._now() - this._lastCheckpoint < 1000) {
      // 小于 1s 就增加 _intervalBytes
      this._intervalBytes += bytes
      this._totalBytes += bytes
    } else { // duration >= 1000
    
      // 只有大于 1 秒的时候才计算_lastSecondBytes 
      // 就是这 1s 内的_intervalBytes
      this._lastSecondBytes = this._intervalBytes
      
      this._intervalBytes = bytes // 并且从新开始计算_intervalBytes 大于 1 秒的这次数据算在下 1 秒
      
      this._totalBytes += bytes
      this._lastCheckpoint = this._now()}
}

上面是 currentKBps getter 属性, 在lastSecondKBps 中只有当超过因为如果 durationSeconds 大于 0.5 时才应用 currentKBps 属性,因为如果 durationSeconds 过小,会过大预计了网速。

get currentKBps () {this.addBytes(0)

    let durationSeconds = (this._now() - this._lastCheckpoint) / 1000
    if (durationSeconds == 0) durationSeconds = 1
    return (this._intervalBytes / durationSeconds) / 1024
  }

均匀网速averageKBps,如果中途呈现网络中断或者暂停的状况会拉低均匀网速。

get averageKBps () {let durationSeconds = (this._now() - this._firstCheckpoint) / 1000
    return (this._totalBytes / durationSeconds) / 1024
 }

2 数据缓存解决

这里讲的缓存是指应用 loader 获取数据后到传给 FLVDemuxer 过程中的缓存。这个过程中为什么须要缓存呢?因为 FLV 格局数据的解封是以 TAG 为单位,而过去的数据是流式的字节,不可能每次是残缺的 TAG,所以 FLVDemuxer 每次只解决以后数据中残缺的 TAG,没有解决的局部就缓存起来,和下次获取的数据拼接。

通过下面的原理介绍,你应该能够猜到这个过程是放在 IOController 中,咱们先合成缓存中应用到的几个要害 API 和操作方法。

2.1 二进制缓存区格局

ArrayBuffer 对象用来示意通用的、固定长度的原始二进制数据缓冲区。
你不能间接操作 ArrayBuffer 的内容,而是要通过类型数组对象或 DataView 对象来操作,它们会将缓冲区中的数据表示为特定的格局,并通过这些格局来读写缓冲区的内容。

这里的定义 要害有两点,一是 ArrayBuffer 是固定长度,所以扩大的话须要创立新的而后把数据复制过来,而是不能间接操作,二是 不能间接操作,须要用类型数据对象,咱们这里用Uint8Array,因为 8 位无符号正好是以一个字节为单位。咱们这里对缓存的解决,临时不须要读取指定的字节,目前只须要可能读取指定地位的数据即可。

2.2 缓存区操作 API

Uint8Array 数组类型示意一个 8 位无符号整型数组,创立时内容被初始化为 0。创立完后,能够以对象的形式或应用数组下标索引的形式援用数组中的元素。

new Uint8Array(buffer [, byteOffset [, length]]);

阐明:在 ArrayBuffer 上创立 Uint8Array 对象,使缓存区可操作。
参数:bufferArrayBuffer 对象,byteOffset指定 ArrayBuffer 的起始字节数,length指定创立的长度。

typedarray.set(typedarray[, offset])
阐明:Uint8Array属于 typedarray, set 办法能够从指定类型化数据中读取值,并将其存储在类型化数组中的指定地位。
参数:typedarray是指要拷贝的源数据,offset指拷贝到指标数据的起始地位。

2.3 办法一 扩大缓存

依据下面的 api,把长度为 100 的 ArrayBuffer 扩大为长度为 1000 的ArrabyBuffer

const oldbuffer = new ArrayBuffer(100);
const u1 = new Uint8Array(oldbuffer, 0);
const newbuffer = new ArrayBuffer(1000);
const u2 = new Uint8Array(newbuffer,0);
u2.set(u1,0);
2.4 办法二 生产缓存

记录缓存生产地位,生产一部分后从新设置缓存。

let stashUsed = 100;
let bufferSize = 1024;
let stashBuffer = new ArrayBuffer(1024);

// 生产数据 返回生产的字节数

let consumed = dispatchChunks(stashBuffer.slice(0, stashUsed),stashUsed);
let allBuffer = new Uint8Array(stashBuffer, 0, bufferSize);
let remainBuffer = new Uint8Array(stashBuffer, consumed);
allBuffer.set(remainBuffer,0);
stashUsed = stashUsed-consumed;
2.5 缓存源码

上面就来看 IOController 中缓存数据的代码。
几个变量和办法的含意:

this._stashBuffer  ArrayBuffer 类型   存放数据的缓存区
this._bufferSize  缓存区的大小 this._stashBuffer 的长度
this._stashUsed   缓存区中应用的缓存大小
this._stashByteStart 曾经生产的局部在整个流中的开始地位
this._expandBuffer() 扩大缓存的办法
this.this._dispatchChunks() 生产缓存数据的办法 返回生产的数量
chunk ajax 获取的二进制数据

有了下面的筹备,就能够间接看缓存解决的代码了

// 缓存中没有数据的状况
if (this._stashUsed === 0) {
    // 间接生产
    let consumed = this._dispatchChunks(chunk, byteStart);
    // 如果有残余
    if (consumed < chunk.byteLength) {
        // 未解决的数据长度
        let remain = chunk.byteLength - consumed;
        // 如果数据超过缓存 则扩大缓存
        if (remain > this._bufferSize) {this._expandBuffer(remain);
        }
        // 在_stashBuffer 上创立 Uint8Array 使其能够操作
        let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
        // 从 chunk 的 consumed 开始获取数据 而后从第 0 地位开始写入 stashArray 中
        stashArray.set(new Uint8Array(chunk, consumed), 0);
        // 记录 stashUsed 的大小
        this._stashUsed += remain;
        // 记录整个流中的开始地位
        this._stashByteStart = byteStart + consumed;
    }
} else {
    // 缓存中有数据的状况
    // 先扩大缓存 可能放下已存在的和以后获取的
    if (this._stashUsed + chunk.byteLength > this._bufferSize) {this._expandBuffer(this._stashUsed + chunk.byteLength);
    }
    let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
    // 先把获取到的 chunk 放入缓存中 从_stashUsed 的 offset 开始寄存
    stashArray.set(new Uint8Array(chunk), this._stashUsed);
    // 重置_stashUsed
    this._stashUsed += chunk.byteLength;
    // 把缓存中的数据全副读出进行生产
    let consumed = this._dispatchChunks(this._stashBuffer.slice(0, this._stashUsed), this._stashByteStart);
    // 如果生产了有残余
    if (consumed < this._stashUsed && consumed > 0) {  // unconsumed data remain
        // 从 consumed 开始截取数据
        let remainArray = new Uint8Array(this._stashBuffer, consumed);
        // 从 0 开始设置 剩下的数据作为缓存 并且扭转_stashUsed 记录缓存的地位
        stashArray.set(remainArray, 0);
    }
    // 从新设置_stashUsed
    this._stashUsed -= consumed;
    this._stashByteStart += consumed;
}

下面的代码是每次来数据都会调用 this._dispatchChunks 进行生产操作,其实还有一种解决状况,通过变量 this._enableStash 管制,下面的状况是 this._enableStashfalse。如果为 true 的话区别是只有缓存的数据达到 this._stashSize 大小时,才会触发 this._dispatchChunks 进行生产操作。

总体的流程是如果数据小于 this._stashSize 则往缓存中增加,如果大于持续上面的判断
如果缓存中没有数据 则间接生产本地来的数据,如果有数据则生产缓存中的数据 生产之后再把本地来的数据放入缓存。具体参见代码

if (this._stashUsed === 0 && this._stashByteStart === 0) {  // seeked? or init chunk?
    // This is the first chunk after seek action
    this._stashByteStart = byteStart;
}
// 不满_stashSize 就会先往缓存中寄存 _stashSize 会动静调整 
if (this._stashUsed + chunk.byteLength <= this._stashSize) {let stashArray = new Uint8Array(this._stashBuffer, 0, this._stashSize);
    stashArray.set(new Uint8Array(chunk), this._stashUsed);
    this._stashUsed += chunk.byteLength;
} else {  // stashUsed + chunkSize > stashSize, size limit exceeded
    let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
    if (this._stashUsed > 0) {  // There're stash datas in buffer
        // 如果有缓存 先生产缓存中的数据
        let buffer = this._stashBuffer.slice(0, this._stashUsed);
        let consumed = this._dispatchChunks(buffer, this._stashByteStart);
        if (consumed < buffer.byteLength) {if (consumed > 0) {let remainArray = new Uint8Array(buffer, consumed);
                stashArray.set(remainArray, 0);
                this._stashUsed = remainArray.byteLength;
                this._stashByteStart += consumed;
            }
        } else {
            this._stashUsed = 0;
            this._stashByteStart += consumed;
        }
        // 生产完缓存中的数据之后,而后再把这次过去的 chunk 放入缓存中
        if (this._stashUsed + chunk.byteLength > this._bufferSize) {this._expandBuffer(this._stashUsed + chunk.byteLength);
            stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
        }
        stashArray.set(new Uint8Array(chunk), this._stashUsed);
        this._stashUsed += chunk.byteLength;
    } else {// stash buffer empty, but chunkSize > stashSize (oh, holy shit)
        // dispatch chunk directly and stash remain data
        // 如果缓存中没有数据 间接生产本次来的数据
        let consumed = this._dispatchChunks(chunk, byteStart);
        if (consumed < chunk.byteLength) {
            let remain = chunk.byteLength - consumed;
            if (remain > this._bufferSize) {this._expandBuffer(remain);
                stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
            }
            stashArray.set(new Uint8Array(chunk, consumed), 0);
            this._stashUsed += remain;
            this._stashByteStart = byteStart + consumed;
        }
    }
}

对于 this._stashSize 还有两个问题,
一是 this._stashSize 的大小会依据网速进行调整,二是 this._stashSize 是小于等于 this._bufferSize 缓存大小,所以 this._stashSize 变动时也须要扩大缓存。

// 先看获取网速的代码。

// 网速计算
this._speedSampler.addBytes(chunk.byteLength);

// adjust stash buffer size according to network speed dynamically
// 获取以后网速
let KBps = this._speedSampler.lastSecondKBps;
if (KBps !== 0) {
  // 正规化网速
  let normalized = this._normalizeSpeed(KBps);
  if (this._speedNormalized !== normalized) {
      this._speedNormalized = normalized;
      this._adjustStashSize(normalized);
  }
}

其中的 _normalizeSpeed 办法是在给定的速度中二分查找最靠近网速的大小。

this._speedNormalizeList = [64, 128, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096];
_normalizeSpeed(input) {
    let list = this._speedNormalizeList;
    let last = list.length - 1;
    let mid = 0;
    let lbound = 0;
    let ubound = last;
    if (input < list[0]) {return list[0];
    }
    // binary search
    while (lbound <= ubound) {mid = lbound + Math.floor((ubound - lbound) / 2);
        if (mid === last || (input >= list[mid] && input < list[mid + 1])) {return list[mid];
        } else if (list[mid] < input) {lbound = mid + 1;} else {ubound = mid - 1;}
    }
}

_adjustStashSize是调整 this._stashSize 的办法,当缓存的大小小于 this._stashSize 时,则进行扩大。

_adjustStashSize(normalized) {
    let stashSizeKB = 0;
    // 如果是直播 
    if (this._config.isLive) {
        // live stream: always use single normalized speed for size of stashSizeKB
        stashSizeKB = normalized;
    } else {if (normalized < 512) {stashSizeKB = normalized;} else if (normalized >= 512 && normalized <= 1024) {stashSizeKB = Math.floor(normalized * 1.5);
        } else {stashSizeKB = normalized * 2;}
    }
    // 最大是 8K
    if (stashSizeKB > 8192) {stashSizeKB = 8192;}
    let bufferSize = stashSizeKB * 1024 + 1024 * 1024 * 1;  // stashSize + 1MB
    // 如果缓存小则扩大缓存
    if (this._bufferSize < bufferSize) {this._expandBuffer(bufferSize);
    }
    this._stashSize = stashSizeKB * 1024;
}

扩大缓存的 _expandBuffer 办法和咱们写的 demo 很类似。

_expandBuffer(expectedBytes) {
    let bufferNewSize = this._stashSize;
    // 每次 *2 直到大于 expectedBytes
    while (bufferNewSize + 1024 * 1024 * 1 < expectedBytes) {bufferNewSize *= 2;}

    bufferNewSize += 1024 * 1024 * 1;  // bufferSize = stashSize + 1MB
    if (bufferNewSize === this._bufferSize) {return;}
    // 新的缓存区
    let newBuffer = new ArrayBuffer(bufferNewSize);
    // 旧缓存区有数据 则进行拷贝
    if (this._stashUsed > 0) {  // copy existing data into new buffer
        let stashOldArray = new Uint8Array(this._stashBuffer, 0, this._stashUsed);
        let stashNewArray = new Uint8Array(newBuffer, 0, bufferNewSize);
        stashNewArray.set(stashOldArray, 0);
    }
    // 重设缓存区和缓存区大小
    this._stashBuffer = newBuffer;
    this._bufferSize = bufferNewSize;
}

总结:这篇文章的播种是网速计算的思路能够利用到相似场景中,比方限流。数据缓存中的二进制缓存区的操作方法。下篇文章中解说 FLV 格局的解析和波及到的位操作。

如果感觉有播种请关注微信公众号 前端良文 每周都会分享前端开发中的干货知识点。

退出移动版