灵感起源

之前在B站看到一个有意思的视频:

【B站】【亦】终极云游戏!五千人同开一辆车,复现经典群体智慧试验

大家能够看看,很有意思。

up主通过代码实现了实时读取直播间里的弹幕内容,进而管制本人的电脑,把弹幕翻译成指令操控《赛博朋克2077》游戏。

观众也越来越多,最初甚至还把间接间搞崩了(当然,其实是因为那天B站全站崩了)。

我非常好奇到底是怎么做到的。

在行看热闹,外行看门道,作为半个外行,咱们就模拟UP主的想法,本人做一个。

所以明天我的指标就是复刻一个 通过弹幕管制直播间 的代码,并且最终在本人的直播间开播。

先给大家看看最终我的成品小视频:

【B站】模拟UP主,做一个弹幕管制的直播间!

看起来是不是很像样了。

初版设计思路

首先在脑海里布局一个大抵的思路,如下图:

这个思路看起来很简略,不过还是得解释一下,首先咱们要搞清楚,弹幕的内容是怎么抓到的。

大部分咱们常见的直播平台,在浏览器端,弹幕都是通过WebSocket来推送给观众的。在手机平板等客户端(非Web端),可能会有一些更加简单的TCP进行弹幕的推送。

对于TCP的音讯投递,有个很好的文章,就是美团的这个:美团终端音讯投递服务Pike的演进之路

归根结底,这些弹幕都是通过在客户端和服务端建设长链接来实现的。

所以,咱们须要做的就是用代码作为客户端,与直播平台进行长链接。这样就能拿到弹幕。

咱们只是须要实现整个弹幕管制的流程,所以弹幕的抓取也不是本文的重点,咱们来淘一个现成的轮子!在Github上一顿找,找到了一个十分不错的开源库,外面可能获取很多直播平台的弹幕:

https://github.com/wbt5/real-url

获取斗鱼&虎牙&哔哩哔哩&抖音&快手等 58 个直播平台的实在流媒体地址(直播源)和弹幕,直播源可在 PotPlayer、flv.js 等播放器中播放。

咱们把代码clone下来,运行main函数,轻易输出一个Bilibili直播间地址,就能拿到直播间实时的弹幕流:

代码里把获取到的一条条弹幕(包含用户名)间接打印在了控制台。

他是如何做到的呢?外围的Python代码如下(不相熟Python?不要紧,就当做伪代码,很容易看懂):

wss_url = 'wss://broadcastlv.chat.bilibili.com/sub'heartbeat = b'\x00\x00\x00\x1f\x00\x10\x00\x01\x00\x00\x00\x02\x00\x00\x00\x01\x5b\x6f\x62\x6a\x65\x63\x74\x20' \                b'\x4f\x62\x6a\x65\x63\x74\x5d '  heartbeatInterval = 60@staticmethodasync def get_ws_info(url):    url = 'https://api.live.bilibili.com/room/v1/Room/room_init?id=' + url.split('/')[-1]    reg_datas = []    async with aiohttp.ClientSession() as session:        async with session.get(url) as resp:            room_json = json.loads(await resp.text())            room_id = room_json['data']['room_id']            data = json.dumps({                'roomid': room_id,                'uid': int(1e14 + 2e14 * random.random()),                'protover': 1            }, separators=(',', ':')).encode('ascii')            data = (pack('>i', len(data) + 16) + b'\x00\x10\x00\x01' +                    pack('>i', 7) + pack('>i', 1) + data)            reg_datas.append(data)    return Bilibili.wss_url, reg_datas

它连上了Bilibili的直播弹幕WSS地址,也就是WebSocket地址,而后伪装成客户端,承受弹幕推送。

OK,做完了第一步,下一步就是用音讯队列将弹幕发送进去。开启独自的消费者接管弹幕。

为了实现上尽量简略,就不上那些业余的音讯队列了,这里用了redis的list作为队列,将弹幕内容放进去。

发送者外围代码如下:

# 链接Redisdef init_redis():    r = redis.Redis(host='localhost', port=6379, decode_responses=True)    return r# 音讯发送者async def printer(q, redis):    while True:        m = await q.get()        if m['msg_type'] == 'danmaku':            print(f'{m["name"]}:{m["content"]}')            list_str = list(m["content"])            print("弹幕拆分:", list_str)            for char in list_str:                if char.lower() in key_list:                    print('推送队列:', char.lower())                    redis.rpush(list_name, char.lower())

实现了弹幕内容的发送后,须要写一个消费者,生产这些弹幕,把外面的指令都提取进去。

并且,在消费者收到弹幕后,如何生产呢?咱们须要一个可能用代码指令管制电脑的方法。

