近年来市场上IP Camera产品价格继续走低,硬件利润薄弱,很多厂商通过增值服务发力,增值服务比拟成熟的业务状态是云端存储,其实现形式是设施检测报警事件触发一段视频上传云端并提供终端用户回看从而产生免费。Amazon Kinesis Video Streams实现云存业务劣势显著,提供Device SDK摄取视频并上传Endpoint,终端用户能够基于HLS/DASH支流的协定播放观看。
本案例基于Serverless形式实现云存计划,Devices推流过程中记录metadata,通过Amazon API Gateway,Amazon Lambda,Amazon DynamoDB留存video metadata,前端利用通过video metadata生成播放URL,通过HLS Player播放。
想要理解更多亚马逊云科技最新技术公布和实际翻新,敬请关注在上海、北京、深圳三地举办的2021亚马逊云科技中国峰会!点击图片报名吧~
筹备工作
- 下载amazon-kinesis-video-streams-producer-sdk-cpp 源码并编译libKinesisVideoProducerJNI
- 下载amazon-kinesis-video-streams-producer-sdk-java源码
- 装置IntelliJ IDEA
架构图
创立Amazon Kinesis Video Streams
登录console,区域抉择新加坡, 抉择Kinesis Video Streams服务,创立视频流名称“kvs-stream”, Data retention抉择7(7天云存)
创立拜访密钥
新建一个用户”CloudStorageUser”, 勾选“Programmatic access”
减少2个权限如下图,这里仅用于测试和演示,生产环境倡议最小化权限。
拷贝并保留Access Key ID和Secret Access Key,抉择Amazon Secrets Manager服务增加密钥,新增Secret key/value, 将AK/SK填入
指定密钥名称“ipc-cloudstorage-access-kvs-secretkey”
拷贝Python3的sample code, 后续Lambda Function中应用。
创立Lambda
创立function抉择名称“save_devices_video_metadata”,Runtime抉择Python 3.6
HttpMethod 为PUT,接管metadata并存表
if event['httpMethod'] == 'PUT' : id = _body['clientID'] deviceID = _body['deviceID'] streamName = _body['streamName'] begTime = _body['begTime'] endTime = _body['endTime'] duration = _body['duration'] res = save_dynamodb_tb(id,deviceID,begTime,endTime,duration,streamName)
HTTPMethod为GET,获取HLS URL
if event['httpMethod'] == 'GET' : streamName = _body['streamName'] begTime = _body['begTime'] endTime = _body['endTime'] duration = _body['duration'] descStream =kvs.describe_stream(StreamName=streamName) Stream_ARN = descStream['StreamInfo']['StreamARN'] get_hls_response = kvs.get_data_endpoint(APIName="GET_HLS_STREAMING_SESSION_URL",StreamARN=Stream_ARN) hls_endpoint = get_hls_response['DataEndpoint'] kvs_client = boto3.client("kinesis-video-archived-media", endpoint_url=hls_endpoint, region_name=REGION_NAME, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) # 获取hls url res = get_hls_url(kvs_client,streamName,begTime,endTime,duration)
Lambda源码:
https://github.com/beiyue/sav...
Lambda 受权
IAM创立Policy抉择JSON, 编辑内容如下, 保留Policy名称“getSecretValueForIPC”
{ "Version": "2012-10-17", "Statement": { "Effect": "Allow", "Action": "secretsmanager:GetSecretValue", "Resource": "<SECRET_ARN>" }}
抉择Lambda Configuration编辑Role
附加策略“getSecretValueForIPC”
附件策略“AmazonKinesisVideoStreamsReadOnlyAccess”
创立DynamoDB表
创立DynamoDB表,新建表名“tb_device_metadata”
元数据蕴含设施ID,Kinesis Stream名称,视频片段开始工夫和完结工夫,视频时长,写入工夫
创立API
在Amazon API Gateway中抉择 ”Create API”,抉择 ”REST API” Build, 抉择API名称 “video_record_metadata” , Endpoint Type抉择“Regional”。
实现后,抉择“Actions”,抉择“Create Resource”资源名称为“ipc-video-metadata”
抉择“Actions”,抉择“Create Method”,抉择PUT,抉择Lambda Function, 勾选Use Lambda Proxy integration,选型Lambda Region “ap-southeast-1”,抉择Lambda Function“save_device_video_metadata”并保留
GET办法同上
创立API模型
左侧抉择Models,创立Model,模型名称“IPCMetaData”
Model shema复制如下
{ "$schema": "http://json-schema.org/draft-04/schema#", "title": "IPCMetaData", "type": "object", "properties": { "clientID": { "type": "string" }, "deviceID": { "type": "string" }, "streamName": { "type": "string" }, "begTime": { "type": "string" }, "endTime": { "type": "string" }, "duration": { "type": "string" } }}
PUT Method 减少申请模型
抉择Resources, 抉择PUT,抉择Method Request,抉择Request Body,减少Model
部署API
抉择Actions , 抉择Deploy API,Stage name抉择“test”, 抉择Deploy.
创立JAVA SDK
在test Stage Editor抉择SDK Generation,抉择Platform,抉择“Java SDK”. 输出参数如下:
点击Generate SDK, 下载genrate code.zip文件,解压关上,关上Terminal执行mvn install
Maven装置本地仓库
mvn install:install-file -Dfile=./target/cloudstorage-demo-1.0.jar -DgroupId=ipc-sdk–DartifactId=cloudstorage-demo -Dversion=1.0 -Dpackaging=jar
pom.xml中减少dependency
<dependency> <groupId>ipc-sdk</groupId> <artifactId>cloudstorage-demo</artifactId> <version>1.0</version></dependency>
创立推流端
下载amazon-kinesis-video-streams-producer-sdk-java,批改DemoAppMain.java
public static void main(final String[] args) { try { final KinesisVideoClient kinesisVideoClient = KinesisVideoJavaClientFactory .createKinesisVideoClient( Regions.AP_SOUTHEAST_1, //指定新加坡区域 AuthHelper.getSystemPropertiesCredentialsProvider()); //文件媒体源 final MediaSource mediaSource = createFileMediaSource(); kinesisVideoClient.registerMediaSource(mediaSource); mediaSource.start(); } catch (final KinesisVideoException e) { throw new RuntimeException(e); }}
NativeKinesisVideoProducerStream.java减少办法putDurationMetaData,提交元数据,这里用到后面的SDK,IPCMetaData用于封装元数据
//记录一段视频片段的Metadataprivate void putDurationMetaData(@Nonnull final KinesisVideoFrame frame){// 每一段视频以关键帧为起始 if(startTime == 0L && frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME){ startTime = frame.getDecodingTs();// 这里视频时长以20秒为单位进行记录 }else if(frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME && (frame.getDecodingTs() - startTime) > 20 * Time.HUNDREDS_OF_NANOS_IN_A_SECOND ){ long index = frame.getIndex(); long endTime = frame.getDecodingTs(); long durationTime = (endTime - startTime)/Time.HUNDREDS_OF_NANOS_IN_A_SECOND; IPCCloudStorageSdk client = IPCCloudStorageSdk.builder().connectionConfiguration( new ConnectionConfiguration() .maxConnections(100) .connectionMaxIdleMillis(1000)) .timeoutConfiguration( new TimeoutConfiguration() .httpRequestTimeout(5000) .totalExecutionTimeout(10000) .socketTimeout(3000)) .build(); //API Gateway JAVA SDK 生成的Model类 IPCMetaData metaData = new IPCMetaData(); //视频片段的惟一标示metaData.setClientID(String.valueOf(Time.getCurrentTime())+String.format("%06d", index)); //设施惟一标示 metaData.setDeviceID(mDeviceInfo.getName()); //开始工夫,毫秒为单位metaData.setBegTime(String.valueOf(startTime/Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND)); //流名称,对应KVS Stream Name metaData.setStreamName(mStreamInfo.getName()); //完结工夫, 毫秒为单位metaData.setEndTime(String.valueOf(endTime/Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND)); //时长,秒为单位 metaData.setDuration(String.valueOf(durationTime)); // API Gateway JAVA SDK 生成的办法申请类 MetaDataInputRequest req = new MetaDataInputRequest().iPCMetaData(metaData); MetaDataInputResult result = client.metaDataInput(req); mLog.info("Duration Metadata : %s, %s, %s ,%s ,%s ,%s ",metaData.getClientID(),metaData.getDeviceID(),metaData.getStreamName(),metaData.getBegTime(),metaData.getEndTime(),metaData.getDuration()); //重置开始工夫,上一个视频完结工夫为下一个视频的开始工夫。 startTime = frame.getDecodingTs(); }}
推流源码:
https://github.com/beiyue/ama...
推流测试
留神这里AK/SK形式仅用于测试和演示,理论生产环境倡议应用更平安形式,比方通过IoT证书获取长期身份,参见https://docs.aws.amazon.com/k...
Amazon DynamoDB中能够看到生成了视频段的metadata
获取播放URL
从Amazon DynamoDB中随机选取一个item的客户端ID,流名称,开始工夫,完结工夫, 时长,terminal执行:
curl -v -X GET 'https://xxxxxx.execute-api.ap-southeast-1.amazonaws.com/test/ipc-video-metadata' -d ' { "clientID": "162369215xxxxxxxx52","streamName": "kvs-stream", "begTime":"1623728297399","endTime":"1623728321043","duration":"22"}'* Trying 52.221.1XX.1XX...* TCP_NODELAY set* Connected to xxxxxx.execute-api.ap-southeast-1.amazonaws.com (52.221.1XX.1XX) port 443 (#0)* ALPN, offering h2* ALPN, offering http/1.1* Cipher selection: ALL:!EXPORT:!EXPORT40:!EXPORT56:!aNULL:!LOW:!RC4:@STRENGTH* successfully set certificate verify locations:* CAfile: /etc/ssl/cert.pem CApath: none* TLSv1.2 (OUT), TLS handshake, Client hello (1):* TLSv1.2 (IN), TLS handshake, Server hello (2):* TLSv1.2 (IN), TLS handshake, Certificate (11):* TLSv1.2 (IN), TLS handshake, Server key exchange (12):* TLSv1.2 (IN), TLS handshake, Server finished (14):* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):* TLSv1.2 (OUT), TLS change cipher, Client hello (1):* TLSv1.2 (OUT), TLS handshake, Finished (20):* TLSv1.2 (IN), TLS change cipher, Client hello (1):* TLSv1.2 (IN), TLS handshake, Finished (20):* SSL connection using TLSv1.2 / ECDHE-RSA-AES128-GCM-SHA256* ALPN, server accepted to use h2* Server certificate:* subject: CN=*.execute-api.ap-southeast-1.amazonaws.com* start date: Aug 29 00:00:00 2020 GMT* expire date: Sep 29 12:00:00 2021 GMT* subjectAltName: host " xxxxxx.execute-api.ap-southeast-1.amazonaws.com" matched cert's "*.execute-api.ap-southeast-1.amazonaws.com"* issuer: C=US; O=Amazon; OU=Server CA 1B; CN=Amazon* SSL certificate verify ok.* Using HTTP2, server supports multi-use* Connection state changed (HTTP/2 confirmed)* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0* Using Stream ID: 1 (easy handle 0x7f80e8005600)> GET /test/ipc-video-metadata HTTP/2> Host: xxxxxx.execute-api.ap-southeast-1.amazonaws.com> User-Agent: curl/7.54.0> Accept: */*> content-type: application/json> day: Thursday> Content-Length: 138>* Connection state changed (MAX_CONCURRENT_STREAMS updated)!* We are completely uploaded and fine< HTTP/2 200< date: Tue, 15 Jun 2021 08:00:11 GMT< content-type: application/json< content-length: 261< x-amzn-requestid: 9323XXX-4ff1-4dcf-ae57-365551b74636< x-amz-apigw-id: A9Oh0FJWXXXX_Q=< x-amzn-trace-id: Root=1-60c85e0b-01f095137c54211e74c3e1f3;Sampled=0<* Connection #0 to host xxxxxx.execute-api.ap-southeast-1.amazonaws.com left intact"https://b-xxxxxx.kinesisvideo.ap-southeast-1.amazonaws.com/hls/v1/getHLSMasterPlaylist.m3u8?SessionToken=XXXXdf9Ro85h4AeC_n2yjc96_YLu1vigKX5qterUpPzxIQYibs4go4_KCqXEiWnXXXXWng92QY5HiGbEIkSa38f9d3XXXXX:~ "
关上https://www.hlsplayer.org/填入URL地址
播放展现
小结
本计划展现了IP Camera 运行 Amazon Kinesis Video Streams实现推流性能;运行Amazon API Gateway + Amazon Lambda + Amazon DynamoDB 留存视频的metadata,依据metadata获取指定时段的视频片段的session url, 播放器可失常播放。很多客户心愿构建云存场景,联合这几个服务一起构建残缺的云存解决方案。
本篇作者
周晓明
亚马逊云科技解决方案架构师
负责基于亚马逊云科技的云计算计划架构的征询和设计,同时致力于物联网方向钻研和推广,在安防监控畛域有丰盛实践经验。