flv.js源码知识点(二)

在上一篇文章中次要解说了flv.js的整顿流程,明天解说其中的网速计算和数据缓存解决。

1 网速计算

在音视频播放的场景中,用户的网速是影响体验的重要因素,播放器在播放的过程中,能够计算单位工夫获取的数据量来掂量网速。flv.js的实例提供了statistics_info事件获取以后的网速。

flvPlayer.on('statistics_info', function(res) {    console.log('statistics_info',res);})

res构造如下:

{    currentSegmentIndex: 0,    decodedFrames: 15,    droppedFrames: 0,    hasRedirect: false,    loaderType: "fetch-stream-loader",    playerType: "FlvPlayer",    speed: 395.19075278358656,    totalSegmentCount: 1,    url: "https:/example.com/1.flv"}

其中的speed字段就是网速,单位是KB/s, 上面就看对于网速计算相干的局部。statistics_info事件中获取网速的整体流程如下图:

  • IOController中管制每次把加载的字节数增加到SpeedSampler中,对外提供的lastSecondKBps属性是最近有数据一秒的网速。
  • TransmuxingController中管制播放器在加载数据的时候开启定时器获取统计数据,向上触发事件。

外围的计算还是SpeedSampler类, lastSecondKBps是getter属性获取最近有数据一秒的网速,代码含意参考正文。

 get lastSecondKBps () {    // 如果够1s计算 this._lastSecondBytes    this.addBytes(0)        // 上1秒的_lastSecondBytes有数据 就间接返回    // 这个奇妙的是 感觉不是精确的1s 然而又是精确的 因为如果是超过1秒就不持续增加了 1秒内的就增加进去了。        // 如果上一秒有数据则返回    if (this._lastSecondBytes !== 0) {      return this._lastSecondBytes / 1024    } else {      // 如果上一秒的速度是0,并且间隔上次计算超过了500ms 则用_intervalBytes和durationSeconds进行计算      if (this._now() - this._lastCheckpoint >= 500) {        // if time interval since last checkpoint has exceeded 500ms        // the speed is nearly accurate        return this.currentKBps      } else {        // We don't know        return 0      }    }  }

上面是addBytes办法,依据本次调用的工夫和上一次计算工夫的差值做不同解决,具体参见代码正文,这种计算的思路是挺奇妙的,开始认为不准切,然而认真思考是能精确计算最近有数据一秒的网速。始终强调是最近有数据一秒的网速而不是上一秒的网速。

addBytes (bytes) {    // 如果是第一次调用则 记录_firstCheckpoint _lastCheckpoint    if (this._firstCheckpoint === 0) {      this._firstCheckpoint = this._now()      this._lastCheckpoint = this._firstCheckpoint      this._intervalBytes += bytes      this._totalBytes += bytes    } else if (this._now() - this._lastCheckpoint < 1000) {      // 小于1s 就增加 _intervalBytes      this._intervalBytes += bytes      this._totalBytes += bytes    } else { // duration >= 1000          // 只有大于1秒的时候才计算_lastSecondBytes       // 就是这1s内的_intervalBytes      this._lastSecondBytes = this._intervalBytes            this._intervalBytes = bytes // 并且从新开始计算_intervalBytes 大于1秒的这次数据算在下1秒            this._totalBytes += bytes      this._lastCheckpoint = this._now()    }}

上面是currentKBps getter属性,在lastSecondKBps中只有当超过因为如果durationSeconds大于0.5时才应用currentKBps属性,因为如果durationSeconds过小,会过大预计了网速。

get currentKBps () {    this.addBytes(0)    let durationSeconds = (this._now() - this._lastCheckpoint) / 1000    if (durationSeconds == 0) durationSeconds = 1    return (this._intervalBytes / durationSeconds) / 1024  }

均匀网速averageKBps, 如果中途呈现网络中断或者暂停的状况会拉低均匀网速。

get averageKBps () {    let durationSeconds = (this._now() - this._firstCheckpoint) / 1000    return (this._totalBytes / durationSeconds) / 1024 }

2 数据缓存解决

这里讲的缓存是指应用loader获取数据后到传给FLVDemuxer过程中的缓存。这个过程中为什么须要缓存呢?因为FLV格局数据的解封是以TAG为单位,而过去的数据是流式的字节,不可能每次是残缺的TAG,所以FLVDemuxer每次只解决以后数据中残缺的TAG,没有解决的局部就缓存起来,和下次获取的数据拼接。

通过下面的原理介绍,你应该能够猜到这个过程是放在IOController中,咱们先合成缓存中应用到的几个要害API和操作方法。

2.1 二进制缓存区格局

ArrayBuffer 对象用来示意通用的、固定长度的原始二进制数据缓冲区。
你不能间接操作 ArrayBuffer 的内容,而是要通过类型数组对象或 DataView 对象来操作,它们会将缓冲区中的数据表示为特定的格局,并通过这些格局来读写缓冲区的内容。

这里的定义 要害有两点,一是ArrayBuffer是固定长度,所以扩大的话须要创立新的而后把数据复制过来,而是不能间接操作,二是 不能间接操作,须要用类型数据对象,咱们这里用Uint8Array,因为8位无符号正好是以一个字节为单位。咱们这里对缓存的解决,临时不须要读取指定的字节,目前只须要可能读取指定地位的数据即可。

2.2 缓存区操作API

Uint8Array 数组类型示意一个8位无符号整型数组,创立时内容被初始化为0。创立完后,能够以对象的形式或应用数组下标索引的形式援用数组中的元素。

new Uint8Array(buffer [, byteOffset [, length]]);

阐明:在ArrayBuffer上创立Uint8Array对象,使缓存区可操作。
参数: bufferArrayBuffer对象,byteOffset指定ArrayBuffer的起始字节数,length指定创立的长度。

typedarray.set(typedarray[, offset])
阐明:Uint8Array属于typedarray, set办法能够从指定类型化数据中读取值,并将其存储在类型化数组中的指定地位。
参数:typedarray是指要拷贝的源数据,offset指拷贝到指标数据的起始地位。

2.3 办法一 扩大缓存

依据下面的api,把长度为100的ArrayBuffer扩大为长度为1000的ArrabyBuffer

const oldbuffer = new ArrayBuffer(100);const u1 = new Uint8Array(oldbuffer, 0);const newbuffer = new ArrayBuffer(1000);const u2 = new Uint8Array(newbuffer,0);u2.set(u1,0);
2.4 办法二 生产缓存

记录缓存生产地位,生产一部分后从新设置缓存。

let stashUsed = 100;let bufferSize = 1024;let stashBuffer = new ArrayBuffer(1024);// 生产数据 返回生产的字节数let consumed = dispatchChunks(stashBuffer.slice(0, stashUsed),stashUsed);let allBuffer = new Uint8Array(stashBuffer, 0, bufferSize);let remainBuffer = new Uint8Array(stashBuffer, consumed);allBuffer.set(remainBuffer,0);stashUsed = stashUsed-consumed;
2.5 缓存源码

上面就来看IOController中缓存数据的代码。
几个变量和办法的含意:

this._stashBuffer  ArrayBuffer类型   存放数据的缓存区this._bufferSize  缓存区的大小 this._stashBuffer的长度this._stashUsed   缓存区中应用的缓存大小this._stashByteStart 曾经生产的局部在整个流中的开始地位this._expandBuffer() 扩大缓存的办法this.this._dispatchChunks() 生产缓存数据的办法 返回生产的数量chunk ajax获取的二进制数据

有了下面的筹备,就能够间接看缓存解决的代码了

// 缓存中没有数据的状况if (this._stashUsed === 0) {    // 间接生产    let consumed = this._dispatchChunks(chunk, byteStart);    // 如果有残余    if (consumed < chunk.byteLength) {        // 未解决的数据长度        let remain = chunk.byteLength - consumed;        // 如果数据超过缓存 则扩大缓存        if (remain > this._bufferSize) {            this._expandBuffer(remain);        }        // 在_stashBuffer上创立 Uint8Array使其能够操作        let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);        // 从chunk的 consumed开始获取数据 而后从第0地位开始写入stashArray中        stashArray.set(new Uint8Array(chunk, consumed), 0);        // 记录stashUsed的大小        this._stashUsed += remain;        // 记录整个流中的开始地位        this._stashByteStart = byteStart + consumed;    }} else {    // 缓存中有数据的状况    // 先扩大缓存 可能放下已存在的和以后获取的    if (this._stashUsed + chunk.byteLength > this._bufferSize) {        this._expandBuffer(this._stashUsed + chunk.byteLength);    }    let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);    // 先把获取到的chunk 放入缓存中 从_stashUsed的offset开始寄存    stashArray.set(new Uint8Array(chunk), this._stashUsed);    // 重置_stashUsed    this._stashUsed += chunk.byteLength;    // 把缓存中的数据全副读出进行生产    let consumed = this._dispatchChunks(this._stashBuffer.slice(0, this._stashUsed), this._stashByteStart);    // 如果生产了有残余    if (consumed < this._stashUsed && consumed > 0) {  // unconsumed data remain        // 从consumed开始截取数据        let remainArray = new Uint8Array(this._stashBuffer, consumed);        // 从0开始设置 剩下的数据作为缓存 并且扭转_stashUsed 记录缓存的地位        stashArray.set(remainArray, 0);    }    // 从新设置_stashUsed    this._stashUsed -= consumed;    this._stashByteStart += consumed;}

下面的代码是每次来数据都会调用this._dispatchChunks进行生产操作,其实还有一种解决状况,通过变量this._enableStash管制,下面的状况是this._enableStashfalse。如果为true的话区别是只有缓存的数据达到this._stashSize大小时,才会触发this._dispatchChunks进行生产操作。

总体的流程是如果数据小于this._stashSize 则往缓存中增加,如果大于持续上面的判断
如果缓存中没有数据 则间接生产本地来的数据,如果有数据则生产缓存中的数据 生产之后再把本地来的数据放入缓存。具体参见代码

if (this._stashUsed === 0 && this._stashByteStart === 0) {  // seeked? or init chunk?    // This is the first chunk after seek action    this._stashByteStart = byteStart;}// 不满_stashSize 就会先往缓存中寄存 _stashSize会动静调整 if (this._stashUsed + chunk.byteLength <= this._stashSize) {    let stashArray = new Uint8Array(this._stashBuffer, 0, this._stashSize);    stashArray.set(new Uint8Array(chunk), this._stashUsed);    this._stashUsed += chunk.byteLength;} else {  // stashUsed + chunkSize > stashSize, size limit exceeded    let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);    if (this._stashUsed > 0) {  // There're stash datas in buffer        // 如果有缓存 先生产缓存中的数据        let buffer = this._stashBuffer.slice(0, this._stashUsed);        let consumed = this._dispatchChunks(buffer, this._stashByteStart);        if (consumed < buffer.byteLength) {            if (consumed > 0) {                let remainArray = new Uint8Array(buffer, consumed);                stashArray.set(remainArray, 0);                this._stashUsed = remainArray.byteLength;                this._stashByteStart += consumed;            }        } else {            this._stashUsed = 0;            this._stashByteStart += consumed;        }        // 生产完缓存中的数据之后,而后再把这次过去的chunk放入缓存中        if (this._stashUsed + chunk.byteLength > this._bufferSize) {            this._expandBuffer(this._stashUsed + chunk.byteLength);            stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);        }        stashArray.set(new Uint8Array(chunk), this._stashUsed);        this._stashUsed += chunk.byteLength;    } else {  // stash buffer empty, but chunkSize > stashSize (oh, holy shit)        // dispatch chunk directly and stash remain data        // 如果缓存中没有数据 间接生产本次来的数据        let consumed = this._dispatchChunks(chunk, byteStart);        if (consumed < chunk.byteLength) {            let remain = chunk.byteLength - consumed;            if (remain > this._bufferSize) {                this._expandBuffer(remain);                stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);            }            stashArray.set(new Uint8Array(chunk, consumed), 0);            this._stashUsed += remain;            this._stashByteStart = byteStart + consumed;        }    }}

对于this._stashSize还有两个问题,
一是this._stashSize的大小会依据网速进行调整,二是this._stashSize是小于等于this._bufferSize缓存大小,所以this._stashSize变动时也须要扩大缓存。

// 先看获取网速的代码。

//网速计算this._speedSampler.addBytes(chunk.byteLength);// adjust stash buffer size according to network speed dynamically// 获取以后网速let KBps = this._speedSampler.lastSecondKBps;if (KBps !== 0) {  // 正规化网速  let normalized = this._normalizeSpeed(KBps);  if (this._speedNormalized !== normalized) {      this._speedNormalized = normalized;      this._adjustStashSize(normalized);  }}

其中的_normalizeSpeed办法是在给定的速度中二分查找最靠近网速的大小。

this._speedNormalizeList = [64, 128, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096];_normalizeSpeed(input) {    let list = this._speedNormalizeList;    let last = list.length - 1;    let mid = 0;    let lbound = 0;    let ubound = last;    if (input < list[0]) {        return list[0];    }    // binary search    while (lbound <= ubound) {        mid = lbound + Math.floor((ubound - lbound) / 2);        if (mid === last || (input >= list[mid] && input < list[mid + 1])) {            return list[mid];        } else if (list[mid] < input) {            lbound = mid + 1;        } else {            ubound = mid - 1;        }    }}

_adjustStashSize是调整this._stashSize的办法,当缓存的大小小于this._stashSize时,则进行扩大。

_adjustStashSize(normalized) {    let stashSizeKB = 0;    // 如果是直播     if (this._config.isLive) {        // live stream: always use single normalized speed for size of stashSizeKB        stashSizeKB = normalized;    } else {        if (normalized < 512) {            stashSizeKB = normalized;        } else if (normalized >= 512 && normalized <= 1024) {            stashSizeKB = Math.floor(normalized * 1.5);        } else {            stashSizeKB = normalized * 2;        }    }    // 最大是8K    if (stashSizeKB > 8192) {        stashSizeKB = 8192;    }    let bufferSize = stashSizeKB * 1024 + 1024 * 1024 * 1;  // stashSize + 1MB    // 如果缓存小则扩大缓存    if (this._bufferSize < bufferSize) {        this._expandBuffer(bufferSize);    }    this._stashSize = stashSizeKB * 1024;}

扩大缓存的_expandBuffer办法和咱们写的demo很类似。

_expandBuffer(expectedBytes) {    let bufferNewSize = this._stashSize;    // 每次*2 直到大于expectedBytes    while (bufferNewSize + 1024 * 1024 * 1 < expectedBytes) {        bufferNewSize *= 2;    }    bufferNewSize += 1024 * 1024 * 1;  // bufferSize = stashSize + 1MB    if (bufferNewSize === this._bufferSize) {        return;    }    // 新的缓存区    let newBuffer = new ArrayBuffer(bufferNewSize);    // 旧缓存区有数据 则进行拷贝    if (this._stashUsed > 0) {  // copy existing data into new buffer        let stashOldArray = new Uint8Array(this._stashBuffer, 0, this._stashUsed);        let stashNewArray = new Uint8Array(newBuffer, 0, bufferNewSize);        stashNewArray.set(stashOldArray, 0);    }    // 重设缓存区和缓存区大小    this._stashBuffer = newBuffer;    this._bufferSize = bufferNewSize;}

总结: 这篇文章的播种是网速计算的思路能够利用到相似场景中,比方限流。数据缓存中的二进制缓存区的操作方法。下篇文章中解说FLV格局的解析和波及到的位操作。

如果感觉有播种请关注微信公众号 前端良文 每周都会分享前端开发中的干货知识点。