咱持续本着不造轮子的准则,找到了一个Python的自动化管制库PyAutoGUI

PyAutoGUI is a cross-platform GUI automation Python module for human beings. Used to programmatically control the mouse & keyboard.

装置上这个库,在代码中引入,便能够通过他的API管制电脑鼠标和键盘执行对应的操作。几乎是完满啊!

消费者(管制电脑)外围Python代码如下:

# 链接Redisdef init_redis():    r = redis.Redis(host='localhost', port=6379, decode_responses=True)    return r# 消费者def control(key_name):    print("key_name =", key_name)    if key_name == None:        print("本次无指令收回")        return    key_name = key_name.lower()    # 管制电脑指令    if key_name in key_list:        print("收回指令", key_name)        pyautogui.keyDown(key_name)        time.sleep(press_sec)        pyautogui.keyUp(key_name)        print("完结指令", key_name)if __name__ == '__main__':    r = init_redis()    print("开始监听弹幕音讯, loop_sec =", loop_sec)    while True:        key_name = r.lpop(list_name)        control(key_name)        time.sleep(loop_sec)

ok,功败垂成,咱们关上弹幕发送队列和消费者,这个一直循环生产的队列就开始运行了。一旦弹幕中有wsad这种管制游戏罕用的按键,电脑就会本人给本人收回指令。

初版运行中的问题

我灰溜溜的关上本人的B站直播间,开始调试,后果发现我还是太天真了。这个初版代码裸露了十分多的问题。咱们一个个来说下是什么问题,我是如何解决的。

指令不人性化

水友们其实很喜爱发送相似www dddd这类反复单词(叠词),但初版的实现只反对单个字幕,水友们发现不得劲,没有作用后,就从直播间走了。

这点很容易解决,把弹幕内容拆分成每个单词,而后再推送给队列。

解决办法:拆解弹幕,把DDD,拆成D,D,D,发送个消费者。

危险指令

首先是玩家的指令超出了应该有的范畴。

在我把赛博朋克游戏关上,让弹幕观众管制游戏里的开车时,有个神秘观众进了直播间,默默发了个“F”,而后。。。

而后游戏里的V(配角名)就从车里下来了,淦,我是让你们开车的,不是让你们下来和警察斗殴的。。。

解决办法:增加弹幕过滤器。

# 将弹幕进行拆分,只发送指定的指令给消费者key_list = ('w', 's', 'a', 'd', 'j', 'k', 'u', 'i', 'z', 'x', 'f', 'enter', 'shift', 'backspace')list_str = list(m["content"])            print("弹幕拆分:", list_str)            for char in list_str:                if char.lower() in key_list:                    print('推送队列:', char.lower())                    redis.rpush(list_name, char.lower())

下面两个问题解决后,发送者就像上面这样运行了:

弹幕指令沉积

这是个很大的问题,如果解决所有水友发送的全副弹幕指令,肯定会存在生产不过去的问题。

解决办法:须要固定工夫解决弹幕,其余摈弃。

if __name__ == '__main__':    r = init_redis()    print("开始监听弹幕音讯, loop_sec =", loop_sec)    while True:        key_name = r.lpop(list_name)        # 每次只取出一个指令,而后把list清空,也就是这个工夫窗口内其余弹幕都扔掉!        r.delete(list_name)        control(key_name)        time.sleep(loop_sec)

弹幕从收回到观众看到后果有提早

在最开始的视频里,你们也能感触到了,从观众的指令收回,到最终被观众看到,大略要经验5秒的提早。其中,起码有3秒,都是网络直播流的提早,这一点,很难去优化。

回炉重造后的版本

通过一系列调优和波及,咱们的版本也算是从V0.1到了V0.2了。猛虎落泪。

上面是重构后的结构图:

后记

在写完这个我的项目后,我在直播间试了很屡次,体验曾经有限靠近UP主过后的视频了。我开播挂在那边良久,然而,人气最高的时候,也只有20几个人,寥寥十几条弹幕,还有很多是我发的。我还冀望着观众可能拉更多人进来一起玩呢,大失所望啊。

由此可得出结论,我,先得有粉丝,能力玩得起来啊,呜呜呜呜。大家要是不介意,能够关注下我的B站账号,也叫:蛮三刀酱。我会偶然抽风发点乏味的技术视频的。

本文实现的全副代码曾经开源在了Github上,大家能够在本人的直播间里试试呀:

https://github.com/qqxx6661/l...

我是在阿里搬砖的工程师 @蛮三刀酱

继续的更新优质文章,离不开你的点赞,转发和分享!

全网惟一技术公众号:后端技术漫谈