简介:Serverless 架构的按量付费模式,能够在保障在线编程性能性能的前提下,进一步降低成本。本文将会以阿里云函数计算为例,通过 Serverless 架构实现一个 Python 语言的在线编程性能,并对该性能进一步的优化,使其更加贴近本地本地代码执行体验。
随着计算机科学与技术的倒退,越来越多的人开始接触编程,也有越来越多的在线编程平台诞生。以 Python 语言的在线编程平台为例,大抵能够分为两类:
- 一类是 OJ 类型的,即在线评测的编程平台,这类的平台特点是阻塞类型的执行,即用户须要一次性将代码和规范输出内容提交,当程序执行实现会一次性将后果返回;
- 另一类则是学习、工具类的在线编程平台,例如 Anycodes 在线编程等网站,这一类平台的特点是非阻塞类型的执行,即用户能够实时看到代码执行的后果,以及能够实时内容进行内容的输出。
然而,无论是那种类型的在线编程平台,其背地的外围模块(“代码执行器”或“判题机”)都是极具备钻研价值,一方面,这类网站通常状况下都须要比要严格的“平安机制”,例如程序会不会有恶意代码,呈现死循环、毁坏计算机系统等,程序是否须要隔离运行,运行时是否会获取到其他人提交的代码等;
另一方面,这类平台通常状况下都会对资源耗费比拟大,尤其是较量来长期,更是须要忽然间对相干机器进行扩容,必要时须要大规模集群来进行应答。同时这类网站通常状况下也都有一个比拟大的特点,那就是触发式,即每个代码执行前后实际上并没有十分严密的前后文关系等。
随着 Serverless 架构的一直倒退,很多人发现 Serverless 架构的申请级隔离和极致弹性等个性能够解决传统在线编程平台所遇到的平安问题和资源耗费问题,Serverless 架构的按量付费模式,能够在保障在线编程性能性能的前提下,进一步降低成本。所以,通过 Serverless 架构实现在线编程性能的开发就逐步的被更多人所关注和钻研。本文将会以阿里云函数计算为例,通过 Serverless 架构实现一个 Python 语言的在线编程性能,并对该性能进一步的优化,使其更加贴近本地本地代码执行体验。
在线编程性能开发
一个比较简单的、典型的在线编程性能,在线执行模块通常状况下是须要以下几个能力:
- 在线执行代码
- 用户能够输出内容
- 能够返回后果(规范输入、规范谬误等)
除了在线编程所须要实现的性能之外,在线编程在 Serverless 架构下,所须要实现的业务逻辑,也仅仅被收敛到关注代码执行模块即可:获取客户端发送的程序信息(包含代码、规范输出等),将代码缓存到本地,执行代码,获取后果,但会给客户端,整个架构的流程简图为:
对于执行代码局部,能够通过 Python 语言的 subprocess 依赖中的 Popen() 办法实现,在应用 Popen() 办法时,有几个比拟重要的概念,须要明确:
- subprocess.PIPE:一个能够被用于 Popen 的 stdin、stdout 和 stderr 3 个参数的非凡值,示意须要创立一个新的管道;
- subprocess.STDOUT:一个能够被用于 Popen 的 stderr 参数的输入值,示意子程序的规范谬误会合到规范输入;
所以,当咱们想要实现能够:
进行规范输出(stdin),获取规范输入(stdout)以及规范谬误(stderr)的性能
能够简化代码实现为:
除代码执行局部之外,在 Serverless 架构下,获取到用户代码并将其存储过程中,须要额定留神函数实例中目录的读写权限。通常状况下,在函数计算中,如果不进行硬盘挂载,只有 /tmp/ 目录是有可写入权限的。所以在该我的项目中,咱们将用户传递到服务端的代码进行长期存储时,须要将其写入长期目录 /tmp/,在长期存储代码的时候,还须要额定思考实例复用的状况,所以此时,能够为长期代码提供长期的文件名,例如:
# -*- coding: utf-8 -*-
import randomrandom
Str = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))
path = "/tmp/%s"% randomStr(5)
残缺的代码实现为:
# -*- coding: utf-8 -*-
import json
import uuid
import random
import subprocess
# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))
# Response
class Response:
def __init__(self, start_response, response, errorCode=None):
self.start = start_response
responseBody = {'Error': {"Code": errorCode, "Message": response},
} if errorCode else {'Response': response}
# 默认减少 uuid,便于前期定位
responseBody['ResponseId'] = str(uuid.uuid1())
self.response = json.dumps(responseBody)
def __iter__(self):
status = '200'
response_headers = [('Content-type', 'application/json; charset=UTF-8')]
self.start(status, response_headers)
yield self.response.encode("utf-8")
def WriteCode(code, fileName):
try:
with open(fileName, "w") as f:
f.write(code)
return True
except Exception as e:
print(e)
return False
def RunCode(fileName, input_data=""):
child = subprocess.Popen("python %s" % (fileName),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True)
output = child.communicate(input=input_data.encode("utf-8"))
return output[0].decode("utf-8")
def handler(environ, start_response):
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))
code = requestBody.get("code", None)
inputData = requestBody.get("input", "")
fileName = "/tmp/" + randomStr(5)
responseData = RunCode(fileName, inputData) if code and WriteCode(code, fileName) else "Error"
return Response(start_response, {"result": responseData})
实现外围的业务逻辑编写之后,咱们能够将代码部署到阿里云函数计算中。部署实现之后,咱们能够取得到接口的长期测试地址。通过 PostMan 对该接口进行测试,以 Python 语言的输入语句为例:
print('HELLO WORLD')
能够看到,当咱们通过 POST 办法,携带代码等作为参数,发动申请后,取得到的响应为:
咱们通过响应后果,能够看到,零碎是能够失常输入咱们的预期后果:“HELLO WORLD”至此咱们实现了规范输入性能的测试,接下来咱们对规范谬误等性能进行测试,此时咱们将刚刚的输入代码进行毁坏:
print('HELLO WORLD)
应用同样的办法,再次进行代码执行,能够看到后果:
后果中,咱们能够看到 Python 的报错信息,是合乎咱们的预期的,至此实现了在线编程性能的规范谬误性能的测试,接下来,咱们进行规范输出性能的测试,因为咱们应用的 subprocess.Popen() 办法,是一种阻塞办法,所以此时咱们须要将代码和规范输出内容一起放到服务端。测试的代码为:
tempInput = input('please input:')
print('Output:', tempInput)
测试的规范输出内容为:“serverless devs”。
当咱们应用同样的办法,发动申请之后,咱们能够看到:
零碎是失常输入预期的后果。至此咱们实现了一个非常简单的在线编程服务的接口。该接口目前只是高级版本,仅用于学习应用,其具备极大的优化空间:
- 超时工夫的解决
- 代码执行实现,能够进行清理
当然,通过这个接口也能够看到这样一个问题:那就是代码执行过程中是阻塞的,咱们没方法进行持续性的输出,也没有方法实时输入,即便须要输出内容也是须要将代码和输出内容一并发送到服务端。这种模式和目前市面上常见的 OJ 模式很相似,然而就单纯的在线编程而言,还须要进一步对我的项目优化,使其能够通过非阻塞办法,实现代码的执行,并且能够持续性的进行输出操作,持续性的进行内容输入。
更贴近“本地”的代码执行器
咱们以一段代码为例:
import time
print("hello world")
time.sleep(10)
tempInput = input("please:")
print("Input data:", tempInput)
当咱们在本地的执行这段 Python 代码时,整体的用户侧的理论体现是:
- 零碎输入 hello world
- 零碎期待 10 秒
- 零碎揭示咱们 please,咱们此时能够输出一个字符串
- 零碎输入 Input data 以及咱们刚刚输出的字符串
然而,这段代码如果利用于传统 OJ 或者刚刚咱们所实现的在线编程零碎中,体现则大不相同:
- 代码与咱们要输出内容一起传给零碎
- 零碎期待 10 秒
- 输入 hello world、please,以及最初输 Input data 和咱们输出的内容
能够看到,OJ 模式上的在线编程性能和本地是有十分大的差距的,至多在体验层面,这个差距是比拟大的。为了缩小这种体验不对立的问题,咱们能够将上上述的架构进一步降级,通过函数的异步触发,以及 Python 语言的 pexpect.spawn() 办法实现一款更贴近本地体验的在线编程性能:
在整个我的项目中,包含了两个函数,两个存储桶:
- 业务逻辑函数:该函数的次要操作是业务逻辑,包含创立代码执行的工作(通过对象存储触发器进行异步函数执行),以及获取函数输入后果以及对工作函数的规范输出进行相干操作等;
- 执行器函数:该函数的次要作用是执行用户的函数代码,这部分是通过对象存储触发,通过下载代码、执行代码、获取输出、输入后果等;代码获取从代码存储桶,输入后果和获取输出从业务存储桶;
- 代码存储桶:该存储桶的作用是存储代码,当用户发动运行代码的申请,业务逻辑函数收到用户代码后,会将代码存储到该存储桶,再由该存储桶处罚异步工作;
- 业务存储桶:该存储桶的作用是两头量的输入,次要包含输入内容的缓存、输出内容的缓存;该局部数据能够通过对象存储的自身个性进行生命周期的制订;
为了让代码在线执行起来,更加贴近本地体验,该计划的代码分为两个函数,别离进行业务逻辑解决和在线编程外围性能。
其中业务逻辑处理函数,次要是:
- 获取用户的代码信息,生成代码执行 ID,并将代码存到对象存储,异步触发在线编程函数的执行,返回生成代码执行 ID;
- 获取用户的输出信息和代码执行 ID,并将内容存储到对应的对象存储中;
- 获取代码的输入后果,依据用户指定的代码执行 ID,将执行后果从对象存储中读取进去,并返回给用户;
整体的业务逻辑为:
实现的代码为:
# -*- coding: utf-8 -*-
import os
import oss2
import json
import uuid
import random
# 根本配置信息
AccessKey = {"id": os.environ.get('AccessKeyId'),
"secret": os.environ.get('AccessKeySecret')
}
OSSCodeConf = {'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketCodeName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
OSSTargetConf = {'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketTargetName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
# 获取获取 / 上传文件到 OSS 的长期地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])
# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))
# Response
class Response:
def __init__(self, start_response, response, errorCode=None):
self.start = start_response
responseBody = {'Error': {"Code": errorCode, "Message": response},
} if errorCode else {'Response': response}
# 默认减少 uuid,便于前期定位
responseBody['ResponseId'] = str(uuid.uuid1())
self.response = json.dumps(responseBody)
def __iter__(self):
status = '200'
response_headers = [('Content-type', 'application/json; charset=UTF-8')]
self.start(status, response_headers)
yield self.response.encode("utf-8")
def handler(environ, start_response):
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))
reqType = requestBody.get("type", None)
if reqType == "run":
# 运行代码
code = requestBody.get("code", None)
runId = randomStr(10)
codeBucket.put_object(runId, code.encode("utf-8"))
responseData = runId
elif reqType == "input":
# 输出内容
inputData = requestBody.get("input", None)
runId = requestBody.get("id", None)
targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))
responseData = 'ok'
elif reqType == "output":
# 获取后果
runId = requestBody.get("id", None)
targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)
with open('/tmp/' + runId) as f:
responseData = f.read()
else:
responseData = "Error"
return Response(start_response, {"result": responseData})
执行器函数,次要是通过代码存储桶触发,从而进行代码执行的模块,这一部分次要包含:
- 从存储桶获取代码,并通过 pexpect.spawn() 进行代码执行;
- 通过 pexpect.spawn().read_nonblocking() 非阻塞的获取间断性的执行后果,并写入到对象存储;
- 通过 pexpect.spawn().sendline() 进行内容输出;
整体流程为:
代码实现为:
# -*- coding: utf-8 -*-
import os
import re
import oss2
import json
import time
import pexpect
# 根本配置信息
AccessKey = {"id": os.environ.get('AccessKeyId'),
"secret": os.environ.get('AccessKeySecret')
}
OSSCodeConf = {'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketCodeName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
OSSTargetConf = {'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketTargetName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
# 获取获取 / 上传文件到 OSS 的长期地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])
def handler(event, context):
event = json.loads(event.decode("utf-8"))
for eveEvent in event["events"]:
# 获取 object
code = eveEvent["oss"]["object"]["key"]
localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]
# 下载代码
codeBucket.get_object_to_file(code, localFileName)
# 执行代码
foo = pexpect.spawn('python %s' % localFileName)
outputData = ""
startTime = time.time()
# timeout 能够通过文件名来进行辨认
try:
timeout = int(re.findall("timeout(.*?)s", code)[0])
except:
timeout = 60
while (time.time() - startTime) / 1000 <= timeout:
try:
tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)
tempOutput = tempOutput.decode("utf-8", "ignore")
if len(str(tempOutput)) > 0:
outputData = outputData + tempOutput
# 输入数据存入 oss
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
except Exception as e:
print("Error:", e)
# 有输出申请被阻塞
if str(e) == "Timeout exceeded.":
try:
# 从 oss 读取数据
targetBucket.get_object_to_file(code + "-input", localFileName + "-input")
targetBucket.delete_object(code + "-input")
with open(localFileName + "-input") as f:
inputData = f.read()
if inputData:
foo.sendline(inputData)
except:
pass
# 程序执行实现输入
elif "End Of File (EOF)" in str(e):
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
return True
# 程序抛出异样
else:
outputData = outputData + "\n\nException: %s" % str(e)
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
return False
当咱们实现外围的业务逻辑编写之后,咱们能够将我的项目部署到线上。
我的项目部署实现之后,和上文的测试方法一样,在这里也通过 PostMan 对接口进行测试。此时,咱们须要设定一个笼罩能较全的测试代码,包含输入打印、输出、一些 sleep() 等办法:
当咱们通过 PostMan 发动申请执行这段代码之后,咱们能够看到零碎为咱们返回了预期的代码执行 ID:
咱们能够看到零碎会返回给咱们一个代码执行 ID,该执行 ID 将会作为咱们整个申请工作的 ID,此时,咱们能够通过获取输入后果的接口,来获取后果:
因为代码中有:
time.sleep(10)
所以,迅速取得后果的时候是看不到后半局部的输入后果,咱们能够设置一个轮训工作,一直通过该 ID 对接口进行刷新:
能够看到,10 秒钟后,代码执行到了输出局部:
tempInput = input('please:')
此时,咱们再通过输出接口,进行输出操作:
实现之后,咱们能够看到输出胜利(result: ok)的后果,此时咱们持续刷新之前获取后果局部的申请:
能够看到,咱们曾经取得到了所有后果的输入。
绝对于上文的在线编程性能,这种“更贴近本地的代码执行器“变得复杂了很多,然而在理论应用的过程中,却能够更好的模拟出本地执行代码时的一些景象,例如代码的休眠、阻塞、内容的输入等。
总结
无论是简略的在线代码执行器局部,还是更贴近“本地”的代码执行器局部,这篇文章在所利用的内容是绝对宽泛的。通过这篇文章你能够看到:
- HTTP 触发器的根本应用办法;对象存储触发器的根本应用方;
- 函数计算组件、对象存储组件的根本应用办法,组件间依赖的实现办法;
同时,通过这篇文章,也能够从一个侧面看到这样一个常见问题的简略解答:我有一个我的项目,我是每个接口一个函数,还是多个接口复用一个函数?
针对这个问题,其实最次要的是看业务自身的诉求,如果多个接口表白的含意是统一的,或者是同类的,相似的,并且多个接口的资源耗费是相似的,那么放在一个函数中来通过不同的门路进行辨别是齐全能够的;如果呈现资源耗费差距较大,或者函数类型、规模、类别区别过大的时候,将多个接口放在多个函数下也是没有问题的。
本文实际上是抛砖引玉,无论是 OJ 零碎的“判题机”局部,还是在线编程工具的“执行器局部”,都能够很好的和 Serverless 架构有着比拟乏味的结合点。这种结合点不仅仅能够解决传统在线编程所头疼的事件(平安问题,资源耗费问题,并发问题,流量不稳固问题),更能够将 Serverless 的价值在一个新的畛域施展进去。
原文链接
本文为阿里云原创内容,未经容许不得转载。