flv.js 源码知识点(二)
在上一篇文章中次要解说了 flv.js 的整顿流程,明天解说其中的网速计算和数据缓存解决。
1 网速计算
在音视频播放的场景中,用户的网速是影响体验的重要因素,播放器在播放的过程中,能够计算单位工夫获取的数据量来掂量网速。flv.js 的实例提供了 statistics_info
事件获取以后的网速。
flvPlayer.on('statistics_info', function(res) {console.log('statistics_info',res);
})
res 构造如下:
{
currentSegmentIndex: 0,
decodedFrames: 15,
droppedFrames: 0,
hasRedirect: false,
loaderType: "fetch-stream-loader",
playerType: "FlvPlayer",
speed: 395.19075278358656,
totalSegmentCount: 1,
url: "https:/example.com/1.flv"
}
其中的 speed
字段就是网速,单位是 KB/s
, 上面就看对于网速计算相干的局部。statistics_info
事件中获取网速的整体流程如下图:
IOController
中管制每次把加载的字节数增加到SpeedSampler
中,对外提供的lastSecondKBps
属性是最近有数据一秒的网速。TransmuxingController
中管制播放器在加载数据的时候开启定时器获取统计数据,向上触发事件。
外围的计算还是 SpeedSampler
类, lastSecondKBps
是 getter 属性获取最近有数据一秒的网速,代码含意参考正文。
get lastSecondKBps () {
// 如果够 1s 计算 this._lastSecondBytes
this.addBytes(0)
// 上 1 秒的_lastSecondBytes 有数据 就间接返回
// 这个奇妙的是 感觉不是精确的 1s 然而又是精确的 因为如果是超过 1 秒就不持续增加了 1 秒内的就增加进去了。// 如果上一秒有数据则返回
if (this._lastSecondBytes !== 0) {return this._lastSecondBytes / 1024} else {
// 如果上一秒的速度是 0,并且间隔上次计算超过了 500ms 则用_intervalBytes 和 durationSeconds 进行计算
if (this._now() - this._lastCheckpoint >= 500) {
// if time interval since last checkpoint has exceeded 500ms
// the speed is nearly accurate
return this.currentKBps
} else {
// We don't know
return 0
}
}
}
上面是 addBytes
办法,依据本次调用的工夫和上一次计算工夫的差值做不同解决,具体参见代码正文,这种计算的思路是挺奇妙的,开始认为不准切,然而认真思考是能精确计算最近有数据一秒的网速。始终强调是最近有数据一秒的网速而不是上一秒的网速。
addBytes (bytes) {
// 如果是第一次调用则 记录_firstCheckpoint _lastCheckpoint
if (this._firstCheckpoint === 0) {this._firstCheckpoint = this._now()
this._lastCheckpoint = this._firstCheckpoint
this._intervalBytes += bytes
this._totalBytes += bytes
} else if (this._now() - this._lastCheckpoint < 1000) {
// 小于 1s 就增加 _intervalBytes
this._intervalBytes += bytes
this._totalBytes += bytes
} else { // duration >= 1000
// 只有大于 1 秒的时候才计算_lastSecondBytes
// 就是这 1s 内的_intervalBytes
this._lastSecondBytes = this._intervalBytes
this._intervalBytes = bytes // 并且从新开始计算_intervalBytes 大于 1 秒的这次数据算在下 1 秒
this._totalBytes += bytes
this._lastCheckpoint = this._now()}
}
上面是 currentKBps
getter 属性, 在lastSecondKBps
中只有当超过因为如果 durationSeconds 大于 0.5 时才应用 currentKBps
属性,因为如果 durationSeconds 过小,会过大预计了网速。
get currentKBps () {this.addBytes(0)
let durationSeconds = (this._now() - this._lastCheckpoint) / 1000
if (durationSeconds == 0) durationSeconds = 1
return (this._intervalBytes / durationSeconds) / 1024
}
均匀网速averageKBps
,如果中途呈现网络中断或者暂停的状况会拉低均匀网速。
get averageKBps () {let durationSeconds = (this._now() - this._firstCheckpoint) / 1000
return (this._totalBytes / durationSeconds) / 1024
}
2 数据缓存解决
这里讲的缓存是指应用 loader
获取数据后到传给 FLVDemuxer
过程中的缓存。这个过程中为什么须要缓存呢?因为 FLV 格局数据的解封是以 TAG 为单位,而过去的数据是流式的字节,不可能每次是残缺的 TAG,所以 FLVDemuxer
每次只解决以后数据中残缺的 TAG,没有解决的局部就缓存起来,和下次获取的数据拼接。
通过下面的原理介绍,你应该能够猜到这个过程是放在 IOController
中,咱们先合成缓存中应用到的几个要害 API 和操作方法。
2.1 二进制缓存区格局
ArrayBuffer
对象用来示意通用的、固定长度的原始二进制数据缓冲区。
你不能间接操作 ArrayBuffer
的内容,而是要通过类型数组对象或 DataView
对象来操作,它们会将缓冲区中的数据表示为特定的格局,并通过这些格局来读写缓冲区的内容。
这里的定义 要害有两点,一是 ArrayBuffer
是固定长度,所以扩大的话须要创立新的而后把数据复制过来,而是不能间接操作,二是 不能间接操作,须要用类型数据对象,咱们这里用Uint8Array
,因为 8 位无符号正好是以一个字节为单位。咱们这里对缓存的解决,临时不须要读取指定的字节,目前只须要可能读取指定地位的数据即可。
2.2 缓存区操作 API
Uint8Array
数组类型示意一个 8 位无符号整型数组,创立时内容被初始化为 0。创立完后,能够以对象的形式或应用数组下标索引的形式援用数组中的元素。
new Uint8Array(buffer [, byteOffset [, length]]);
阐明:在 ArrayBuffer
上创立 Uint8Array
对象,使缓存区可操作。
参数:buffer
为 ArrayBuffer
对象,byteOffset
指定 ArrayBuffer
的起始字节数,length
指定创立的长度。
typedarray.set(typedarray[, offset])
阐明:Uint8Array
属于 typedarray
, set 办法能够从指定类型化数据中读取值,并将其存储在类型化数组中的指定地位。
参数:typedarray
是指要拷贝的源数据,offset
指拷贝到指标数据的起始地位。
2.3 办法一 扩大缓存
依据下面的 api,把长度为 100 的 ArrayBuffer
扩大为长度为 1000 的ArrabyBuffer
。
const oldbuffer = new ArrayBuffer(100);
const u1 = new Uint8Array(oldbuffer, 0);
const newbuffer = new ArrayBuffer(1000);
const u2 = new Uint8Array(newbuffer,0);
u2.set(u1,0);
2.4 办法二 生产缓存
记录缓存生产地位,生产一部分后从新设置缓存。
let stashUsed = 100;
let bufferSize = 1024;
let stashBuffer = new ArrayBuffer(1024);
// 生产数据 返回生产的字节数
let consumed = dispatchChunks(stashBuffer.slice(0, stashUsed),stashUsed);
let allBuffer = new Uint8Array(stashBuffer, 0, bufferSize);
let remainBuffer = new Uint8Array(stashBuffer, consumed);
allBuffer.set(remainBuffer,0);
stashUsed = stashUsed-consumed;
2.5 缓存源码
上面就来看 IOController
中缓存数据的代码。
几个变量和办法的含意:
this._stashBuffer ArrayBuffer 类型 存放数据的缓存区
this._bufferSize 缓存区的大小 this._stashBuffer 的长度
this._stashUsed 缓存区中应用的缓存大小
this._stashByteStart 曾经生产的局部在整个流中的开始地位
this._expandBuffer() 扩大缓存的办法
this.this._dispatchChunks() 生产缓存数据的办法 返回生产的数量
chunk ajax 获取的二进制数据
有了下面的筹备,就能够间接看缓存解决的代码了
// 缓存中没有数据的状况
if (this._stashUsed === 0) {
// 间接生产
let consumed = this._dispatchChunks(chunk, byteStart);
// 如果有残余
if (consumed < chunk.byteLength) {
// 未解决的数据长度
let remain = chunk.byteLength - consumed;
// 如果数据超过缓存 则扩大缓存
if (remain > this._bufferSize) {this._expandBuffer(remain);
}
// 在_stashBuffer 上创立 Uint8Array 使其能够操作
let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
// 从 chunk 的 consumed 开始获取数据 而后从第 0 地位开始写入 stashArray 中
stashArray.set(new Uint8Array(chunk, consumed), 0);
// 记录 stashUsed 的大小
this._stashUsed += remain;
// 记录整个流中的开始地位
this._stashByteStart = byteStart + consumed;
}
} else {
// 缓存中有数据的状况
// 先扩大缓存 可能放下已存在的和以后获取的
if (this._stashUsed + chunk.byteLength > this._bufferSize) {this._expandBuffer(this._stashUsed + chunk.byteLength);
}
let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
// 先把获取到的 chunk 放入缓存中 从_stashUsed 的 offset 开始寄存
stashArray.set(new Uint8Array(chunk), this._stashUsed);
// 重置_stashUsed
this._stashUsed += chunk.byteLength;
// 把缓存中的数据全副读出进行生产
let consumed = this._dispatchChunks(this._stashBuffer.slice(0, this._stashUsed), this._stashByteStart);
// 如果生产了有残余
if (consumed < this._stashUsed && consumed > 0) { // unconsumed data remain
// 从 consumed 开始截取数据
let remainArray = new Uint8Array(this._stashBuffer, consumed);
// 从 0 开始设置 剩下的数据作为缓存 并且扭转_stashUsed 记录缓存的地位
stashArray.set(remainArray, 0);
}
// 从新设置_stashUsed
this._stashUsed -= consumed;
this._stashByteStart += consumed;
}
下面的代码是每次来数据都会调用 this._dispatchChunks
进行生产操作,其实还有一种解决状况,通过变量 this._enableStash
管制,下面的状况是 this._enableStash
为false
。如果为 true
的话区别是只有缓存的数据达到 this._stashSize
大小时,才会触发 this._dispatchChunks
进行生产操作。
总体的流程是如果数据小于 this._stashSize
则往缓存中增加,如果大于持续上面的判断
如果缓存中没有数据 则间接生产本地来的数据,如果有数据则生产缓存中的数据 生产之后再把本地来的数据放入缓存。具体参见代码
if (this._stashUsed === 0 && this._stashByteStart === 0) { // seeked? or init chunk?
// This is the first chunk after seek action
this._stashByteStart = byteStart;
}
// 不满_stashSize 就会先往缓存中寄存 _stashSize 会动静调整
if (this._stashUsed + chunk.byteLength <= this._stashSize) {let stashArray = new Uint8Array(this._stashBuffer, 0, this._stashSize);
stashArray.set(new Uint8Array(chunk), this._stashUsed);
this._stashUsed += chunk.byteLength;
} else { // stashUsed + chunkSize > stashSize, size limit exceeded
let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
if (this._stashUsed > 0) { // There're stash datas in buffer
// 如果有缓存 先生产缓存中的数据
let buffer = this._stashBuffer.slice(0, this._stashUsed);
let consumed = this._dispatchChunks(buffer, this._stashByteStart);
if (consumed < buffer.byteLength) {if (consumed > 0) {let remainArray = new Uint8Array(buffer, consumed);
stashArray.set(remainArray, 0);
this._stashUsed = remainArray.byteLength;
this._stashByteStart += consumed;
}
} else {
this._stashUsed = 0;
this._stashByteStart += consumed;
}
// 生产完缓存中的数据之后,而后再把这次过去的 chunk 放入缓存中
if (this._stashUsed + chunk.byteLength > this._bufferSize) {this._expandBuffer(this._stashUsed + chunk.byteLength);
stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
}
stashArray.set(new Uint8Array(chunk), this._stashUsed);
this._stashUsed += chunk.byteLength;
} else {// stash buffer empty, but chunkSize > stashSize (oh, holy shit)
// dispatch chunk directly and stash remain data
// 如果缓存中没有数据 间接生产本次来的数据
let consumed = this._dispatchChunks(chunk, byteStart);
if (consumed < chunk.byteLength) {
let remain = chunk.byteLength - consumed;
if (remain > this._bufferSize) {this._expandBuffer(remain);
stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
}
stashArray.set(new Uint8Array(chunk, consumed), 0);
this._stashUsed += remain;
this._stashByteStart = byteStart + consumed;
}
}
}
对于 this._stashSize
还有两个问题,
一是 this._stashSize
的大小会依据网速进行调整,二是 this._stashSize
是小于等于 this._bufferSize
缓存大小,所以 this._stashSize
变动时也须要扩大缓存。
// 先看获取网速的代码。
// 网速计算
this._speedSampler.addBytes(chunk.byteLength);
// adjust stash buffer size according to network speed dynamically
// 获取以后网速
let KBps = this._speedSampler.lastSecondKBps;
if (KBps !== 0) {
// 正规化网速
let normalized = this._normalizeSpeed(KBps);
if (this._speedNormalized !== normalized) {
this._speedNormalized = normalized;
this._adjustStashSize(normalized);
}
}
其中的 _normalizeSpeed
办法是在给定的速度中二分查找最靠近网速的大小。
this._speedNormalizeList = [64, 128, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096];
_normalizeSpeed(input) {
let list = this._speedNormalizeList;
let last = list.length - 1;
let mid = 0;
let lbound = 0;
let ubound = last;
if (input < list[0]) {return list[0];
}
// binary search
while (lbound <= ubound) {mid = lbound + Math.floor((ubound - lbound) / 2);
if (mid === last || (input >= list[mid] && input < list[mid + 1])) {return list[mid];
} else if (list[mid] < input) {lbound = mid + 1;} else {ubound = mid - 1;}
}
}
_adjustStashSize
是调整 this._stashSize
的办法,当缓存的大小小于 this._stashSize
时,则进行扩大。
_adjustStashSize(normalized) {
let stashSizeKB = 0;
// 如果是直播
if (this._config.isLive) {
// live stream: always use single normalized speed for size of stashSizeKB
stashSizeKB = normalized;
} else {if (normalized < 512) {stashSizeKB = normalized;} else if (normalized >= 512 && normalized <= 1024) {stashSizeKB = Math.floor(normalized * 1.5);
} else {stashSizeKB = normalized * 2;}
}
// 最大是 8K
if (stashSizeKB > 8192) {stashSizeKB = 8192;}
let bufferSize = stashSizeKB * 1024 + 1024 * 1024 * 1; // stashSize + 1MB
// 如果缓存小则扩大缓存
if (this._bufferSize < bufferSize) {this._expandBuffer(bufferSize);
}
this._stashSize = stashSizeKB * 1024;
}
扩大缓存的 _expandBuffer
办法和咱们写的 demo 很类似。
_expandBuffer(expectedBytes) {
let bufferNewSize = this._stashSize;
// 每次 *2 直到大于 expectedBytes
while (bufferNewSize + 1024 * 1024 * 1 < expectedBytes) {bufferNewSize *= 2;}
bufferNewSize += 1024 * 1024 * 1; // bufferSize = stashSize + 1MB
if (bufferNewSize === this._bufferSize) {return;}
// 新的缓存区
let newBuffer = new ArrayBuffer(bufferNewSize);
// 旧缓存区有数据 则进行拷贝
if (this._stashUsed > 0) { // copy existing data into new buffer
let stashOldArray = new Uint8Array(this._stashBuffer, 0, this._stashUsed);
let stashNewArray = new Uint8Array(newBuffer, 0, bufferNewSize);
stashNewArray.set(stashOldArray, 0);
}
// 重设缓存区和缓存区大小
this._stashBuffer = newBuffer;
this._bufferSize = bufferNewSize;
}
总结:这篇文章的播种是网速计算的思路能够利用到相似场景中,比方限流。数据缓存中的二进制缓存区的操作方法。下篇文章中解说 FLV 格局的解析和波及到的位操作。
如果感觉有播种请关注微信公众号 前端良文 每周都会分享前端开发中的干货知识点。