taro1.安装 tarojsnpm install -g @tarojs/cli2.初始化项目taro init taro-login3.进入目录cd taro-login4.运行编译npm run dev:weapp5.修改文件 /src/app.js 代码…class App extends Component { config = { pages: [ ‘pages/user/user’, // new ‘pages/index/index’, ],6.微信登录需求如果我们需要用户一进入就取得用户的授权,以便于进行某些记录用户信息的操作,而微信又要求用户去点页面上的某个按钮才能获取信息,那怎么办呢?只能把一个按钮放在用户不能不点的地方,那就只有弹窗了。微信 wx.showModal 不能满足我们的需求,只能自己造一个,在用户第一次进来的时候弹窗,再次进来的时候则不显示。为了让这个组件具有拓展性,我们根据传入的值来修改 确认 位置按钮的属性,如果是授权的弹窗就改按钮属性为 openType=‘getUserInfo’。(摘自Taro 多端开发实现原理与项目实战)7.新建文件夹和modal.js文件/src/components/modal/modal.jsimport Taro, { Component } from ‘@tarojs/taro’import { View, Button } from ‘@tarojs/components’import ‘./modal.scss’class Modal extends Component { constructor() { super(…arguments) this.state = {} } onConfirmClick = () => { this.props.onConfirmCallback() } onCancelClick = () => { this.props.onCancelCallback() } onAuthConfirmClick = (e) => { this.props.onConfirmCallback(e.detail) } preventTouchMove = (e) => { e.stopPropagation() } render() { const { title, contentText, cancelText, confirmText, isAuth } = this.props return ( <View className=‘toplife_modal’ onTouchMove={this.preventTouchMove}> <View className=‘toplife_modal_content’> <View className=‘toplife_modal_title’>{title}</View> <View className=‘toplife_modal_text’>{contentText}</View> <View className=‘toplife_modal_btn’> <Button className=‘toplife_modal_btn_cancel’ onClick={this.onCancelClick}>{cancelText}</Button> {!isAuth ? 
<Button className=‘toplife_modal_btn_confirm’ onClick={this.onConfirmClick}>{confirmText}</Button> : <Button className=‘toplife_modal_btn_confirm’ openType=‘getUserInfo’ onGetUserInfo={this.onAuthConfirmClick}>授权</Button>} </View> </View> </View> ) }}Modal.defaultProps = { title: ‘’, contentText: ‘’, cancelText: ‘取消’, confirmText: ‘确定’, isAuth: false, onCancelCallback: () => { }, onConfirmCallback: () => { }}export default ModalModal 组件还算比较简单,组件的属性(字段:说明):title:提示的标题;contentText:提示的描述;cancelText:取消按钮的文案;onCancelCallback:取消回调的函数;confirmText:确认按钮的文案;onConfirmCallback:确认回调函数;isAuth:标记是否为授权按钮。在内部设置了一个函数 preventTouchMove,其作用是弹窗出现蒙层的时候,阻止在蒙层上的滑动手势 onTouchMove。另外一个函数 onAuthConfirmClick,当 isAuth 为真时,确认按钮为取得个人信息的授权按钮,此时把个人信息当值传递给调用的函数。(摘自Taro 多端开发实现原理与项目实战)8.添加modal.scss文件/postcss-pxtransform rn eject enable/.toplife_modal { position: fixed; width: 100%; height: 100%; left: 0; top: 0; background-color: rgba(0, 0, 0, .8); z-index: 100; &_content { position: absolute; left: 50%; top: 50%; width: 600px; height: 320px; transform: translate(-50%, -50%); background-color: #fff; color: #232321; text-align: center; border-radius: 30px; } &_title { margin-top: 40px; font-size: 32px; } &_text { margin-top: 40px; font-size: 24px; } &_btn { position: absolute; bottom: 0; left: 0; width: 100%; height: 88px; border-top: 2px solid #eee; &_cancel { color: #8c8c8c; border-radius: 0; border: 0; border-right: 2px solid #eee; border-bottom-left-radius: 30px; } &_confirm { color: #666; border-radius: 0; border: 0; border-bottom-right-radius: 30px; } button { display: block; float: left; width: 50%; height: 88px; text-align: center; line-height: 88px; font-size: 32px; box-sizing: border-box; background-color: #fff; &::after { border: 0; } } }}9.新建文件/src/pages/user/user.js,在user.js中引用该Modal组件import Taro, { Component } from ‘@tarojs/taro’;import { View, Image, Text } from ‘@tarojs/components’;import classnames from ‘classnames’import Modal from ‘../../components/modal/modal’;import { setGlobalData } from 
‘../../utils/globalData’;import { getUserInfo, getIsAuth } from ‘../../utils/getUser’;class Info extends Component { config = { navigationBarTitleText: ‘TARO商城’, enablePullDownRefresh: true, backgroundTextStyle: ‘dark’, disableScroll: true } constructor() { super(…arguments) this.state = { animationClass: ‘’, showAuthModal: false, shouldIndexHidden: false, } this.env = process.env.TARO_ENV } hideAuthModal() { this.setState({ showAuthModal: false }) Taro.setStorage({ key: ‘isHomeLongHideAuthModal’, data: true }) } onProcessAuthResult = (userData) => { Taro.setStorage({ key: ‘isHomeLongHideAuthModal’, data: true }) if (userData.userInfo) { setGlobalData(‘userData’, userData) } this.setState({ showAuthModal: false }) getIsAuth() } async onPullDownRefresh() { if (this.state.shouldIndexHidden) { Taro.stopPullDownRefresh() // 停止下拉刷新 } else { await this.props.onFetchIndexList() Taro.stopPullDownRefresh() // 停止下拉刷新 } } componentDidMount() { if (this.env === ‘weapp’) { // 用类名来控制动画 setTimeout(async () => { const userData = await getUserInfo(); Taro.getStorage({ key: ‘isHomeLongHideAuthModal’, success: (res) => { const isHomeLongHideAuthModal = res.data; let showAuthModal if (!userData && !this.state.showAuthModal && !isHomeLongHideAuthModal) { showAuthModal = true } else { showAuthModal = false } this.setState({ animationClass: ‘animation’, showAuthModal }) }, fail: () => { let showAuthModal if (!userData && !this.state.showAuthModal) { showAuthModal = true } else { showAuthModal = false } this.setState({ animationClass: ‘animation’, showAuthModal }) } }) }, 1000) getIsAuth() } else if (this.env === ‘h5’ || this.env === ‘rn’) { console.log(‘h5登录’) } } render() { const { animationClass, shouldIndexHidden, showAuthModal } = this.state const { loginname, avatar_url } = this.props; const indexClassNames = classnames(‘container’, ‘index’, animationClass, { hidden: shouldIndexHidden }) return ( <View className={indexClassNames}> <View className=‘login-head’> <Image 
className=‘login-head-back’ src={require(’../../assets/img/loginBack.jpg’)} /> <Image className=‘login-head-head’ src={avatar_url ? avatar_url : require(’../../assets/img/head.png’)} /> {loginname ? <Text classnames=‘login-head-name’>{loginname}</Text> : null} </View> {showAuthModal && <Modal title=‘授权提示’ contentText=‘诚邀您完成授权,尊享畅游体验’ onCancelCallback={this.hideAuthModal.bind(this)} onConfirmCallback={this.onProcessAuthResult.bind(this)} isAuth />} </View> ) }}export default Info我们是如何保证这个应用只有一次授权弹窗呢? 关键代码是 Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true }),如果弹出了一次,就在本地存一个标记,表示已经弹过授权框,下一次弹窗之前可以根据此标记判断。至此我们完成了授权处理,但如果可以的话还是要优雅一些,在需要的时候才征求用户授权,保证用户体验。(摘自Taro 多端开发实现原理与项目实战)10.新建几个辅助文件/src/utils/globalData.jsconst globalData = {}export function setGlobalData(key, val) { globalData[key] = val}export function getGlobalData(key) { return globalData[key]}/src/utils/request.jsimport Taro from ‘@tarojs/taro’;import ‘@tarojs/async-await’;export function getJSON(url, data) { Taro.showLoading(); return Taro.request({ url: url, data: data, method: ‘GET’ }).then(result => { Taro.hideLoading(); return result; })}export function postJSON(url, data) { Taro.showLoading() return Taro.request({ header: { ‘content-type’: ‘application/json’ }, url: url, data: data, method: ‘POST’ }).then(result => { Taro.hideLoading(); return result; });}/src/constants/api.jsconst rootPath = ‘http://127.0.0.1:5000/v1’;const apiObject = { registerclient: rootPath + ‘/client/register’, //注册用户 getusertoken: rootPath + ‘/token’, // 登录成功之后获取用户token checkusertoken: rootPath + ‘/token/secret’, //验证用户token getuserinfo: rootPath + ‘/user’, //获取用户信息}export default apiObject;11. 
新建一个登录获取token的函数/src/utils/getUser.jsimport Taro from ‘@tarojs/taro’import { getGlobalData } from ‘./globalData’import api from ‘../constants/api’;import { postJSON } from ‘../utils/request’;async function getUserInfo() { const userData = getGlobalData(‘userData’) if (userData) { return userData } try { const userData = await Taro.getUserInfo() return userData } catch (err) { console.log(err) console.log(‘微信登录或用户接口故障’) return null }}async function getIsAuth() { const loginRes = await Taro.login() let { userInfo } = await getUserInfo() let isAuth = false if (userInfo) { // 使用微信注册新用户 let result = await postJSON(api.registerclient, { “avatar”: userInfo.avatarUrl, “sex”: userInfo.gender, “nickname”: userInfo.nickName, “account”: loginRes.code, “type”: 200 }); if (result.data.error_code == 0) { // 登录用户,获取token,缓存到前端 const tokenRes = await Taro.login() let auth_token = await postJSON(api.getusertoken, { “account”: tokenRes.code, “type”: 200 }) if (auth_token.statusCode == 201) { Taro.setStorage({ key: ’token’, data: auth_token.data.token })// 设置到缓存 Taro.showToast({ title: ‘授权成功’ }) userInfo.isAuth = true isAuth = true } } else { Taro.showToast({ title: ‘授权失败,请稍后再试’, icon: ’none’ }) } } else { userInfo = { isAuth: false } } console.log(‘isAuth: ‘, isAuth) return isAuth}export { getUserInfo, getIsAuth}flask1.文件目录├── app│ ├── init.py│ ├── api│ │ ├── init.py│ │ └── v1│ │ ├── init.py│ │ ├── client.py│ │ ├── token.py│ │ └── user.py│ ├── apps.py│ ├── config│ │ ├── secure.py│ │ └── settings.py│ ├── libs│ │ ├── enums.py│ │ ├── error.py│ │ ├── error_code.py│ │ ├── format_time.py│ │ ├── get_openid.py│ │ ├── redprint.py│ │ ├── scope.py│ │ └── token_auth.py│ ├── models│ │ ├── init.py│ │ ├── pycache│ │ │ ├── init.cpython-36.pyc│ │ │ ├── base.cpython-36.pyc│ │ │ └── user.cpython-36.pyc│ │ ├── base.py│ │ └── user.py│ └── validators│ ├── init.py│ ├── base.py│ └── forms.py├── manage.py└── 
requirements.txt2.新建虚拟环境略过requirements.txtFlaskFlask-SQLAlchemypsycopg2-binarycymysqlFlask-Testingcoverageflake8flask-debugtoolbarflask-corsflask-migrateflask-bcryptpyjwtgunicornrequestsflask-httpauthflask-wtf3. 新建app目录和__init.py文件和apps.py# File: /app/apps.py# -- coding: utf-8 --from flask import Flask as _Flaskfrom flask.json import JSONEncoder as _JSONEncoderfrom app.libs.error_code import ServerErrorfrom datetime import dateclass JSONEncoder(_JSONEncoder): def default(self, o): if hasattr(o, ‘keys’) and hasattr(o, ‘getitem’): return dict(o) if isinstance(o, date): return o.strftime(’%Y-%m-%d’) raise ServerError()class Flask(_Flask): json_encoder = JSONEncoder# File: /app/init.py# -- coding: utf-8 --from .apps import Flaskfrom flask_debugtoolbar import DebugToolbarExtensionfrom flask_cors import CORSfrom flask_migrate import Migratefrom flask_bcrypt import Bcryptfrom app.models.base import db# instantiate 实例化toolbar = DebugToolbarExtension()migrate = Migrate(db=db)bcrypt = Bcrypt()def create_app(): # instantiate the app app = Flask(name) # enable CORS CORS(app) # set config app.config.from_object(‘app.config.settings’) app.config.from_object(‘app.config.secure’) # set up extensions toolbar.init_app(app) migrate.init_app(app, db) bcrypt.init_app(app) # register blueprints register_blueprints(app) register_plugin(app) # shell context for flask cli @app.shell_context_processor def ctx(): return {‘app’: app, ‘db’: db} return appdef register_blueprints(app): from app.api.v1 import create_blueprint_v1 app.register_blueprint(create_blueprint_v1(), url_prefix=’/v1’)def register_plugin(app): db.init_app(app) with app.app_context(): db.create_all()4.新建配置文件/app/config/目录,在这个目录下新建两个文件settings.py和secure.py# File: /app/config/settings.py# -- coding: utf-8 --# TOKEN_EXPIRATION = 30 * 24 * 3600DEBUG = ’true’TOKEN_EXPIRATION_DAYS = 30TOKEN_EXPIRATION_SECONDS = 0# encryption的复杂程度,默认值为12BCRYPT_LOG_ROUNDS = 4# File: /app/config/secure.py# -- coding: utf-8 --SQLALCHEMY_DATABASE_URI = 
\ ‘mysql+cymysql://root:root1234@localhost/flask-rest’SECRET_KEY = ‘‘SQLALCHEMY_TRACK_MODIFICATIONS = TrueMINA_APP = { ‘AppID’: ‘’, ‘AppSecret’: ‘’}5.在根目录下新建一个 manage.py#File: /manage.py# -- coding: utf-8 --from werkzeug.exceptions import HTTPExceptionfrom app import create_appfrom app.libs.error import APIExceptionfrom app.libs.error_code import ServerErrorapp = create_app()@app.errorhandler(Exception)def framework_error(e): “““全局拦截异常””” if isinstance(e, APIException): return e if isinstance(e, HTTPException): code = e.code msg = e.description error_code = 1007 return APIException(msg, code, error_code) else: if app.config[‘DEBUG’]: return ServerError() else: raise eif name == ‘main’: app.run()6.配置全局错误处理新建文件夹 /app/libs/#File: /app/libs/error.py# -- coding: utf-8 --“““自定义错误文件”““from flask import request, jsonfrom werkzeug.exceptions import HTTPExceptionclass APIException(HTTPException): “““自定义api请求错误,返回的json格式””” code = 500 msg = ‘抱歉,后台发生了错误 ( ̄︶ ̄)!’ error_code = 999 def init(self, msg=None, code=None, error_code=None, headers=None): if code: self.code = code if error_code: self.error_code = error_code if msg: self.msg = msg super(APIException, self).init(msg, None) def get_body(self, environ=None): body = dict( msg=self.msg, error_code=self.error_code, request=request.method + ’ ’ + self.get_url_no_param() ) text = json.dumps(body) return text def get_headers(self, environ=None): return [(‘Content-Type’, ‘application/json’)] @staticmethod def get_url_no_param(): full_path = str(request.full_path) main_path = full_path.split(’?’) return main_path[0]#File: /app/libs/error_code.py# -- coding: utf-8 --from werkzeug.exceptions import HTTPExceptionfrom app.libs.error import APIExceptionclass Success(APIException): code = 201 msg = ‘success’ error_code = 0class DeleteSuccess(Success): code = 202 error_code = 1class ServerError(APIException): code = 500 msg = ‘抱歉,后台发生了错误 ( ̄︶ ̄)!’ error_code = 999class ClientTypeError(APIException): code = 400 msg = ‘未检测到客户端类型’ 
error_code = 1006class ParameterException(APIException): code = 400 msg = ‘无效参数’ error_code = 1000class NotFound(APIException): code = 404 msg = ‘没有找到对应的资源 O__O…’ error_code = 1001class AuthFailed(APIException): code = 401 error_code = 1005 msg = ‘认证失败’class Forbidden(APIException): code = 403 error_code = 1004 msg = ‘禁止访问,不在对应权限内’class SingleLogin(APIException): code = 400 error_code = 2002 msg = ‘请重新登录’class DuplicateAct(APIException): code = 400 error_code = 2001 msg = ‘请勿重复操作'7.自定义红图#File: /app/libs/redprint.py# -- coding: utf-8 --class Redprint: def init(self, name): self.name = name self.mound = [] def route(self, rule, **options): def decorator(f): self.mound.append((f, rule, options)) return f return decorator def register(self, bp, url_prefix=None): if url_prefix is None: url_prefix = ‘/’ + self.name for f, rule, options in self.mound: endpoint = self.name + ‘+’ + \ options.pop(“endpoint”, f.name) bp.add_url_rule(url_prefix + rule, endpoint, f, **options)8.新建/app/api/v1/ 文件夹#File: /app/api/v1/init.py# -- coding: utf-8 --from flask import Blueprintfrom app.api.v1 import user, client, tokendef create_blueprint_v1(): bp_v1 = Blueprint(‘v1’, name) user.api.register(bp_v1) client.api.register(bp_v1) token.api.register(bp_v1) return bp_v19.新建注册接口client.py#File: /app/api/v1/client.py# -- coding: utf-8 --from app.libs.error_code import Success, ParameterExceptionfrom app.libs.redprint import Redprintfrom app.models.user import Userfrom app.validators.forms import ClientForm, UserEmailForm, MinaFormfrom app.libs.enums import ClientTypeEnumfrom app.libs.get_openid import get_openidapi = Redprint(‘client’)@api.route(’/register’, methods=[‘POST’])def create_client(): form = ClientForm().validate_for_api() promise = { ClientTypeEnum.USER_EMAIL: __register_user_by_email, ClientTypeEnum.USER_MINA: __register_user_by_mina, } promiseform.type.data return Success()def __register_user_by_email(): form = UserEmailForm().validate_for_api() 
User.register_by_email(form.nickname.data, form.account.data, form.secret.data)def __register_user_by_mina(): form = MinaForm().validate_for_api() account = get_openid(form.account.data) if account is None: raise ParameterException else: User.register_by_mina(form.nickname.data, account, form.sex.data, form.avatar.data)10.登录apitoken.py#File: /app/api/v1/token.py# -- coding: utf-8 --import jwtimport datetimefrom flask import current_app, jsonifyfrom app.libs.enums import ClientTypeEnumfrom app.libs.error_code import AuthFailedfrom app.libs.redprint import Redprintfrom app.models.user import Userfrom app.validators.forms import ClientForm, TokenFormfrom app.libs.format_time import get_format_timestampapi = Redprint(’token’)@api.route(’’, methods=[‘POST’])def get_token(): “““登录功能,认证成功返回token””” form = ClientForm().validate_for_api() promise = { ClientTypeEnum.USER_EMAIL: User.verify, ClientTypeEnum.USER_MINA: User.mina_login, } identity = promise[ClientTypeEnum(form.type.data)]( form.account.data, form.secret.data ) # Token token = generate_auth_token(identity[‘uid’], form.type.data, identity[’login_time’], identity[‘scope’]) t = {’token’: token.decode(‘ascii’)} return jsonify(t), 201@api.route(’/secret’, methods=[‘POST’])def get_token_info(): “““获取令牌信息””” form = TokenForm().validate_for_api() auth_token = form.token.data try: data = jwt.decode(auth_token, current_app.config[‘SECRET_KEY’]) except jwt.ExpiredSignatureError: raise AuthFailed(msg=‘token is expired’, error_code=1003) except jwt.InvalidTokenError: raise AuthFailed(msg=‘token is invalid’, error_code=1002) r = { ‘scope’: data[‘scope’], ‘create_at’: get_format_timestamp(data[‘iat’]), ’expire_in’: get_format_timestamp(data[’exp’]), ‘uid’: data[‘uid’], ’login_time’: get_format_timestamp(data[’login_time’]) } return jsonify(r)def generate_auth_token(uid, ac_type, login_time, scope=None): “““生成令牌””” try: payload = { ’exp’: datetime.datetime.utcnow() + datetime.timedelta( 
days=current_app.config[‘TOKEN_EXPIRATION_DAYS’], seconds=current_app.config[‘TOKEN_EXPIRATION_SECONDS’], ), ‘iat’: datetime.datetime.utcnow(), ‘uid’: uid, ’type’: ac_type.value, ’login_time’: login_time, ‘scope’: scope, } return jwt.encode( payload, current_app.config[‘SECRET_KEY’], algorithm=‘HS256’ ) except Exception as e: return e11.用户接口user.py#File: /app/api/v1/user.py# -- coding: utf-8 --from flask import jsonify, gfrom app.libs.error_code import DeleteSuccessfrom app.libs.redprint import Redprintfrom app.libs.token_auth import authfrom app.models.base import dbfrom app.models.user import Userapi = Redprint(‘user’)@api.route(’/<int:uid>’, methods=[‘GET’])@auth.login_requireddef super_get_user(uid): user = User.query.filter_by(id=uid).first_or_404() return jsonify(user)@api.route(’’, methods=[‘GET’])@auth.login_requireddef get_user(): uid = g.user.uid user = User.query.filter_by(id=uid).first_or_404() return jsonify(user)@api.route(’/<int:uid>’, methods=[‘DELETE’])def super_delete_user(uid): with db.auto_commit(): user = User.query.filter_by(id=uid).first_or_404() user.delete() return DeleteSuccess()@api.route(’’, methods=[‘DELETE’])@auth.login_requireddef delete_user(): uid = g.user.uid with db.auto_commit(): user = User.query.filter_by(id=uid).first_or_404() user.delete() return DeleteSuccess()@api.route(’’, methods=[‘PUT’])def update_user(): return ‘update'12.新建用户models#File: /app/models/base.py# -- coding: utf-8 --from datetime import datetimefrom flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQueryfrom sqlalchemy import inspect, Column, Integer, SmallInteger, ormfrom contextlib import contextmanagerfrom app.libs.error_code import NotFoundclass SQLAlchemy(_SQLAlchemy): @contextmanager def auto_commit(self): try: yield self.session.commit() except Exception as e: db.session.rollback() raise eclass Query(BaseQuery): def filter_by(self, **kwargs): if ‘status’ not in kwargs.keys(): kwargs[‘status’] = 1 return super(Query, self).filter_by(**kwargs) def 
get_or_404(self, ident): rv = self.get(ident) if not rv: raise NotFound() return rv def first_or_404(self): rv = self.first() if not rv: raise NotFound() return rvdb = SQLAlchemy(query_class=Query)class Base(db.Model): abstract = True create_time = Column(Integer) status = Column(SmallInteger, default=1) def init(self): self.create_time = int(datetime.now().timestamp()) def getitem(self, item): return getattr(self, item) @property def create_datetime(self): if self.create_time: return datetime.fromtimestamp(self.create_time) else: return None def set_attrs(self, attrs_dict): for key, value in attrs_dict.items(): if hasattr(self, key) and key != ‘id’: setattr(self, key, value) def delete(self): “““删除用户,注销用户””” self.status = 0 def active(self): “““激活用户””” self.status = 1 def update(self): “““更新数据库的表内容””” try: db.session.commit() except Exception as e: db.session.rollback() return str(e) def keys(self): return self.fields def hide(self, keys): for key in keys: self.fields.remove(key) return self def append(self, keys): for key in keys: self.fields.append(key) return selfclass MixinJSONSerializer: @orm.reconstructor def init_on_load(self): self._fields = [] # self._include = [] self._exclude = [] self._set_fields() self.__prune_fields() def _set_fields(self): pass def __prune_fields(self): columns = inspect(self.class).columns if not self._fields: all_columns = set(columns.keys()) self._fields = list(all_columns - set(self._exclude)) def hide(self, args): for key in args: self._fields.remove(key) return self def keys(self): return self._fields def getitem(self, key): return getattr(self, key)#File: /app/models/user.py# -- coding: utf-8 --from datetime import datetimefrom flask import current_appfrom sqlalchemy import Column, Integer, String, SmallIntegerfrom app import bcryptfrom app.libs.error_code import AuthFailedfrom app.models.base import Base, dbfrom app.libs.format_time import get_current_timestampfrom app.libs.get_openid import get_openidfrom 
app.libs.error_code import ParameterExceptionclass User(Base): id = Column(Integer, primary_key=True) nickname = Column(String(24), unique=True) email = Column(String(24), unique=True) mobile = Column(String(11), unique=True) sex = Column(Integer, default=0) # 1男2女 avatar = Column(String(200)) # 头像 register_ip = Column(String(100)) # 注册ip auth = Column(SmallInteger, default=1) # 权限 openid = Column(String(80), unique=True) _password = Column(‘password’, String(100)) login_time = Column(Integer, default=int(datetime.now().timestamp())) @property def login_datetime(self): if self.login_time: return datetime.fromtimestamp(self.login_time) else: return None def keys(self): return [‘id’, ’nickname’, ’email’, ‘auth’] @property def password(self): return self._password @password.setter def password(self, raw): self._password = bcrypt.generate_password_hash( raw, current_app.config[‘BCRYPT_LOG_ROUNDS’]).decode(‘utf-8’) @staticmethod def register_by_email(nickname, account, secret): “““通过邮箱注册””” with db.auto_commit(): user = User() user.nickname = nickname user.email = account user.password = secret db.session.add(user) @staticmethod def verify(email, password): “““通过邮箱登录””” user = User.query.filter_by(email=email).first_or_404() if not user.check_password(password): raise AuthFailed() scope = ‘AdminScope’ if user.auth == 2 else ‘UserScope’ login_time = get_current_timestamp() user.login_time = login_time User.update(User) return {‘uid’: user.id, ‘scope’: scope, ’login_time’: login_time} def check_password(self, raw): if not self._password: return False return bcrypt.check_password_hash(self._password, raw) @staticmethod def register_by_mina(nickname, account, sex, avatar): “““通过小程序注册””” with db.auto_commit(): user = User() user.nickname = nickname user.openid = account user.sex = sex user.avatar = avatar db.session.add(user) @staticmethod def mina_login(account, secret): “““通过小程序登录””” openid = get_openid(account) # 通过code来来获取openid if openid is None: raise 
ParameterException user = User.query.filter_by(openid=openid).first_or_404() scope = ‘AdminScope’ if user.auth == 2 else ‘UserScope’ login_time = get_current_timestamp() user.login_time = login_time User.update(User) return {‘uid’: user.id, ‘scope’: scope, ’login_time’: login_time}13.添加自定义的函数枚举登录类型# File: /app/libs/enums.py# -- coding: utf-8 --from enum import Enumclass ClientTypeEnum(Enum): USER_EMAIL = 100 USER_MOBILE = 101 # 微信小程序 USER_MINA = 200 # 微信公众号 USER_WX = 201 14.时间辅助函数#File: /app/libs/format_time.py# -- coding: utf-8 --import datetimedef get_current_date(): “““获取当前时间””” return datetime.datetime.now()def get_current_timestamp(): “““获取当前时间的时间戳””” return int(datetime.datetime.now().timestamp())def get_format_date(date=None, format_time="%Y-%m-%d %H:%M:%S”): “““获取格式化时间””” if date is None: date = datetime.datetime.now() return date.strftime(format_time)def get_format_timestamp(date=None, format_time="%Y-%m-%d %H:%M:%S”): “““格式化时间戳””” if date is None: date = datetime.datetime.now() return datetime.datetime.fromtimestamp(date).strftime(format_time)15.获取微信openid的函数#File: /app/libs/get_openid.py# -- coding: utf-8 --import requestsimport jsonfrom flask import current_appdef get_openid(code): api = ‘https://api.weixin.qq.com/sns/jscode2session' params = ‘appid={0}&secret={1}&js_code={2}&grant_type=authorization_code’ \ .format(current_app.config[‘MINA_APP’][‘AppID’], current_app.config[‘MINA_APP’][‘AppSecret’], code) url = api + ‘?’ + params response = requests.get(url=url) res = json.loads(response.text) openid = None if ‘openid’ in res: openid = res[‘openid’] return openid16.scope.py权限管理函数#File: /app/libs/scope.py# -- coding: utf-8 --class Scope: allow_api = [] allow_module = [] forbidden = [] def add(self, other): “““重载加号运算符””” self.allow_api = self.allow_api + other.allow_api self.allow_api = list(set(self.allow_api)) self.allow_module = self.allow_module + other.allow_module self.allow_module = list(set(self.allow_module)) self.forbidden = self.forbidden + 
other.forbidden self.forbidden = list(set(self.forbidden)) return selfclass AdminScope(Scope): allow_module = [‘v1.user’] def init(self): passclass UserScope(Scope): forbidden = [‘v1.user+super_get_user’, ‘v1.user+super_delete_user’] def init(self): self + AdminScope()def is_in_scope(scope, endpoint): # 把类名的字符串实例化 scope = globals()scope splits = endpoint.split(’+’) red_name = splits[0] if endpoint in scope.forbidden: return False if endpoint in scope.allow_api: return True if red_name in scope.allow_module: return True else: return False17.jwt生成token和验证token#File: /app/libs/token_auth.py# -- coding: utf-8 --import jwtfrom collections import namedtuplefrom flask import current_app, g, requestfrom flask_httpauth import HTTPBasicAuthfrom app.models.user import User as Userfrom app.libs.scope import is_in_scopefrom app.libs.error_code import AuthFailed, Forbidden, SingleLoginauth = HTTPBasicAuth()User = namedtuple(‘User’, [‘uid’, ‘ac_type’, ‘scope’, ’login_time’])@auth.verify_passworddef verify_password(token, password): user_info = verify_auth_token(token) if not user_info: return False else: g.user = user_info return Truedef verify_auth_token(token): try: data = jwt.decode(token, current_app.config[‘SECRET_KEY’]) except jwt.ExpiredSignatureError: raise AuthFailed(msg=‘token is expired’, error_code=1003) except jwt.InvalidTokenError: raise AuthFailed(msg=‘token is invalid’, error_code=1002) uid = data[‘uid’] ac_type = data[’type’] scope = data[‘scope’] login_time = data[’login_time’] user = User.query.filter_by(id=uid).first_or_404() if login_time != user.login_time: raise SingleLogin() # request 视图函数 allow = is_in_scope(scope, request.endpoint) if not allow: raise Forbidden() return User(uid, ac_type, scope, login_time)18.验证函数validators#File: /app/validators/base.py# -- coding: utf-8 --from flask import requestfrom wtforms import Formfrom app.libs.error_code import ParameterExceptionclass BaseForm(Form): def init(self): data = request.get_json(silent=True) args = 
request.args.to_dict() super(BaseForm, self).init(data=data, **args) def validate_for_api(self): valid = super(BaseForm, self).validate() if not valid: raise ParameterException(msg=self.errors) return self#File: /app/validators/forms.py# -- coding: utf-8 --from wtforms import StringField, IntegerFieldfrom wtforms.validators import DataRequired, length, Email, Regexpfrom wtforms import ValidationErrorfrom app.libs.enums import ClientTypeEnumfrom app.models.user import Userfrom app.validators.base import BaseForm as Formclass ClientForm(Form): account = StringField(validators=[ DataRequired(message=‘不允许为空’), length(min=5, max=32)]) secret = StringField() type = IntegerField(validators=[DataRequired()]) def validate_type(self, value): try: client = ClientTypeEnum(value.data) except ValueError as e: raise e self.type.data = clientclass UserEmailForm(ClientForm): account = StringField(validators=[Email(message=‘invalidate email’)]) secret = StringField(validators=[ DataRequired(), # password can only include letters , numbers and “” Regexp(r’^[A-Za-z0-9&$#@]{6,22}$’) ]) nickname = StringField(validators=[DataRequired(), length(min=2, max=22)]) def validate_account(self, value): if User.query.filter_by(email=value.data).first(): raise ValidationError()class TokenForm(Form): token = StringField(validators=[DataRequired()])class MinaForm(Form): account = StringField(validators=[ DataRequired(message=‘不允许为空’), length(min=10, max=80)]) nickname = StringField(validators=[DataRequired()]) sex = IntegerField(validators=[DataRequired()]) avatar = StringField(validators=[DataRequired()]) type = IntegerField(validators=[DataRequired()]) def validate_type(self, value): try: client = ClientTypeEnum(value.data) except ValueError as e: raise e self.type.data = client19. error_code.mderror_codemsg0创建成功1删除成功999未知错误 - 后台发生了错误1000无效参数1001没有找到对应的资源1002token is invalid1003token is expired1004禁止访问,不在对应权限内1005认证失败1006未检测到客户端类型2001请勿重复操作2002请重新登录20. 
jwt登录图片
Flask + PyJWT 实现基于 JSON Web Token 的用户认证授权

| endpoint | HTTP Method | Authenticated? | Result | json Body |
| --- | --- | --- | --- | --- |
| /v1/client/register | POST | NO | 注册用户 | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
| /v1/token | POST | NO | 获取token | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
| /v1/user | GET | YES | 用户详情 | 空 |
| /v1/user/2 | GET | YES | 管理员获取用户详情 | 空 |
| /v1/token/secret | POST | NO | token详情 | {"token":""} |

通过 jwt 的 payload 携带 login_time,和数据库 User 表中的 login_time 进行匹配,以此实现单点登录:用户重新登录后 login_time 变更,原 token 随即失效。

修改数据库
cd users
flask db migrate
flask db upgrade

运行结果
- 注册成功
- 注册失败
- 登录成功
- 登录失败
- 成功获取用户信息
- 用户重新登录,token变更,原token无法获取用户信息
- 不带token请求,无法获取用户信息
- token过期,无法获取用户信息
- 有权限获取其他用户信息
- 无权限获取其他用户信息

学习资料:
- 慕课网-Python Flask构建可扩展的RESTful API
- 慕课网-掌握Taro多端框架 快速上手小程序/H5开发
- Taro 多端开发实现原理与项目实战
...