Tornado: tornado 4.2.1 Porting Guide (openEuler 20.03 LTS SP1)

Introduction

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, which makes it well suited for long polling, WebSockets, and other applications that need a long-lived connection to each user. This case study uses an x86_64 virtual machine and the x2openEuler assessment tool to evaluate the compatibility of porting tornado 4.2.1 to openEuler, then completes the port based on the assessment results.

Language: C++/Python
One-line description: a Python web framework and asynchronous networking library
License: Apache

Recommended version

tornado 4.2.1 is recommended. Note: this document applies to tornado 4.2.1; the porting steps for other tornado versions can also follow this document.

Environment requirements

Operating system: openEuler 20.03 LTS SP1, CentOS 7.6

Installing the OS: for a fresh install, avoid the minimal installation (many packages would then have to be installed by hand); choose the "Server with GUI" install type instead. For installing openEuler, see: https://openeuler.org/zh/docs/20.03_LTS_SP1/docs/Installation/installation.html

Compatibility assessment

Get the tornado RPM:

```shell
wget http://mirror.centos.org/centos/7/os/x86_64/Packages/python-tornado-4.2.1-5.el7.x86_64.rpm
```

Download the x2openEuler tool (download guide: https://www.openeuler.org/zh/other/migration/) and deploy it:

```shell
rpm -ivh x2openEuler-2.0.0-1.x86_64.rpm
```

Note: installing the RPM requires the root user and, currently, network access (to download dependencies). Install dependency packages such as bzip2-devel as prompted.

```shell
su x2openEuler
x2openEuler redis-db -init
```

Enter, in order: the Redis IP (127.0.0.1), port (6379), database index (0-16; here 0), and password (the tool encrypts it; if no Redis password is set, just press Enter).

```shell
x2openEuler init source_centos7.6-openEuler20.03-LTS-SP1.tar.gz
```

Note: after x2openEuler is installed from the RPM, this default resource package is present under /opt/x2openEuler. To assess a CentOS 8.2 to openEuler 20.03 LTS SP1 migration instead, obtain and import the corresponding static resource package, e.g. `x2openEuler init source_centos8.2-openEuler20.03-LTS-SP1.tar.gz`. Choose the resource package that matches your situation.

Scan the package:

```shell
x2openEuler scan python-tornado-4.2.1-5.el7.x86_64.rpm
```

Note: the files to be analyzed must be readable by the x2openEuler user. After the scan, an HTML report is generated under /opt/x2openEuler/output.

Reviewing the assessment

The software compatibility report has three parts: dependency compatibility, C/C++ interface compatibility, and Java interface compatibility. Dependency compatibility reflects the direct dependencies of package installation; anything below 100% means the package cannot install correctly. Interface compatibility reflects changes in the calls a piece of software makes at runtime to other packages, shared libraries, or system interfaces; below 100% means an exception may be triggered when a given feature is invoked, while unused code paths may behave normally. Some results should be reviewed manually. The recommended order of preference for the packages actually used is: packages already ported to openEuler > packages manually recompiled on openEuler > CentOS packages.

Result: the report shows 100% external interface compatibility, and dependency compatibility passed manual review. tornado 4.2.1 is therefore assessed as compatible with openEuler 20.03 LTS SP1, and the package can be installed on that system for verification.

Installing tornado

RPM install: since the compatibility report shows the package is compatible, try installing the downloaded RPM directly.

```
[[email protected] ~]# yum install python-tornado-4.2.1-5.el7.x86_64.rpm -y
Last metadata expiration check: 0:11:53 ago on Mon 22 Mar 2021 01:25:06 PM CST.
Dependencies resolved.
================================================================================
 Package                               Arch    Version         Repository   Size
================================================================================
Installing:
 python-tornado                        x86_64  4.2.1-5.el7     @commandline 641 k
Installing dependencies:
 python2-backports                     x86_64  1.0-17.oe1      everything   9.2 k
 python2-backports-ssl_match_hostname  noarch  3.7.0.1-2.oe1   everything    16 k
 python2-ipaddress                     noarch  1.0.23-1.oe1    everything    41 k
 python3-pycurl                        x86_64  7.43.0.3-1.oe1  OS            65 k

Transaction Summary
================================================================================
Install  5 Packages

Total size: 772 k
Total download size: 131 k
Installed size: 4.1 M
Downloading Packages:
(1/4): python2-backports-1.0-17.oe1.x86_64.rpm   53 kB/s | 9.2 kB  00:00
(2/4): python2-backports-ssl_match_hostname-3.7  63 kB/s |  16 kB  00:00
(3/4): python2-ipaddress-1.0.23-1.oe1.noarch.rp 126 kB/s |  41 kB  00:00
(4/4): python3-pycurl-7.43.0.3-1.oe1.x86_64.rpm 113 kB/s |  65 kB  00:00
--------------------------------------------------------------------------------
Total                                           226 kB/s | 131 kB  00:00
warning: /var/cache/dnf/OS-fcb43ce6e8cef091/packages/python3-pycurl-7.43.0.3-1.oe1.x86_64.rpm: Header V3 RSA/SHA1 Signature, key ID b25e7f66: NOKEY
OS                                               14 kB/s | 2.1 kB  00:00
Importing GPG key 0xB25E7F66:
 Userid     : "private OBS (key without passphrase) <[email protected]>"
 Fingerprint: 12EA 74AC 9DF4 8D46 C69C A0BE D557 065E B25E 7F66
 From       : http://repo.openeuler.org/openEuler-20.03-LTS-SP1/OS/x86_64/RPM-GPG-KEY-openEuler
Key imported successfully
Running transaction check
Transaction check succeeded.
Running transaction test
Transaction test succeeded.
Running transaction
  Preparing  :                                                          1/1
  Installing : python2-ipaddress-1.0.23-1.oe1.noarch                    1/5
  Installing : python2-backports-1.0-17.oe1.x86_64                      2/5
  Installing : python2-backports-ssl_match_hostname-3.7.0.1-2.oe1.n     3/5
  Installing : python3-pycurl-7.43.0.3-1.oe1.x86_64                     4/5
  Installing : python-tornado-4.2.1-5.el7.x86_64                        5/5
  Running scriptlet: python-tornado-4.2.1-5.el7.x86_64                  5/5
  Verifying  : python3-pycurl-7.43.0.3-1.oe1.x86_64                     1/5
  Verifying  : python2-backports-1.0-17.oe1.x86_64                      2/5
  Verifying  : python2-backports-ssl_match_hostname-3.7.0.1-2.oe1.n     3/5
  Verifying  : python2-ipaddress-1.0.23-1.oe1.noarch                    4/5
  Verifying  : python-tornado-4.2.1-5.el7.x86_64                        5/5

Installed:
  python-tornado-4.2.1-5.el7.x86_64
  python2-backports-1.0-17.oe1.x86_64
  python2-backports-ssl_match_hostname-3.7.0.1-2.oe1.noarch
  python2-ipaddress-1.0.23-1.oe1.noarch
  python3-pycurl-7.43.0.3-1.oe1.x86_64

Complete!
```

Installation succeeded. ...

November 9, 2022 · 2 min · jiezi

Tornado: "The green hills cannot block it; eastward the river flows" — integrating the Web3.0 identity wallet MetaMask for one-click Ethereum login (Tornado 6 + Vue.js 3)

Reposted from "刘悦的技术博客": https://v3u.cn/a_id_213

During the Gulf War in the early 1990s, an American officer worried that a single enemy missile hitting the Pentagon could paralyze U.S. bases around the globe. A brilliant scientist observed that the best center is no center at all. That is decentralization in its simplest form, and it is how the Internet came to be: a world without the Internet is unimaginable, and its core idea is to break information into small pieces and let them travel by whatever route is convenient.

Thirty years later, decentralized identity is being widely adopted. Part of a user's online activity is public on-chain and can be looked up through a crypto wallet; what users create, contribute, earn, and own on-chain reflects their preferences and gradually accumulates into their identity.

When users tire of the traditional email/password signup flow, they reach for social logins such as Google or GitHub. That saves time, but the login is also recorded by the third-party platform; whatever we do with that account, the platform sees it all, and may even analyze and profile our behavior. Is there a login method whose information stays entirely between the client and our own backend, involving no third-party authorization, and maximizing user privacy? Web3.0 offers one option: MetaMask.

MetaMask

MetaMask is a software cryptocurrency wallet for interacting with the Ethereum blockchain. It lets users access their Ethereum wallet through a browser extension or mobile app, which can then interact with decentralized applications. First, of course, you need a MetaMask wallet: install the browser extension from https://chrome.google.com/web... Then open the extension, create an account, and record the password, wallet address, and mnemonic phrase. With the extension installed, a web application can interact with it.

Wallet login flow

The login logic differs from traditional third-party login. A traditional flow first redirects to the third-party platform for authorization; the platform returns a verification code to the login site, which exchanges the code for a token and then uses the token to request the account information. With wallet login, the frontend first signs the wallet address using the private key kept by the Web3.js browser extension, then sends the signature and wallet address to the backend. The backend verifies the signature with the same algorithm using a Web3 library; if verification succeeds, the wallet information is stored in a token and returned to the frontend.

Frontend signing

First include the frontend Web3.0 library (https://docs.ethers.io/v4/) on the login page:

```html
<script src="{{ static_url("js/ethers-v4.min.js") }}"></script>
<script src="{{ static_url("js/axios.js") }}"></script>
<script src="{{ static_url("js/vue.js") }}"></script>
```

Here we use Vue.js together with Axios. Next declare the method that triggers login:

```javascript
sign_w3: function () {
    that = this;
    ethereum.enable().then(function () {
        this.provider = new ethers.providers.Web3Provider(web3.currentProvider);
        this.provider.getNetwork().then(function (result) {
            if (result['chainId'] != 1) {
                console.log("Switch to Mainnet!")
            } else {
                // okay, confirmed we're on mainnet
                this.provider.listAccounts().then(function (result) {
                    console.log(result);
                    this.accountAddress = result[0]; // figure out the user's Eth address
                    this.provider.getBalance(String(result[0])).then(function (balance) {
                        var myBalance = (balance / ethers.constants.WeiPerEther).toFixed(4);
                        console.log("Your Balance: " + myBalance);
                    });
                    // get a signer object so we can do things that need signing
                    this.signer = provider.getSigner();
                    var rightnow = (Date.now() / 1000).toFixed(0)
                    var sortanow = rightnow - (rightnow % 600)
                    this.signer.signMessage("Signing in to " + document.domain + " at " + sortanow,
                                            accountAddress, "test password!")
                        .then((signature) => {
                            that.handleAuth(accountAddress, signature);
                        });
                    console.log(this.signer);
                })
            }
        })
    })
},
```

The signMessage method returns the signature; a timestamp-based value is mixed into the signed message to guard against forged or replayed sign-ins. Once the signature is generated on the frontend, the backend endpoint is called asynchronously right away: ...
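The message being signed embeds the current Unix time rounded down to a 600-second bucket (`rightnow - (rightnow % 600)`), so the backend can rebuild exactly the same string when verifying within the same 10-minute window. A minimal stdlib sketch of that bucketing logic — the `make_login_message` helper name and the domain value are illustrative, not from the article:

```python
import time

def time_bucket(now=None, width=600):
    """Round a Unix timestamp down to the start of its `width`-second bucket."""
    now = int(time.time()) if now is None else int(now)
    return now - (now % width)

def make_login_message(domain, now=None):
    """Rebuild the string the frontend signs: 'Signing in to <domain> at <bucket>'."""
    return "Signing in to %s at %d" % (domain, time_bucket(now))

# Two timestamps inside the same 10-minute window produce identical messages,
# so a backend that checks the current (and previous) bucket can verify the signature.
print(make_login_message("v3u.cn", now=1000))  # bucket starts at 600
print(make_login_message("v3u.cn", now=1199))  # same bucket
print(make_login_message("v3u.cn", now=1200))  # next bucket starts at 1200
```

A verifying backend would compute this message for the current bucket, recover the signing address from the submitted signature, and compare it to the submitted wallet address.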

May 31, 2022 · 3 min · jiezi

Tornado keeps returning 304 status codes when handling GET requests

Tornado source analysis — the Etag implementation

Etag (a URL's Entity Tag): for what an Etag is, the request flow, and the underlying principle, see these links rather than this article:
http://www.oschina.net/questi...
https://zh.wikipedia.org/wiki...

Tornado implementation analysis: start from the order in which Tornado handles a request (from the docs: http://www.tornadoweb.cn/documentation):

1. The framework creates a RequestHandler object for each request.
2. It calls initialize(), whose arguments come from the keyword arguments in the Application configuration. (initialize was added in Tornado 1.1; in older versions you override `__init__` for the same purpose.) initialize usually just stores the passed arguments in member variables; it should not produce output or call methods such as send_error.
3. It calls prepare(). prepare is called regardless of the HTTP method, so it is usually defined in a base class and reused in subclasses. prepare may produce output; if it calls finish (or send_error, etc.), processing ends there.
4. It calls the HTTP method handler: get(), post(), put(), and so on. If the URL's regex pattern contains capture groups, the matches are passed in as arguments.

Etag handling must happen when a request finishes, so look at the finish() function in tornado/web.py (code unrelated to this topic removed):

```python
def finish(self, chunk=None):
    # Automatically support ETags and add the Content-Length header if
    # we have not flushed any content yet.
    if not self._headers_written:
        if (self._status_code == 200 and
                self.request.method in ("GET", "HEAD") and
                "Etag" not in self._headers):
            self.set_etag_header()
            if self.check_etag_header():
                self._write_buffer = []
                self.set_status(304)
        if self._status_code in (204, 304):
            assert not self._write_buffer, "Cannot send body with %s" % self._status_code
            self._clear_headers_for_304()
        elif "Content-Length" not in self._headers:
            content_length = sum(len(part) for part in self._write_buffer)
            self.set_header("Content-Length", content_length)
```

Analysis: when finish() is called, the request is inspected. If the status code is 200, the method is GET or HEAD, and there is no Etag in the response headers yet, this request is happening for the first time. finish() then calls set_etag_header() to write the etag into the response headers ...
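set_etag_header() fills the header from compute_etag(), whose default implementation hashes the accumulated write buffer; check_etag_header() then compares it with the client's If-None-Match. A stand-alone sketch of that default scheme — the helper names mirror Tornado's but this is a simplified reconstruction, not the library code:

```python
import hashlib

def compute_etag(write_buffer):
    """Default Tornado-style etag: quoted SHA-1 hex digest of the response body."""
    hasher = hashlib.sha1()
    for part in write_buffer:
        hasher.update(part)
    return '"%s"' % hasher.hexdigest()

def should_send_304(status_code, method, etag, if_none_match):
    """Mirror finish(): only a 200 GET/HEAD with a matching client etag becomes a 304."""
    return (status_code == 200 and method in ("GET", "HEAD")
            and if_none_match is not None and if_none_match == etag)

body = [b"hello ", b"world"]
etag = compute_etag(body)
print(etag)                                     # stable for identical bodies
print(should_send_304(200, "GET", etag, etag))  # repeat request -> 304
print(should_send_304(200, "GET", etag, None))  # first request -> normal 200
```

Because the etag depends only on the body bytes, any handler that renders the same content on every GET will keep answering 304 once the browser starts sending If-None-Match — which is exactly the behavior in this article's title.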

August 21, 2019 · 2 min · jiezi

SQLAlchemy: configuring relationships with separate read/write database binds

Preface

Generally, the simplest way to have SQLAlchemy connect to multiple databases is to create two or more db.session objects with no relation to each other, and point each model at its own session. In that case relationship() can be configured normally with nothing special, and you do not need the configuration below.

```python
# -*- coding:utf-8 -*-
import flask
from flask_sqlalchemy import SQLAlchemy  # Flask-SQLAlchemy 2.3.2
from datetime import datetime
from sqlalchemy.orm import backref, foreign  # SQLAlchemy 1.3.1

app = flask.Flask(__name__)
app.config['DEBUG'] = True
app.config['SQLALCHEMY_BINDS'] = {
    'read_db': 'mysql://reader:test@127.0.0.1:3306/test?charset=utf8',
    'write_db': 'mysql://writer:test@127.0.0.2:3306/test?charset=utf8'
}
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = False
db = SQLAlchemy(app)

class RDriver(db.Model):
    __bind_key__ = 'read_db'
    __tablename__ = 'driver'
    # __table_args__ = {'schema': 'test'}  # must NOT be added here
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    fk_user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    driver_name = db.Column(db.String(7))
    create_time = db.Column(db.TIMESTAMP, default=datetime.now)

class RUser(db.Model):
    __bind_key__ = 'read_db'
    __tablename__ = 'user'
    # __table_args__ = {'schema': 'test'}
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    user_name = db.Column(db.String(32), index=True, unique=True)
    user_password = db.Column(db.String(32))
    create_time = db.Column(db.TIMESTAMP, default=datetime.now)
    update_time = db.Column(db.TIMESTAMP, default=datetime.now)
    # Any of the following five forms works:
    # driver_fk = db.relationship("RDriver", foreign_keys='RDriver.fk_user_id')
    # driver_fk = db.relationship("RDriver", primaryjoin=lambda: RDriver.fk_user_id == RUser.id, viewonly=True)
    # driver_fk = db.relationship("RDriver", primaryjoin=RDriver.fk_user_id == id)
    fk_driver = db.relationship("RDriver", primaryjoin='RDriver.fk_user_id == RUser.id')
    # driver_fk = db.relationship("RDriver", backref=db.backref('user', lazy=True),
    #                             primaryjoin=lambda: RDriver.fk_user_id == RUser.id, viewonly=True)

class WDriver(db.Model):
    __bind_key__ = 'write_db'
    __tablename__ = 'driver'
    __table_args__ = {'schema': 'test', 'extend_existing': True}  # this setting is key
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    fk_user_id = db.Column(db.Integer, db.ForeignKey("test.user.id"))  # "test.user.id" is key
    plate = db.Column(db.String(7))
    create_at = db.Column(db.TIMESTAMP, default=datetime.now)

class WUser(db.Model):
    __bind_key__ = 'write_db'
    __tablename__ = 'user'
    __table_args__ = {'schema': 'test', 'extend_existing': True}  # this setting is key
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    hash = db.Column(db.String(256), nullable=False)
    user_no = db.Column(db.String(32), index=True, unique=True)  # employee number
    create_time = db.Column(db.TIMESTAMP, default=datetime.now)
    update_time = db.Column(db.TIMESTAMP, default=datetime.now)
    # Any of the following five forms works:
    # fk_driver = db.relationship("WDriver", foreign_keys='WDriver.fk_user_id', uselist=False)
    # fk_driver = db.relationship("WDriver", primaryjoin=lambda: WDriver.fk_user_id == WUser.id)
    fk_driver = db.relationship("WDriver", primaryjoin=WDriver.fk_user_id == id)
    # fk_driver = db.relationship("WDriver", primaryjoin='WDriver.fk_user_id == WUser.id')
    # fk_driver = db.relationship("WDriver", backref=db.backref('test.user', lazy=True),
    #                             primaryjoin=lambda: WDriver.fk_user_id == WUser.id)

r_user_obj = RUser.query.filter_by().first()
print("r_user_obj:", r_user_obj)
print("r_user_obj.driver_fk:", r_user_obj.fk_driver)
w_user_obj = WUser.query.filter_by(id=2188).first()
print("w_user_obj:", w_user_obj)
print("w_user_obj.driver_fk:", w_user_obj.fk_driver)
```

References:
* https://docs.sqlalchemy.org/en/13/orm/relationship_api.html (worth reading in detail)
* https://www.osgeo.cn/sqlalchemy/orm/relationship_api.html (the same, in Chinese)
* https://www.cnblogs.com/srd945/p/9851227.html
* extend_existing (default False): when the table already exists in the metadata, columns in the column list with the same names replace the existing columns in the metadata.
* useexisting is deprecated; newer versions use extend_existing.

Summary

relationship() really does take a lot of parameters, as shown above, and it is easy to get them wrong; read the official docs carefully. Also, keep models as simple and stylistically consistent as possible, and avoid creating foreign keys at the database layer.
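Conceptually, the two-bind setup above is just routing: reads are dispatched to one engine (`read_db`) and writes to another (`write_db`), keyed by `__bind_key__`. A toy illustration of that dispatch idea in plain Python — the `SessionRouter` class and the dict "backends" are mine, standing in for real engines:

```python
class SessionRouter:
    """Toy read/write router: pick a backend by operation type.

    Stands in for Flask-SQLAlchemy's __bind_key__ dispatch: reads go to the
    replica, writes to the primary. Backends here are plain dicts, not engines.
    """
    def __init__(self, read_backend, write_backend):
        self.backends = {"read": read_backend, "write": write_backend}

    def get(self, key):
        return self.backends["read"].get(key)

    def set(self, key, value):
        self.backends["write"][key] = value

primary, replica = {}, {"user:1": "alice"}  # pretend replication already copied user:1
router = SessionRouter(read_backend=replica, write_backend=primary)
print(router.get("user:1"))                       # served from the read bind
router.set("user:2", "bob")                       # lands on the write bind only
print("user:2" in primary, "user:2" in replica)
```

The catch the article wrestles with is exactly what this toy hides: relationship() metadata must agree across both binds, which is why `extend_existing` and schema-qualified foreign keys are needed on the write-side models.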

June 4, 2019 · 2 min · jiezi

Tornado advanced: starting from the source code (repost)

1. Tornado overview
2. epoll background
3. Source tree layout
4. Tornado's HTTP layer
5. Tornado's TCP layer
6. Analysis of Tornado's IOLoop
7. A Tornado request, part 1
8. A Tornado request, part 2

PS: many thanks to jason_wang; his blog address is attached.

May 31, 2019 · 1 min · jiezi

opendevops: an open-source operations automation platform

opendevops, an open-source operations automation platform

Introduction: Official site | GitHub | Online demo

CODO is a fully open-source cloud management platform that gives enterprises hybrid multi-cloud and automated operations. The CODO frontend is built on Vue and iView, providing a friendly UI and a better user experience. The backend is built on Python Tornado, whose strengths are being lightweight, clean, and asynchronous/non-blocking. The CODO open-source multi-cloud management platform provides: ITSM, an RBAC-based permission system, web terminal login audit with session recording and replay, a powerful job scheduling system, CMDB, monitoring and alerting, DNS management, a configuration center, and more.

Product architecture

Product features

Demo: we provide a demo for users to try; click "Try Online Demo" for a quick tour.

<img src="https://img.alicdn.com/tfs/TB...; width="180" />

URL: http://demo.opendevops.cn/login — user: demo, password: 2ZbFYNv9WibWcR7GB6kcEY

Getting started

Note: since the microservice deployment is fairly complex, we currently support only a step-by-step distributed deployment; depoly.sh is no longer updated, and other quick-deploy options will be provided later. See: official docs, changelog, quick trial, distributed deployment docs.

Module links

The CODO project is modular and microservice-based. The modules are listed below, and contributions from interested folks in the community are very welcome:
frontend: codo; admin backend: codo-admin; cron jobs: codo-cron; job scheduling: codo-task; asset management: codo-cmdb; configuration center: codo-kerrigan; ops tools: codo-tools; DNS management: codo-dns.

Screenshots

May 27, 2019 · 1 min · jiezi

Image tagging with Tornado

Background

In the article "Keras入门(四)之利用CNN模型轻松破解网站验证码" (Keras primer (4): cracking site CAPTCHAs with a CNN), the CAPTCHA images were labeled through a web page built with Tornado. This article explains how that image-tagging tool is implemented with Tornado. We build a site that makes labeling painless: you type the digits shown in the image, and the saved image's filename becomes the digits you typed. Let's see how this works.

Project layout

The project is named captcha_tagging. The images in the images folder are the ones to be labeled; labeled images are written to the new_images folder; the page template is index.html; and the controlling script is server.py.

Implementation

The full template, index.html:

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>图片浏览</title>
</head>
<body>
    <div align="center">
        <br><br>
        <img src="{{static_url('images/%s' % img_src)}}" style="width:100;height:44"/>
        <form action='/index' method='post'>
            value: <input type="text" name="rename" /><br>
            imgName: <input type="text" name="imgname" value="{{imgname}}"/><br>
            <button type="submit">提交</button>
        </form>
    </div>
</body>
</html>
```

The controlling script, server.py:

```python
# -*- coding: utf-8 -*-
import cv2
import random
import os.path
import tornado.web
import tornado.ioloop
import tornado.options
import tornado.httpserver
from tornado.options import define, options

# save the image under its new name
def tag_picture(imagepath, name):
    image = cv2.imread(imagepath, 1)
    cv2.imwrite('%s/new_images/%s.png' % (os.path.dirname(os.path.abspath(__file__)), name), image)

# define the port as 9100
define("port", default=9100, type=int)

# pick a random image from the directory
def get_image(dir):
    files = os.listdir(dir)
    return random.choice(files)

class ImageHandler(tornado.web.RequestHandler):
    # GET handler
    def get(self):
        dir = './static/images'
        img_src = get_image(dir)
        self.render('index.html', img_src=img_src, imgname=img_src)

    # POST handler
    def post(self):
        filename = self.get_argument('rename')
        imgname = self.get_argument('imgname')
        imagepath = os.path.dirname(__file__) + '/static/images/%s' % imgname
        tag_picture(imagepath, filename)           # save the new image
        os.system('rm -rf %s' % imagepath)         # delete the original
        print(len(os.listdir('./static/images')))  # number of images remaining
        dir = './static/images'
        img_src = get_image(dir)
        self.render('index.html', img_src=img_src, imgname=img_src)

# main entry point
def main():
    # start the tornado service
    tornado.options.parse_command_line()
    app = tornado.web.Application(
        handlers=[(r'/index', ImageHandler)],                                # routing
        template_path=os.path.join(os.path.dirname(__file__), "templates"),  # template path
        static_path=os.path.join(os.path.dirname(__file__), "static"),       # static file path
    )
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

main()
```

Running it

Run server.py and open localhost:9100/index in a browser. Type the digits from the CAPTCHA (for example 8513) into the value box and click submit; the labeled image is saved under new_images and the page automatically moves on to the next image to label.

With this tool you only label what you see: the backend saves each result, deletes the original image, and reports how many images remain; when the count reaches 0 you are done. The author labeled 1,000 CAPTCHAs in roughly an hour.

Summary

To train a neural network you first need labeled data, and hand-labeling is tedious and time-consuming; this article offers one approach to image labeling that may give readers some inspiration. The project is on GitHub: https://github.com/percent4/CAPTCHA-Recognizition

Note: I have opened a WeChat official account: Python爬虫与算法 (ID: easy_web_scrape) — follow me! ...
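One quirk of the tagging tool above: get_image() uses random.choice, so an as-yet-unlabeled image can be shown again before deletion removes it from the directory. A small stdlib alternative that shuffles once and then serves each file exactly once — the queue helper is my illustration, not part of the project:

```python
import os
import random
import tempfile

def make_image_queue(dir_path):
    """Shuffle the directory listing once; pop() then serves each file exactly once."""
    files = os.listdir(dir_path)
    random.shuffle(files)
    return files

# demo on a throwaway directory with three fake "images"
tmp = tempfile.mkdtemp()
for name in ("a.png", "b.png", "c.png"):
    open(os.path.join(tmp, name), "w").close()

queue = make_image_queue(tmp)
served = [queue.pop() for _ in range(len(queue))]
print(sorted(served))  # every image served once, in random order
```

In the handler this would mean building the queue at startup (or lazily) and popping from it in get(), instead of calling random.choice on every request.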

March 26, 2019 · 1 min · jiezi

Django development: how Django and Tornado differ

Commonly used Python web frameworks include Django, Tornado, and Flask; today I will summarize the differences between Django and Tornado. I have used both at work, Django rather more. Django is pleasant to use: fast project scaffolding, a built-in ORM, automatic route generation, a built-in admin site, and so on. But given a free choice in real work, I lean toward Tornado: it is more flexible, supports WebSocket, TCP, and other protocols, and most importantly it is an asynchronous non-blocking framework, whereas in Django you need third-party modules such as dwebsocket and Celery to get WebSocket and asynchronous non-blocking behavior. The environment used here is Python 3.6, Django 2.0, Tornado 5.1. The comparison covers:

1. How projects are created
2. Database connections
3. Asynchronous non-blocking requests
4. WebSocket usage

1. Project creation

1) Django

Django creates a project with two commands:

```
django-admin startproject Test   # create a project named Test
django-admin startapp Test01     # create an app named Test01
```

This generates the following structure:

```
D:.
│  manage.py
│  test.txt
├─.idea
│      ...
├─Test
│      settings.py
│      urls.py
│      wsgi.py
│      __init__.py
└─Test01
    │  admin.py
    │  apps.py
    │  models.py
    │  tests.py
    │  views.py
    │  __init__.py
    └─migrations
            __init__.py
```

The key pieces are manage.py, Test, and Test01. manage.py manages the project; Django's built-in commands (migrations, starting the server, etc.) run through it. Test/settings.py is the configuration file; Test/urls.py is the route file dispatching HTTP requests; Test01/models.py holds the models, which map table structures into the database; Test01/views.py holds the views; Test01/migrations stores the generated migration files. That is the basic structure of a Django project.

2) Tornado

Tornado project creation is flexible: there is no notion of a project name or apps; you organize it yourself out of Python files and packages, for example:

```
├── App
│   ├── __init__.py
│   ├── Shop
│   │   ├── __init__.py
│   │   └── views.py
│   └── User
│       ├── __init__.py
│       └── views.py
├── application.py
├── Config
│   ├── config_base.py
│   ├── config_db.conf
│   ├── config_db_get.py
│   ├── config_engine.py
│   └── __init__.py
├── Models
│   ├── __init__.py
│   ├── Shop
│   │   └── __init__.py
│   └── User
│       ├── BaseClass.py
│       ├── __init__.py
│       └── UserModel.py
├── server.py
├── static
│   └── __init__.py
├── templates
│   └── __init__.py
├── test.py
└── Urls
    ├── __init__.py
    ├── Shop.py
    └── User.py
```

Apps live under App, database models under Models, HTTP routes under Urls, configuration under Config, static files and templates under static and templates. application.py loads the routes and settings; server.py starts the project.

Basic settings go in Config/config_base.py:

```python
# coding=utf-8
import os

BASE_DIR = os.path.dirname(__file__)

# options
options = {
    "port": 8001,
}

# base settings
settings = {
    "debug": True,
    "static_path": os.path.join(BASE_DIR, "static"),
    "template_path": os.path.join(BASE_DIR, "templates")
}
```

Routes go in Urls/User.py:

```python
# coding=utf-8
from App.UserInfo import views

user_urls = [
    (r'/user/', views.IndexHandler),
]
```

application.py loads the routes and settings, and the service is started like this:

```python
# coding=utf-8
from tornado import ioloop, httpserver
from application import Application
from Config import config_base

if __name__ == '__main__':
    app = Application()
    http_server = httpserver.HTTPServer(app)
    http_server.listen(config_base.options.get("port"))
    ioloop.IOLoop.current().start()
```

2. Database connections

1) Django

To use a database in Django, first configure it in settings.py:

```python
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',  # database engine
        'NAME': 'django_test',                 # database name (create it beforehand)
        'USER': 'root',                        # database user
        'PASSWORD': 'test',                    # password
        'HOST': 'localhost',                   # host
        'PORT': '3306',                        # port
    }
}
```

Then, after writing models.py in each app, run these two commands and the database is ready to use:

```
python manage.py makemigrations
python manage.py migrate
```

CRUD operations then go through the model manager object, `objects`.

2) Tornado

Here is how to connect with SQLAlchemy in Tornado; install sqlalchemy and pymysql.

2.2.1) Configure the database in Config/config_db.conf:

```
[db_user]
name = db_tornado03
port = 3306
user = root
host = 127.0.0.1
pass = test
pool_size = 3
```

2.2.2) Configure the engine in Config/config_engine.py:

```python
# coding=utf-8
from sqlalchemy import create_engine
from Config.config_db_get import ConfigDBUser

# one engine per database; several can be configured
db_user = ConfigDBUser("db_user")
engine_user = create_engine(
    "mysql+pymysql://%s:%s@%s:%d/%s" % (
        db_user.get_db_user(),
        db_user.get_db_pass(),
        db_user.get_db_host(),
        db_user.get_db_port(),
        db_user.get_db_database()
    ),
    encoding='utf-8',
    echo=True,
    pool_size=20,
    pool_recycle=100,
    connect_args={"charset": 'utf8mb4'}
)
```

create_engine initializes the database connection.

2.2.3) Configure the session in Models/UserInfo/BaseClass.py:

```python
# coding=utf-8
from sqlalchemy.orm import scoped_session, sessionmaker
from Config.config_engine import engine_user

class BaseClass:
    def __init__(self):
        # create the session object and maintain it with scoped_session;
        # all database CRUD goes through this session object
        self.engine_user = scoped_session(
            sessionmaker(
                bind=engine_user,
                autocommit=False,
                autoflush=True,
                expire_on_commit=False
            )
        )
```

2.2.4) Configure the model in Models/UserInfo/UserModel.py, mapping to the corresponding database table:

```python
# coding=utf-8
from sqlalchemy import Table, MetaData
from sqlalchemy.ext.declarative import declarative_base
from Config.config_engine import engine_user

BaseModel = declarative_base()

def user_model(table_name):
    class UserModel(BaseModel):
        __tablename__ = table_name
        metadata = MetaData(engine_user)
        Table(__tablename__, metadata, autoload=True)
    return UserModel
```

Before configuring the model, the table must already exist in the database, which means writing SQL to create it. For those fluent in SQL that is nothing; those who are not may prefer Django's way of creating tables.

2.2.5) With all of the above configured, use it in a view, App/UserInfo/views.py:

```python
# coding=utf-8
from tornado import web
from Models.UserInfo.BaseClass import BaseClass
from Models.UserInfo.UserModel import user_model

class UserInfoHandler(web.RequestHandler, BaseClass):
    def get(self):
        """
        fetch user info
        """
        # the argument of user_model is the table name in the database
        user_info = user_model("user_info")
        # get the query parameter
        user_id = self.get_query_argument("id")
        # self.engine_user is the session object; query() returns a
        # query.Query object used to query the database
        user_info_obj = self.engine_user.query(user_info).filter(user_info.id == user_id).first()
        self.write(user_info_obj.name)
        self.finish()
```

2.2.6) Finally, wire up the routes.

Urls/UserInfo.py:

```python
# coding=utf-8
from App.UserInfo import views

user_urls = [
    (r'/userinfo', views.UserInfoHandler),
]
```

application.py:

```python
# coding=utf-8
from tornado import web
from Config.config_base import settings
from Urls.UserInfo import user_urls
from Urls.Shop import shop_urls

"""route configuration"""
class Application(web.Application):
    def __init__(self):
        urls = user_urls + shop_urls
        super(Application, self).__init__(urls, **settings)
```

Start the service and the endpoint is reachable.

3. Asynchronous non-blocking requests

1) Django

Django can run asynchronous tasks through Celery, or achieve asynchrony with asyncio and aiohttp. Celery usage:

3.1.1) Install celery and django-celery; pip is fine.

3.1.2) Configure settings.py: add djcelery to INSTALLED_APPS, then:

```python
import djcelery

# Celery scans the tasks.py of every app under INSTALLED_APPS and
# registers the functions marked as tasks
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3'
# or with RabbitMQ:
# BROKER_URL = 'amqp://test:test@192.168.173.1:5672/testhost'
# CELERY_RESULT_BACKEND = 'amqp://test:test@192.168.173.1:5672/testhost'
```

3.1.3) Create tasks.py in the app that needs asynchrony:

```python
# coding=utf-8
import time
from celery import task

@task
def test(data):
    """
    pre-processing
    """
    time.sleep(3)
    return data
```

Time-consuming work goes in functions decorated with @task.

3.1.4) Call the tasks.py functions from views.py:

```python
from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import test

class CeleryTrainView(APIView):
    def get(self, request):
        try:
            for i in range(0, 5):
                ret = test.delay(str(i))
                print("ret:", ret)
        except Exception as e:
            return Response(dict(msg=str(e), code=10001))
        return Response(dict(msg="OK", code=10000))
```

Here ret is an AsyncResult object, through which you can fetch the result stored in CELERY_RESULT_BACKEND. To get the result immediately, call get() directly — but that blocks other requests until the result returns:

```python
for i in range(0, 5):
    ret = test.delay(str(i))
    print("ret:", ret.get())  # blocks until the result is ready
```

3.1.5) Start Celery:

```
# start the server first
python manage.py runserver
# then start the worker
python manage.py celery worker
```

2) Tornado

Tornado implements asynchrony via callbacks or coroutines; here is one coroutine example:

```python
from tornado import web
from tornado import gen
from tornado.httpclient import AsyncHTTPClient

class AsyncHandler(web.RequestHandler):
    @gen.coroutine
    def get(self, *args, **kwargs):
        client = AsyncHTTPClient()
        # look up information about an IP address
        url = 'http://ip.taobao.com/service/getIpInfo.php?ip=14.130.112.24'
        resp = yield client.fetch(url)
        data = str(resp.body, encoding="utf-8")
        print("data:", data)
        self.write(data)
        self.finish()
```

Or wrap the IP lookup in its own function:

```python
class AsyncHandler(web.RequestHandler):
    @gen.coroutine
    def get(self, *args, **kwargs):
        ip_info = yield self.get_ip_info()
        self.write(ip_info)
        self.finish()

    @gen.coroutine
    def get_ip_info(self):
        client = AsyncHTTPClient()
        url = 'http://ip.taobao.com/service/getIpInfo.php?ip=14.130.112.24'
        resp = yield client.fetch(url)
        data = str(resp.body, encoding="utf-8")
        return data
```

You can also issue several asynchronous requests at once:

```python
class AsyncHandler(web.RequestHandler):
    @gen.coroutine
    def get(self, *args, **kwargs):
        ips = ["14.130.112.24", "14.130.112.23", "14.130.112.22"]
        info1, info2, info3 = yield [self.get_ip_info(ips[0]),
                                     self.get_ip_info(ips[1]),
                                     self.get_ip_info(ips[2])]
        self.write(info1)
        self.write(info2)
        self.write(info3)
        self.finish()

    @gen.coroutine
    def get_ip_info(self, ip):
        client = AsyncHTTPClient()
        url = 'http://ip.taobao.com/service/getIpInfo.php?ip=' + ip
        resp = yield client.fetch(url)
        data = str(resp.body, encoding="utf-8")
        return data
```

AsyncHTTPClient's fetch() can be called two ways: with a URL string as above, or with an HTTPRequest object as its argument, like this:

```python
from urllib.parse import urlencode
from tornado.httpclient import HTTPRequest

@gen.coroutine
def get_ip_info(self, ip):
    client = AsyncHTTPClient()
    url = 'http://ip.taobao.com/service/getIpInfo.php?ip=' + ip
    header = {'Accept': 'application/json;charset=utf-8',
              'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8'}
    param1 = 'test'
    http_request = HTTPRequest(url=url, method='POST', headers=header,
                               body=urlencode({'param1': param1}))
    resp = yield client.fetch(http_request)
    data = str(resp.body, encoding="utf-8")
    return data
```

4. WebSocket usage

1) Django

Using WebSocket in Django requires the third-party package dwebsocket.

2) Tornado

WebSocket support in Tornado comes from the tornado.websocket module, with four main methods: open(), write_message(), on_message(), on_close().

- open(): what to do when a WebSocket client connects
- write_message(): send a message to the client
- on_message(): receive and handle client messages
- on_close(): what to do when the WebSocket connection closes

An example, views.py:

```python
from tornado import web, websocket

class IndexHandler(web.RequestHandler):
    def get(self, *args, **kwargs):
        self.render("chat.html")

class ChatHandler(websocket.WebSocketHandler):
    clients = set()

    def open(self, *args, **kwargs):
        self.clients.add(self)
        for client in self.clients:
            client.write_message("%s上线了" % self.request.remote_ip)

    def on_message(self, message):
        for client in self.clients:
            client.write_message("%s: %s" % (self.request.remote_ip, message))

    def on_close(self):
        self.clients.remove(self)
        for client in self.clients:
            client.write_message("%s下线了" % self.request.remote_ip)

    def check_origin(self, origin):
        """
        handle cross-origin requests
        """
        return True
```

Routes:

```python
# coding=utf-8
from App.UserInfo import views

user_urls = [
    (r'/index', views.IndexHandler),
    (r'/chat', views.ChatHandler),
]
```

chat.html:

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>聊天室</title>
</head>
<body>
    <div id="content" style="height: 500px;overflow: auto;"></div>
    <div>
        <textarea id="msg"></textarea>
        <a href="javascript:;" onclick="sendMsg()">发送</a>
    </div>
    <script src="{{ static_url('js/jquery.min.js') }}"></script>
    <script type="text/javascript">
        var ws = new WebSocket("ws://192.168.1.104:8001/chat");
        ws.onmessage = function (data) {
            $("#content").append("<p>" + data.data + "</p>")
        };
        function sendMsg() {
            var msg = $("#msg").val();
            if (msg) {
                ws.send(msg);
            }
        }
    </script>
</body>
</html>
```

This example implements a simple chat room over WebSocket.

That is a simple comparison of Django and Tornado along a few dimensions. Each has its strengths and weaknesses; in real work, pick the framework that fits the requirements. To learn how to use TCPServer in Tornado, see this post: tornado中tcpserver和tcpclient的使用 ...
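The yield-a-list pattern shown for Tornado coroutines (waiting on several in-flight requests at once) maps directly onto native async/await, which Tornado 5+ also supports on top of asyncio. A stdlib-only sketch of the same fan-out shape, with the slow HTTP fetch faked by asyncio.sleep — the function names are illustrative:

```python
import asyncio

async def get_ip_info(ip):
    """Stand-in for the async HTTP fetch: pretend each lookup takes 0.1 s."""
    await asyncio.sleep(0.1)
    return "info for %s" % ip

async def main():
    ips = ["14.130.112.24", "14.130.112.23", "14.130.112.22"]
    # equivalent of `yield [coro1, coro2, coro3]` in gen.coroutine style:
    # all three lookups run concurrently, so total time is ~0.1 s, not 0.3 s
    results = await asyncio.gather(*(get_ip_info(ip) for ip in ips))
    return results

print(asyncio.run(main()))
```

asyncio.gather preserves argument order in its result list, just as the yield-list form assigns info1, info2, info3 in order.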

February 22, 2019 · 4 min · jiezi

Table-file preview with Tornado

项目介绍 本文将介绍笔者的一个项目,主要是利用tornado实现表格文件的预览,能够浏览的表格文件支持CSV以及Excel文件。预览的界面如下: 下面我们将看到这个功能是如何通过tornado来实现的。代码 该项目的代码结构如下图所示: 其中主要分为四个部分:filesstatictemplatespy代码其中,files文件夹为上传的表格文件的存放路径,static为前端的静态文件,后续将不用给出介绍,读者可以从该项目的github中下载(下载地址详见后面),templates文件夹主要存放HTML文件,而py文件用于后端控制。 首先让我们看三个HTML文件,先是upload.html,其代码如下:<!DOCTYPE html><html lang=“en”><head> <meta charset=“UTF-8”> <title>文件上传</title> <link rel=“shortcut icon” href="{{static_url(‘images/flower.ico’)}}"> <link rel=“stylesheet” href="{{static_url(‘CSS/amazeui.min.css’)}}"> <script src="{{static_url(‘JS/amazeui.min.js’)}}"></script> <script> $(function() { $(’#doc-form-file’).on(‘change’, function() { var fileNames = ‘’; $.each(this.files, function() { fileNames += ‘<span class=“am-badge”>’ + this.name + ‘</span> ‘; }); $(’#file-list’).html(fileNames); }); }); </script></head><body> <div align=“center”> <br><br> <h1>表格文件上传</h1> <form action=‘file’ enctype=“multipart/form-data” method=‘post’> <div class=“am-form-group am-form-file”> <button type=“button” class=“am-btn am-btn-primary am-btn-sm”>选择要上传的文件</button> <input id=“doc-form-file” type=“file” name=“file” multiple> </div> <div id=“file-list”></div> <p> <button type=“submit” class=“am-btn am-btn-default”>提交</button> </p> </form> <p><a href="/file_review"><button class=“am-btn am-btn-danger”>查看全部文件</button></a></p> </div></body></html>这个是文件上传的网页,界面如下:选择上传文件,完成上传后,则会显示如下界面:接着是fileReview.html,其代码如下:<!DOCTYPE html><html lang=“en”><head> <meta charset=“UTF-8”> <title>文件浏览</title> <link rel=“shortcut icon” href="{{static_url(‘images/flower.ico’)}}"> <link rel=“stylesheet” href="{{static_url(‘CSS/bootstrap.min.css’)}}"> <link rel=“stylesheet” href="{{static_url(‘CSS/amazeui.min.css’)}}"></head><body> <div align=“center”> <br><br> <h1>文件浏览</h1> <ul class=“list-group” style=“width:800px;text-align:left”> {% for file in files %} {% if file.endswith(’.csv’) or file.endswith(’.xls’) or file.endswith(’.xlsx’) %} <li class=“list-group-item”> <a href={{"/data?file="+file}}>{{ file 
}}</a></li> {% end %} {% end %} </ul> <a href="/file"><button class="btn btn-success" id="review">文件上传界面</button></a> </div></body></html>

This page mainly displays the uploaded spreadsheet files. The last template is dataReview.html:

```html
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>数据预览</title>
    <link rel="shortcut icon" href="{{static_url('images/flower.ico')}}">
    <link rel="stylesheet" href="{{static_url('CSS/table.css')}}">
    <link rel="stylesheet" href="{{static_url('CSS/bootstrap.min.css')}}">
</head>
<body>
    <br><br>
    <div align="center">
        <div style="width:800px">
            <table class="table table-striped table-bordered table-condensed table-responsive">
                <thead id="index">
                    <tr>
                        {% for title in data[0] %}
                        <th>{{ title }}</th>
                        {% end %}
                    </tr>
                </thead>
                <tbody id="body">
                    {% for line in data[1:] %}
                    <tr>
                        {% for cell in line %}
                        <td>{{ cell }}</td>
                        {% end %}
                    </tr>
                    {% end %}
                </tbody>
            </table>
        </div>
        <a href="/file"><button class="btn btn-warning" id="review">文件上传界面</button></a>
    </div>
</body>
</html>
```

This page displays the data inside a spreadsheet file, for example the Excel file uploaded earlier.

HTML pages alone are not enough; we also need Python code to drive the site. That is server.py:

```python
# -*- coding: utf-8 -*-
import xlrd
import os.path
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options

# define the port as 12306
define("port", default=12306, help="run on the given port", type=int)


class UploadFileHandler(tornado.web.RequestHandler):
    def get(self):
        self.render('upload.html')

    def post(self):
        # directory where uploaded files are stored
        upload_path = os.path.join(os.path.dirname(__file__), 'files')
        # extract the metadata of the form field named 'file'
        file_metas = self.request.files['file']
        for meta in file_metas:
            filename = meta['filename']
            filepath = os.path.join(upload_path, filename)
            # some files need to be stored in binary form; adjust as needed
            with open(filepath, 'wb') as up:
                up.write(meta['body'])
            self.write("<br><br>")
            self.write('<p>上传%s成功!</p>' % filename)
            self.write('<p><a href="/file_review"><button>查看全部文件</button></a></p>')


class FileReviewHandler(tornado.web.RequestHandler):
    def get(self):
        # directory where uploaded files are stored
        upload_path = os.path.join(os.path.dirname(__file__), 'files')
        files = os.listdir(upload_path)
        for file in files:
            if os.path.isdir(file):
                files.remove(file)
        self.render('fileReview.html', files=files)


class DataReviewHandler(tornado.web.RequestHandler):
    def get(self):
        filename = self.get_argument('file')
        print(filename)
        # directory where uploaded files are stored
        upload_path = os.path.join(os.path.dirname(__file__), 'files')
        file_path = os.path.join(upload_path, filename)
        if filename.endswith('.csv'):
            with open(file_path, "r") as f:
                data = f.readlines()
                data = [line.strip().split(',') for line in data]
        elif filename.endswith('.xls') or filename.endswith('.xlsx'):
            tables = xlrd.open_workbook(file_path)
            table = tables.sheets()[0]  # first sheet
            nrows = table.nrows
            # collect the rows one by one
            data = []
            for i in range(nrows):
                data.append(table.row_values(i))
        else:
            data = []
        self.render('dataReview.html', data=data)


def main():
    # start the tornado service
    tornado.options.parse_command_line()
    app = tornado.web.Application(
        handlers=[(r'/file', UploadFileHandler),
                  (r'/file_review', FileReviewHandler),
                  (r'/data', DataReviewHandler)],   # URL routing
        template_path=os.path.join(os.path.dirname(__file__), "templates"),  # template path
        static_path=os.path.join(os.path.dirname(__file__), "static"),       # static file path
    )
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()


main()
```

Run server.py and open "localhost:12306/file" in a browser to see the upload page from before.

That covers the project structure. We skipped the static folder, since it does not affect how the program runs, only how the pages look; for a nicer browsing experience, download the static folder from the project's GitHub repository instead of writing it from scratch.

Usage

The project can be used in three ways: directly, from GitHub, or via Docker.

Direct use: write the project as described above, copy the static folder, run server.py, and open "localhost:12306/file" in a browser to browse the uploaded spreadsheets.

Using GitHub: clone the project from its GitHub address https://github.com/percent4/c… with:

```shell
git init
git clone https://github.com/percent4/csv_file_review
```

Then install the required third-party modules (xlrd, tornado), run server.py, and open "localhost:12306/file" in a browser to browse the uploaded spreadsheets.

Using Docker: first pull the image:

```shell
docker pull jclian91/dockertest:csv_file_review.2019.02.21.2312
```

Then run it:

```shell
docker run -p 12306:12306 -v $PWD/db:/root/csv_file_review/src/files -it c97f252cd6e8 bash
```

Note: the ID after -it is the ID of the image you just pulled; replace it with your own image ID. The service runs on local port 12306, and uploaded spreadsheet data is stored under $PWD/db. Inside the container, run server.py to start the service:

```shell
[root@fbb2c3fb6ce1 src]# ls
__init__.py  files  server.py  static  templates
[root@fbb2c3fb6ce1 src]# python server.py
```

Open "localhost:12306/file" in a browser to browse the uploaded spreadsheets.

Summary

That is all for this project, thanks for reading~ If you are interested in the source code, see: https://github.com/percent4/c…

Note: the author now runs a WeChat official account, Python爬虫与算法 (ID: easy_web_scrape) — feel free to follow.
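The CSV branch of DataReviewHandler boils down to one row per line with cells split on commas. A minimal standalone sketch of that parsing step (the function name is invented for illustration; the real handler reads from disk, and the stdlib `csv` module would handle quoted fields more robustly):

```python
def parse_csv_text(text):
    # mimic DataReviewHandler's CSV branch: one row per non-empty line,
    # cells split on commas (no quoting support)
    lines = [line for line in text.splitlines() if line.strip()]
    return [line.strip().split(',') for line in lines]


sample = "name,age\nAlice,30\nBob,25\n"
print(parse_csv_text(sample))  # → [['name', 'age'], ['Alice', '30'], ['Bob', '25']]
```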

February 22, 2019 · 3 min · jiezi

Deploying Flask on Windows

Background

The Flask project currently in use depends on some Windows-only processing and cannot yet be migrated to Linux, so how should it be deployed on Windows?

Approach

According to the Flask documentation, the built-in development server performs poorly, and the main recommended deployment options are:

- mod_wsgi (Apache)
- standalone WSGI containers: Gunicorn, Tornado, Gevent
- uWSGI
- FastCGI
- CGI

Of these, only Tornado supports deployment on Windows, and combined with Nginx it can work reasonably well (see the benchmark article "Nginx与tornado框架的并发评测"). In practice, however, while tornado itself is very stable, deploying Flask on tornado yields no asynchronous benefit: requests are still handled by a single blocking process, and even configuring threaded=True in Flask cannot enable multithreading.

Flask with multithreading

Enable multithreading:

```python
# manage.py
from flask_script import Server

server = Server(host="0.0.0.0", threaded=True)
```

Configure two test routes in Flask:

```python
import time

@main.route('/test')
def maintest():
    return 'hello world'

@main.route('/sleep')
def mainsleep():
    time.sleep(60)
    return 'wake up'
```

Open /sleep in a browser, then immediately open /test: the two requests are handled by different threads, so /test is not blocked.

tornado + Flask with multithreading

Host the app with tornado:

```python
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from yourapplication import app

http_server = HTTPServer(WSGIContainer(app))
http_server.listen(5000)
IOLoop.instance().start()
```

Open /sleep, then immediately /test: although the tornado framework itself supports asynchrony, the backend handling is still synchronous, so no asynchronous effect materializes. To make the backend asynchronous, you would have to develop directly with tornado. So why host Flask with tornado at all?

"Tornado is an open source, scalable, non-blocking web server and toolkit that powers FriendFeed. Because it uses the epoll model and is non-blocking, it can handle thousands of concurrent persistent connections, which makes it ideal for real-time web services. Integrating Flask with it is straightforward."

Per this official description, it mainly compensates for the instability of Flask's built-in server.

Flask under high concurrency

Load test with tsung at 500 concurrent users:

| Name | highest 10sec mean | lowest 10sec mean | Highest Rate | Mean Rate | Mean | Count |
|---|---|---|---|---|---|---|
| connect | 34.30 msec | 31.91 msec | 506 / sec | 356.60 / sec | 33.19 msec | 103908 |
| page | 0.42 sec | 0.29 sec | 505 / sec | 356.32 / sec | 0.39 sec | 103782 |
| request | 0.42 sec | 0.29 sec | 505 / sec | 356.32 / sec | 0.39 sec | 103782 |
| session | 1mn 24sec | 10.64 sec | 11.4 / sec | 1.21 / sec | 14.24 sec | 362 |

| Code | Highest Rate | Mean Rate | Total number |
|---|---|---|---|
| 200 | 505 / sec | 356.32 / sec | 104792 |

| Name | Highest Rate | Total number |
|---|---|---|
| error_abort | 0.5 / sec | 1 |
| error_abort_max_conn_retries | 11.7 / sec | 362 |
| error_connect_econnrefused | 58.6 / sec | 1667 |

At 500 concurrent users the results are poor, with many refused connections.

Flask + Nginx under high concurrency

tsung load test at 500 concurrent users:

| Name | highest 10sec mean | lowest 10sec mean | Highest Rate | Mean Rate | Mean | Count |
|---|---|---|---|---|---|---|
| connect | 0.20 sec | 30.95 msec | 1810.5 / sec | 626.43 / sec | 0.11 sec | 189853 |
| page | 0.68 sec | 0.17 sec | 1810.1 / sec | 625.72 / sec | 0.40 sec | 189581 |
| request | 0.68 sec | 0.17 sec | 1810.1 / sec | 625.72 / sec | 0.40 sec | 189581 |

| Code | Highest Rate | Mean Rate | Total number |
|---|---|---|---|
| 200 | 906.4 / sec | 196.08 / sec | 60689 |
| 502 | 1443.9 / sec | 430.02 / sec | 129006 |

| Name | Highest Rate | Total number |
|---|---|---|
| error_abort | 0.5 / sec | 1 |

Much the same; the Flask server itself stays fairly stable, so try increasing the number of backend Flask servers (via multiple ports):

```shell
python manage.py runserver --port=8001
python manage.py runserver --port=8002
python manage.py runserver --port=8003
python manage.py runserver --port=8004
```

tsung load test at 500 concurrent users, 4 Flask servers:

| Name | highest 10sec mean | lowest 10sec mean | Highest Rate | Mean Rate | Mean | Count |
|---|---|---|---|---|---|---|
| connect | 0.18 sec | 32.57 msec | 3510.1 / sec | 639.92 / sec | 0.11 sec | 195154 |
| page | 0.49 sec | 85.30 msec | 3512.1 / sec | 639.07 / sec | 0.35 sec | 194856 |
| request | 0.49 sec | 85.30 msec | 3512.1 / sec | 639.07 / sec | 0.35 sec | 194856 |

| Code | Highest Rate | Mean Rate | Total number |
|---|---|---|---|
| 200 | 3510.1 / sec | 639.50 / sec | 194986 |

| Name | Highest Rate | Total number |
|---|---|---|
| error_abort | 0.333333333333333 / sec | 1 |

This works nicely.

tsung load test at 1000 concurrent users, 4 Flask servers:

| Name | highest 10sec mean | lowest 10sec mean | Highest Rate | Mean Rate | Mean | Count |
|---|---|---|---|---|---|---|
| connect | 0.20 sec | 32.63 msec | 2983.8 / sec | 492.94 / sec | 98.56 msec | 150793 |
| page | 0.57 sec | 90.00 msec | 2976.4 / sec | 491.31 / sec | 0.40 sec | 150275 |
| request | 0.57 sec | 90.00 msec | 2976.4 / sec | 491.31 / sec | 0.40 sec | 150275 |

| Code | Highest Rate | Mean Rate | Total number |
|---|---|---|---|
| 200 | 2981.4 / sec | 488.92 / sec | 149556 |
| 502 | 92.5 / sec | 4.02 / sec | 925 |

| Name | Highest Rate | Total number |
|---|---|---|
| error_abort | 0.333333333333333 / sec | 1 |

Some 502 timeout errors start to appear.

tsung load test at 1000 concurrent users, 4 tornado servers:

| Name | highest 10sec mean | lowest 10sec mean | Highest Rate | Mean Rate | Mean | Count |
|---|---|---|---|---|---|---|
| connect | 0.18 sec | 86.24 msec | 2052.1 / sec | 693.82 / sec | 0.14 sec | 208786 |
| page | 0.52 sec | 0.24 sec | 2060.7 / sec | 693.34 / sec | 0.45 sec | 208606 |
| request | 0.52 sec | 0.24 sec | 2060.7 / sec | 693.34 / sec | 0.45 sec | 208606 |

| Code | Highest Rate | Mean Rate | Total number |
|---|---|---|---|
| 200 | 2056.6 / sec | 693.67 / sec | 208703 |

At 1000 concurrent users, hosting Flask with tornado or not makes little difference.

Conclusion

Per the tests above, serving directly from the Flask server leads to assorted timeouts and refused connections, because its concurrency handling is weak. Buffering with Nginx and adding backend servers raises the achievable concurrency, so the final choice was Nginx plus 4 backend Flask servers. The Flask project currently has only a few thousand users in total and very low concurrency, so this setup is entirely sufficient. For a larger project with tens of thousands of concurrent users, it is still advisable to find a way to migrate to a Linux environment and deploy in one of the officially recommended ways.
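The conclusion settles on Nginx in front of four Flask backends but never shows the Nginx side. A minimal hypothetical config for that layout might look like the following (the upstream name and header lines are illustrative, not from the original post; only the ports 8001-8004 match the commands above):

```nginx
upstream flask_backends {
    # the four Flask instances started on ports 8001-8004
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    server 127.0.0.1:8003;
    server 127.0.0.1:8004;
}

server {
    listen 80;

    location / {
        proxy_pass http://flask_backends;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
```

By default Nginx round-robins requests across the upstream servers, which is what spreads the tsung load over the four processes.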

February 11, 2019 · 2 min · jiezi

tornado source code: coroutine analysis

tornado source code: coroutine analysis

An analysis of how tornado's coroutines work. Version: 4.3.0.

To support asynchrony, tornado implements a coroutine library. The coroutine framework tornado implements has the following characteristics:

- supports Python 2.7; it does not use the yield from feature, only plain yield
- returns values from coroutines by raising an exception
- uses a Future class as a proxy for a coroutine (it stores the coroutine's result and, when the coroutine finishes, invokes the registered callbacks)
- uses the IOLoop event loop: when an event fires, registered callbacks are invoked from within the loop, driving the coroutine forward

In other words, this is a classic implementation of Python coroutines. This article builds a minimal coroutine framework modeled on tornado's and explains the principles behind it.

External libraries

time is used to compute timer-callback deadlines; bisect.insort maintains a timer queue ordered by expiry time; functools.partial binds some arguments of a function; Generator is imported from backports_abc to test whether a function is a generator.

```python
import time
import bisect
import functools
from backports_abc import Generator as GeneratorType
```

Future

A Future is a messenger shuttling between coroutine and scheduler. It provides callback registration (when the asynchronous event completes, the registered callbacks are invoked), intermediate-result storage, and final-result delivery:

- add_done_callback registers a callback to be invoked when the Future is resolved
- set_result sets the final state and invokes the registered callbacks

Each yield in a coroutine corresponds to a coroutine and, with it, a Future object. For example:

```python
@coroutine
def routine_main():
    yield routine_simple()
    yield sleep(1)
```

Here routine_simple() and sleep(1) are each a coroutine, each with a corresponding Future.

```python
class Future(object):
    def __init__(self):
        self._done = False
        self._callbacks = []
        self._result = None

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            cb(self)
        self._callbacks = None

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def result(self):
        return self._result
```

IOLoop

The IOLoop here strips out the IO-related parts of tornado's implementation and keeps only the essential callback machinery; "CoroutineLoop" would be a more fitting name. It runs a thread loop that does two things:

- check for registered callbacks and run them
- check for expired timers and run their callbacks

Every callback registered in the program ultimately executes here. The coroutine itself and the code that drives it both run here: the coroutine is wrapped by wrapper and registered as an IOLoop callback, so everything from priming to completion executes in IOLoop callbacks, and once the coroutine is primed, Runner.run() is registered as an IOLoop callback to push the coroutine forward. Grasping this is crucial to understanding how the coroutines run. This is the essence of single-threaded async: because everything executes in one thread loop, none of the usual multithreading complications apply.

The remaining pieces are: coroutine, returning values from coroutines, sleep, and Runner. Because yield from is not used, a coroutine cannot return a value directly, so the value is returned by raising an exception.

copyright

author: bigfish; license: Creative Commons Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)
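The "return a value by raising an exception" idea above can be sketched in a few lines. `Return` and `run` here are hypothetical stand-ins for tornado's `gen.Return` and the Runner machinery, reduced to a synchronous trampoline for illustration:

```python
class Return(Exception):
    # carries a coroutine's return value out through the exception channel
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value


def run(gen):
    # drive the generator to completion and harvest the value
    # smuggled out via the Return exception
    try:
        while True:
            next(gen)
    except Return as e:
        return e.value
    except StopIteration:
        return None


def add(a, b):
    yield  # stand-in for waiting on some asynchronous step
    raise Return(a + b)


print(run(add(2, 3)))  # → 5
```

The real Runner does the same harvesting, except that instead of looping synchronously it resumes the generator from IOLoop callbacks and stores the caught value in the coroutine's Future via set_result.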

January 16, 2019 · 1 min · jiezi

tornado source code: iostream.py

iostream.py

"A utility class to write to and read from a non-blocking socket."

IOStream wraps a socket and implements non-blocking I/O through registered callbacks. Callbacks are registered through its interface:

- _read_callback
- _write_callback
- _close_callback
- _connect_callback

When a socket event fires in the ioloop, IOStream._handle_events is called to dispatch the event. While handling an event, if a registered callback's condition is satisfied, the callback is invoked — callbacks are invoked from within IOStream._handle_events.

contents

- example
- head
- IOStream.__init__
- IOStream.connect
- IOStream.read_until
- IOStream.read_bytes
- IOStream.write
- IOStream.close
- IOStream._handle_events
- IOStream._run_callback
- IOStream._handle_read
- IOStream._read_from_socket
- IOStream._read_to_buffer
- IOStream._read_from_buffer
- IOStream._handle_connect
- IOStream._handle_write
- IOStream._consume
- IOStream._add_io_state
- IOStream._read_buffer_size
- copyright

example

A simple IOStream client example. As it shows, IOStream is a chain of asynchronous callbacks:

1. create a socket
2. create the IOStream object
3. connect to the host, passing the connect-success callback send_request
4. write the request to the socket, then read the headers, passing the success callback on_headers
5. read the body, passing the success callback on_body
6. close the stream and stop the ioloop

```python
from tornado import ioloop
from tornado import iostream
import socket

def send_request():
    stream.write("GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
    stream.read_until("\r\n\r\n", on_headers)

def on_headers(data):
    headers = {}
    for line in data.split("\r\n"):
        parts = line.split(":")
        if len(parts) == 2:
            headers[parts[0].strip()] = parts[1].strip()
    stream.read_bytes(int(headers["Content-Length"]), on_body)

def on_body(data):
    print data
    stream.close()
    ioloop.IOLoop.instance().stop()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = iostream.IOStream(s)
stream.connect(("baidu.com", 80), send_request)
ioloop.IOLoop.instance().start()

# <html>
# <meta http-equiv="refresh" content="0; url=http://www.baidu.com/">
# </html>
```

head

```python
from __future__ import with_statement

import collections
import errno
import logging
import socket
import sys

from tornado import ioloop
from tornado import stack_context

try:
    import ssl  # Python 2.6+
except ImportError:
    ssl = None
```

IOStream.__init__

Wraps the socket class. The key statement is `self.io_loop.add_handler(self.socket.fileno(), self._handle_events, self._state)`, which registers its own _handle_events with the global ioloop's poll event callbacks; at this point only ERROR-type events are registered. _read_buffer is the read buffer.

```python
class IOStream(object):
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.socket.fileno(), self._handle_events, self._state)
```

IOStream.connect

Connects the socket to a remote address without blocking:

1. connect the socket
2. register the connection-complete callback
3. add the socket WRITE event to poll

```python
    def connect(self, address, callback=None):
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for socket.connect, i.e. a (host, port) tuple.
        If callback is specified, it will be called when the
        connection is completed.

        Note that it is safe to call IOStream.write while the
        connection is pending, in which case the data will be written
        as soon as the connection is ready.  Calling IOStream read
        methods before the socket is connected works on some platforms
        but is non-portable.
        """
        self._connecting = True
        try:
            self.socket.connect(address)
        except socket.error, e:
            # In non-blocking mode connect() always raises an exception
            if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
                raise
        self._connect_callback = stack_context.wrap(callback)
        self._add_io_state(self.io_loop.WRITE)
```

IOStream.read_until

1. register the read-complete callback
2. try to satisfy the read from the buffer
3. read from the socket into the buffer
4. repeat 2-3; stop when no data arrives
5. add the socket READ event to poll

If the buffered data already satisfies the condition, the callback is executed immediately and the method returns; otherwise the callback is saved, and the next time a read event occurs, _handle_events re-checks the condition and invokes it.

```python
    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        self._read_delimiter = delimiter
        self._read_callback = stack_context.wrap(callback)
        while True:
            # See if we've already got the data from a previous read
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
```

IOStream.read_bytes

Like read_until, but reads a fixed number of bytes.

```python
    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes."""
        assert not self._read_callback, "Already reading"
        if num_bytes == 0:
            callback("")
            return
        self._read_bytes = num_bytes
        self._read_callback = stack_context.wrap(callback)
        while True:
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
```

IOStream.write

```python
    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.
        """
        self._check_closed()
        self._write_buffer.append(data)
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = stack_context.wrap(callback)

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = stack_context.wrap(callback)
```

IOStream.close

1. remove the socket's events from the ioloop
2. close the socket
3. invoke the close callback

```python
    def close(self):
        """Close this stream."""
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return bool(self._write_buffer)

    def closed(self):
        return self.socket is None
```

IOStream._handle_events

The core callback. Any type of socket event triggers the ioloop callback _handle_events, which then dispatches. Note that IOStream does not handle the read event of incoming connection requests: as a server it wraps sockets that have already been accepted.

```python
# HTTPServer._handle_events
# connection is an already-accepted socket
stream = iostream.IOStream(connection, io_loop=self.io_loop)
```

As a client, IOStream.connect must be called manually, and the connect-success callback is handled inside the write event — a somewhat awkward arrangement.

```python
    def _handle_events(self, fd, events):
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            # read event: invoke the registered callback
            if events & self.io_loop.READ:
                self._handle_read()
            if not self.socket:
                return
            # write event: if the connection was just established,
            # invoke the connect callback
            if events & self.io_loop.WRITE:
                if self._connecting:
                    self._handle_connect()
                self._handle_write()
            if not self.socket:
                return
            # error event: close the socket
            if events & self.io_loop.ERROR:
                self.close()
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state != self._state:
                self._state = state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
```

IOStream._run_callback

Executes a callback.

```python
    def _run_callback(self, callback, *args, **kwargs):
        try:
            # Use a NullContext to ensure that all StackContexts are run
            # inside our blanket exception handler rather than outside.
            with stack_context.NullContext():
                callback(*args, **kwargs)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc'd, but we don't want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise
```

IOStream._handle_read

The read callback:

1. read from the socket into the buffer
2. if there is no data, the socket has been closed
3. check whether a read_until / read_bytes condition is satisfied
4. if satisfied, execute the corresponding callback

```python
    def _handle_read(self):
        while True:
            try:
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                result = self._read_to_buffer()
            except Exception:
                self.close()
                return
            if result == 0:
                break
            else:
                if self._read_from_buffer():
                    return
```

IOStream._read_from_socket

Reads data from the socket.

```python
    def _read_from_socket(self):
        """Attempts to read from the socket.

        Returns the data read or None if there is nothing to read.
        May be overridden in subclasses.
        """
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk
```

IOStream._read_to_buffer

Reads data from the socket and stores it in the buffer.

```python
    def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            chunk = self._read_from_socket()
        except socket.error, e:
            # ssl.SSLError is a subclass of socket.error
            logging.warning("Read error on %d: %s",
                            self.socket.fileno(), e)
            self.close()
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        if self._read_buffer_size() >= self.max_buffer_size:
            logging.error("Reached maximum read buffer size")
            self.close()
            raise IOError("Reached maximum read buffer size")
        return len(chunk)
```

IOStream._read_from_buffer

Extracts data from the buffer: checks whether a termination condition (read_until / read_bytes) is satisfied and, if so, invokes the previously registered callback. This is a polling approach.

```python
    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        elif self._read_delimiter:
            _merge_prefix(self._read_buffer, sys.maxint)
            loc = self._read_buffer[0].find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))
                return True
        return False
```

IOStream._handle_connect

Invokes the connection-established callback and clears the connecting flag.

```python
    def _handle_connect(self):
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        self._connecting = False
```

IOStream._handle_write

The write event:

1. take a bounded chunk of data from the buffer
2. send it with socket.send
3. if all data has been sent and a callback is registered, invoke the write-complete callback

```python
    def _handle_write(self):
        while self._write_buffer:
            try:
                if not self._write_buffer_frozen:
                    # On windows, socket.send blows up if given a
                    # write buffer that's too large, instead of just
                    # returning the number of bytes it was able to
                    # process.  Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    _merge_prefix(self._write_buffer, 128 * 1024)
                num_bytes = self.socket.send(self._write_buffer[0])
                self._write_buffer_frozen = False
                _merge_prefix(self._write_buffer, num_bytes)
                self._write_buffer.popleft()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    # With OpenSSL, after send returns EWOULDBLOCK,
                    # the very same string object must be used on the
                    # next call to send.  Therefore we suppress
                    # merging the write buffer after an EWOULDBLOCK.
                    # A cleaner solution would be to set
                    # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
                    # not yet accessible from python
                    # (http://bugs.python.org/issue8240)
                    self._write_buffer_frozen = True
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)
```

IOStream._consume

Consumes loc bytes of data from the read buffer.

```python
    def _consume(self, loc):
        _merge_prefix(self._read_buffer, loc)
        return self._read_buffer.popleft()

    def _check_closed(self):
        if not self.socket:
            raise IOError("Stream is closed")
```

IOStream._add_io_state

Adds a socket event state.

```python
    def _add_io_state(self, state):
        if self.socket is None:
            # connection has been closed, so there can be no future events
            return
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)
```

IOStream._read_buffer_size

Returns the total length of data already in the read buffer.

```python
    def _read_buffer_size(self):
        return sum(len(chunk) for chunk in self._read_buffer)
```

```python
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with
        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
    before constructing the SSLIOStream.  Unconnected sockets will be
    wrapped when IOStream.connect is finished.
    """
    def __init__(self, *args, **kwargs):
        """Creates an SSLIOStream.

        If a dictionary is provided as keyword argument ssl_options,
        it will be used as additional keyword arguments to ssl.wrap_socket.
        """
        self._ssl_options = kwargs.pop('ssl_options', {})
        super(SSLIOStream, self).__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False

    def reading(self):
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError, err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close()
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                logging.warning("SSL Error on %d: %s",
                                self.socket.fileno(), err)
                return self.close()
            raise
        except socket.error, err:
            if err.args[0] == errno.ECONNABORTED:
                return self.close()
        else:
            self._ssl_accepting = False
            super(SSLIOStream, self)._handle_connect()

    def _handle_read(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def _handle_connect(self):
        self.socket = ssl.wrap_socket(self.socket,
                                      do_handshake_on_connect=False,
                                      **self._ssl_options)
        # Don't call the superclass's _handle_connect (which is responsible
        # for telling the application that the connection is complete)
        # until we've completed the SSL handshake (so certificates are
        # available, etc).

    def _read_from_socket(self):
        try:
            # SSLSocket objects have both a read() and recv() method,
            # while regular sockets only have recv().
            # The recv() method blocks (at least in python 2.6) if it is
            # called when there is nothing to read, so we have to use
            # read() instead.
            chunk = self.socket.read(self.read_chunk_size)
        except ssl.SSLError, e:
            # SSLError is a subclass of socket.error, so this except
            # block must come first.
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                return None
            else:
                raise
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk


def _merge_prefix(deque, size):
    """Replace the first entries in a deque of strings with a single
    string of up to size bytes.

    >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
    >>> _merge_prefix(d, 5); print d
    deque(['abcde', 'fghi', 'j'])

    Strings will be split as necessary to reach the desired size.
    >>> _merge_prefix(d, 7); print d
    deque(['abcdefg', 'hi', 'j'])

    >>> _merge_prefix(d, 3); print d
    deque(['abc', 'defg', 'hi', 'j'])

    >>> _merge_prefix(d, 100); print d
    deque(['abcdefghij'])
    """
    prefix = []
    remaining = size
    while deque and remaining > 0:
        chunk = deque.popleft()
        if len(chunk) > remaining:
            deque.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    deque.appendleft(''.join(prefix))


def doctests():
    import doctest
    return doctest.DocTestSuite()
```

copyright

author: bigfish; license: Creative Commons Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)
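As an aside, the deque-flattening logic of _merge_prefix shown in the doctest runs essentially unchanged on Python 3; a runnable sketch (renamed `merge_prefix` to avoid clashing with the quoted source):

```python
from collections import deque

def merge_prefix(dq, size):
    # flatten the first entries of dq into a single string of up to
    # `size` characters (same logic as tornado's _merge_prefix)
    prefix = []
    remaining = size
    while dq and remaining > 0:
        chunk = dq.popleft()
        if len(chunk) > remaining:
            # split the chunk: the excess goes back to the front
            dq.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    dq.appendleft(''.join(prefix))


d = deque(['abc', 'de', 'fghi', 'j'])
merge_prefix(d, 5)
print(d)  # → deque(['abcde', 'fghi', 'j'])
```

This is what lets _read_from_buffer treat the front of the read buffer as one contiguous string when searching for a delimiter, and what caps each socket.send at 128KB in _handle_write.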

December 21, 2018 · 9 min · jiezi