Handling user authorization in a WeChat mini program

taro
1. Install the Taro CLI
npm install -g @tarojs/cli
2. Initialize the project
taro init taro-login
3. Enter the project directory
cd taro-login
4. Build and run for the WeChat mini program target
npm run dev:weapp
5. Edit /src/app.js

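class App extends Component {

  config = {
    pages: [
      'pages/user/user', // new
      'pages/index/index',
    ],
    // ...the rest of the generated config stays unchanged
  }

  // ...the rest of the generated App class stays unchanged
}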
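6. The WeChat login requirement
Suppose we want the user's authorization as soon as they enter the app, so that we can record some user information, but WeChat only releases that information after the user taps a button on the page. What can we do? The only option is to put a button somewhere the user cannot avoid tapping, which means a modal dialog. WeChat's wx.showModal cannot meet this need, so we have to build our own: show the modal the first time the user enters, and skip it on later visits. To keep the component extensible, we change the attributes of the button in the confirm position according to the props passed in; for the authorization modal, the button gets openType='getUserInfo'. (Quoted from Taro 多端开发实现原理与项目实战)
7. Create the folder and the file /src/components/modal/modal.js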

import Taro, {Component} from '@tarojs/taro'
import {View, Button} from '@tarojs/components'

import './modal.scss'

class Modal extends Component {
  constructor() {
    super(...arguments)
    this.state = {}
  }

  // Plain confirm button: notify the parent without a payload
  onConfirmClick = () => {
    this.props.onConfirmCallback()
  }

  onCancelClick = () => {
    this.props.onCancelCallback()
  }

  // Authorization button: e.detail carries the getUserInfo result
  onAuthConfirmClick = (e) => {
    this.props.onConfirmCallback(e.detail)
  }

  // Keep touch-move events on the mask from reaching the page underneath
  preventTouchMove = (e) => {
    e.stopPropagation()
  }

  render() {
    const {title, contentText, cancelText, confirmText, isAuth} = this.props
    return (
      <View className='toplife_modal' onTouchMove={this.preventTouchMove}>
        <View className='toplife_modal_content'>
          <View className='toplife_modal_title'>{title}</View>
          <View className='toplife_modal_text'>{contentText}</View>
          <View className='toplife_modal_btn'>
            <Button className='toplife_modal_btn_cancel' onClick={this.onCancelClick}>{cancelText}</Button>
            {!isAuth ?
              <Button className='toplife_modal_btn_confirm' onClick={this.onConfirmClick}>{confirmText}</Button> :
              <Button className='toplife_modal_btn_confirm' openType='getUserInfo' onGetUserInfo={this.onAuthConfirmClick}>Authorize</Button>}
          </View>
        </View>
      </View>
    )
  }
}

Modal.defaultProps = {
  title: '',
  contentText: '',
  cancelText: 'Cancel',
  confirmText: 'OK',
  isAuth: false,
  onCancelCallback: () => {},
  onConfirmCallback: () => {}
}

export default Modal
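The Modal component is fairly simple. Its props are:

| Prop | Description |
| --- | --- |
| title | Title of the prompt |
| contentText | Body text of the prompt |
| cancelText | Label of the cancel button |
| onCancelCallback | Callback invoked on cancel |
| confirmText | Label of the confirm button |
| onConfirmCallback | Callback invoked on confirm |
| isAuth | Whether the confirm button is the authorization button |

Internally the component defines preventTouchMove, which blocks swipe gestures (onTouchMove) on the mask while the modal is visible. Another function, onAuthConfirmClick, handles the case where isAuth is true: the confirm button becomes the authorization button that obtains the user's profile, and that profile is passed as the argument to the parent's callback. (Quoted from Taro 多端开发实现原理与项目实战)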
8. Add the modal.scss file
/*postcss-pxtransform rn eject enable*/

.toplife_modal {
position: fixed;
width: 100%;
height: 100%;
left: 0;
top: 0;
background-color: rgba(0, 0, 0, .8);
z-index: 100;

&_content {
position: absolute;
left: 50%;
top: 50%;
width: 600px;
height: 320px;
transform: translate(-50%, -50%);
background-color: #fff;
color: #232321;
text-align: center;
border-radius: 30px;
}

&_title {
margin-top: 40px;
font-size: 32px;
}

&_text {
margin-top: 40px;
font-size: 24px;
}

&_btn {
position: absolute;
bottom: 0;
left: 0;
width: 100%;
height: 88px;
border-top: 2px solid #eee;

&_cancel {
color: #8c8c8c;
border-radius: 0;
border: 0;
border-right: 2px solid #eee;
border-bottom-left-radius: 30px;
}

&_confirm {
color: #666;
border-radius: 0;
border: 0;
border-bottom-right-radius: 30px;
}

button {
display: block;
float: left;
width: 50%;
height: 88px;
text-align: center;
line-height: 88px;
font-size: 32px;
box-sizing: border-box;
background-color: #fff;

&::after {
border: 0;
}
}
}
}

9. Create /src/pages/user/user.js and use the Modal component in it
import Taro, {Component} from '@tarojs/taro';
import {View, Image, Text} from '@tarojs/components';
import classnames from 'classnames'
import Modal from '../../components/modal/modal';
import {setGlobalData} from '../../utils/globalData';
import {getUserInfo, getIsAuth} from '../../utils/getUser';

class Info extends Component {
  config = {
    navigationBarTitleText: 'TARO Mall',
    enablePullDownRefresh: true,
    backgroundTextStyle: 'dark',
    disableScroll: true
  }

  constructor() {
    super(...arguments)
    this.state = {
      animationClass: '',
      showAuthModal: false,
      shouldIndexHidden: false,
    }
    this.env = process.env.TARO_ENV
  }

  hideAuthModal() {
    this.setState({
      showAuthModal: false
    })
    // Remember that the modal has been shown once
    Taro.setStorage({key: 'isHomeLongHideAuthModal', data: true})
  }

  onProcessAuthResult = (userData) => {
    Taro.setStorage({key: 'isHomeLongHideAuthModal', data: true})
    if (userData.userInfo) {
      setGlobalData('userData', userData)
    }
    this.setState({
      showAuthModal: false
    })
    getIsAuth()
  }

  async onPullDownRefresh() {
    if (this.state.shouldIndexHidden) {
      Taro.stopPullDownRefresh() // stop the pull-down refresh
    } else {
      await this.props.onFetchIndexList()
      Taro.stopPullDownRefresh() // stop the pull-down refresh
    }
  }

  componentDidMount() {
    if (this.env === 'weapp') {
      // Use a class name to drive the animation
      setTimeout(async () => {
        const userData = await getUserInfo();
        // Show the modal only when there is no user data, it is not already
        // showing, and the "already shown" flag has not been stored
        const finish = (isHomeLongHideAuthModal) => {
          this.setState({
            animationClass: 'animation',
            showAuthModal: !userData && !this.state.showAuthModal && !isHomeLongHideAuthModal
          })
        }
        Taro.getStorage({
          key: 'isHomeLongHideAuthModal',
          success: (res) => finish(res.data),
          fail: () => finish(false) // no flag stored yet
        })
      }, 1000)
      getIsAuth()
    } else if (this.env === 'h5' || this.env === 'rn') {
      console.log('h5 login')
    }
  }

  render() {
    const {animationClass, shouldIndexHidden, showAuthModal} = this.state
    const {loginname, avatar_url} = this.props;
    const indexClassNames = classnames('container', 'index', animationClass, {
      hidden: shouldIndexHidden
    })
    return (
      <View className={indexClassNames}>
        <View className='login-head'>
          <Image className='login-head-back'
            src={require('../../assets/img/loginBack.jpg')}
          />
          <Image className='login-head-head'
            src={avatar_url ? avatar_url : require('../../assets/img/head.png')}
          />
          {loginname ? <Text className='login-head-name'>{loginname}</Text> : null}
        </View>
        {showAuthModal && <Modal
          title='Authorization'
          contentText='Please authorize to enjoy the full experience'
          onCancelCallback={this.hideAuthModal.bind(this)}
          onConfirmCallback={this.onProcessAuthResult.bind(this)}
          isAuth
        />}
      </View>
    )
  }
}

export default Info
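How do we make sure the app shows the authorization modal only once? The key is the call Taro.setStorage({key: 'isHomeLongHideAuthModal', data: true}): once the modal has been cancelled or confirmed, a flag is stored locally, and componentDidMount checks that flag before showing the modal again. That completes the authorization handling. Where possible, though, it is more graceful to ask for authorization only when it is actually needed, which gives a better user experience. (Quoted from Taro 多端开发实现原理与项目实战)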

10. Create a few helper files
/src/utils/globalData.js
const globalData = {}

export function setGlobalData(key, val) {
  globalData[key] = val
}

export function getGlobalData(key) {
  return globalData[key]
}
/src/utils/request.js
import Taro from '@tarojs/taro';
import '@tarojs/async-await';

export function getJSON(url, data) {
  Taro.showLoading();
  return Taro.request({url: url, data: data, method: 'GET'}).then(result => {
    Taro.hideLoading();
    return result;
  })
}

export function postJSON(url, data) {
  Taro.showLoading()
  return Taro.request({
    header: {'content-type': 'application/json'},
    url: url,
    data: data,
    method: 'POST'
  }).then(result => {
    Taro.hideLoading();
    return result;
  });
}
/src/constants/api.js
const rootPath = 'http://127.0.0.1:5000/v1';
const apiObject = {
  registerclient: rootPath + '/client/register', // register a user
  getusertoken: rootPath + '/token', // fetch the user token after a successful login
  checkusertoken: rootPath + '/token/secret', // verify the user token
  getuserinfo: rootPath + '/user', // fetch user information
}
export default apiObject;
11. Create a function that logs in and fetches a token
/src/utils/getUser.js
import Taro from '@tarojs/taro'
import {getGlobalData} from './globalData'
import api from '../constants/api';
import {postJSON} from '../utils/request';

async function getUserInfo() {
  const userData = getGlobalData('userData')
  if (userData) {
    return userData
  }
  try {
    const _userData = await Taro.getUserInfo()
    return _userData
  } catch (err) {
    console.log(err)
    console.log('WeChat login or the user info API failed')
    return null
  }
}

async function getIsAuth() {
  const loginRes = await Taro.login()
  // getUserInfo() resolves to null when the user has not authorized yet,
  // so do not destructure its result directly
  const userData = await getUserInfo()
  let userInfo = userData ? userData.userInfo : null
  let isAuth = false
  if (userInfo) {

    // Register a new user with the WeChat profile
    let result = await postJSON(api.registerclient, {
      "avatar": userInfo.avatarUrl,
      "sex": userInfo.gender,
      "nickname": userInfo.nickName,
      "account": loginRes.code,
      "type": 200
    });
    if (result.data.error_code === 0) {

      // Log the user in, fetch a token, and cache it on the client.
      // A login code is single-use, so request a fresh one here.
      const tokenRes = await Taro.login()
      let auth_token = await postJSON(api.getusertoken, {
        "account": tokenRes.code,
        "type": 200
      })
      if (auth_token.statusCode === 201) {
        Taro.setStorage({key: 'token', data: auth_token.data.token}) // cache the token
        Taro.showToast({title: 'Authorized'})
        userInfo.isAuth = true
        isAuth = true
      }
    } else {
      Taro.showToast({title: 'Authorization failed, please try again later', icon: 'none'})
    }

  } else {
    userInfo = {
      isAuth: false
    }
  }
  console.log('isAuth: ', isAuth)
  return isAuth
}

export {
  getUserInfo,
  getIsAuth
}
flask
1. Directory layout
├── app
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   └── v1
│   │       ├── __init__.py
│   │       ├── client.py
│   │       ├── token.py
│   │       └── user.py
│   ├── apps.py
│   ├── config
│   │   ├── secure.py
│   │   └── settings.py
│   ├── libs
│   │   ├── enums.py
│   │   ├── error.py
│   │   ├── error_code.py
│   │   ├── format_time.py
│   │   ├── get_openid.py
│   │   ├── redprint.py
│   │   ├── scope.py
│   │   └── token_auth.py
│   ├── models
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── user.py
│   └── validators
│       ├── __init__.py
│       ├── base.py
│       └── forms.py
├── manage.py
└── requirements.txt
2. Create a virtual environment (skipped here)
requirements.txt
Flask
Flask-SQLAlchemy
psycopg2-binary
cymysql
Flask-Testing
coverage
flake8
flask-debugtoolbar
flask-cors
flask-migrate
flask-bcrypt
pyjwt
gunicorn
requests
flask-httpauth
flask-wtf
3. Create the app directory with __init__.py and apps.py

# File: /app/apps.py
# -*- coding: utf-8 -*-
# Note: flask.json.JSONEncoder and Flask.json_encoder were removed in Flask 2.3,
# so this code assumes an earlier Flask release.
from datetime import date

from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder

from app.libs.error_code import ServerError


class JSONEncoder(_JSONEncoder):
    def default(self, o):
        # Objects exposing keys() and __getitem__ (the models) become dicts
        if hasattr(o, 'keys') and hasattr(o, '__getitem__'):
            return dict(o)
        if isinstance(o, date):
            return o.strftime('%Y-%m-%d')
        raise ServerError()


class Flask(_Flask):
    json_encoder = JSONEncoder
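The encoder above relies on a small protocol: any object with keys() and __getitem__ can be passed to dict(). The following is a minimal sketch of that idea using only the standard library json module instead of Flask; DictLikeEncoder, FakeUser and dump_user are hypothetical names for illustration, not part of the project.

```python
import json
from datetime import date


class DictLikeEncoder(json.JSONEncoder):
    """Stdlib stand-in for the custom Flask encoder (hypothetical name)."""

    def default(self, o):
        # dict(o) calls o.keys() and then o[key] for each key
        if hasattr(o, 'keys') and hasattr(o, '__getitem__'):
            return dict(o)
        if isinstance(o, date):
            return o.strftime('%Y-%m-%d')
        return super().default(o)


class FakeUser:
    """Hypothetical model-like object; not the SQLAlchemy User model."""

    def __init__(self, id, nickname, joined):
        self.id = id
        self.nickname = nickname
        self.joined = joined
        self.password = 'never-serialized'

    def keys(self):
        # Only the fields listed here end up in the JSON output
        return ['id', 'nickname', 'joined']

    def __getitem__(self, key):
        return getattr(self, key)


def dump_user(user):
    return json.dumps(user, cls=DictLikeEncoder, sort_keys=True)


print(dump_user(FakeUser(1, 'tom', date(2019, 5, 1))))
```

Because keys() decides what is exposed, a model can hide sensitive columns simply by leaving them out of that list.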
# File: /app/__init__.py
# -*- coding: utf-8 -*-
from .apps import Flask
from flask_debugtoolbar import DebugToolbarExtension
from flask_cors import CORS
from flask_migrate import Migrate
from flask_bcrypt import Bcrypt
from app.models.base import db

# instantiate the extensions
toolbar = DebugToolbarExtension()
migrate = Migrate(db=db)
bcrypt = Bcrypt()


def create_app():
    # instantiate the app
    app = Flask(__name__)

    # enable CORS
    CORS(app)

    # set config
    app.config.from_object('app.config.settings')
    app.config.from_object('app.config.secure')

    # set up extensions
    toolbar.init_app(app)
    migrate.init_app(app, db)
    bcrypt.init_app(app)

    # register blueprints
    register_blueprints(app)
    register_plugin(app)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {'app': app, 'db': db}

    return app


def register_blueprints(app):
    from app.api.v1 import create_blueprint_v1
    app.register_blueprint(create_blueprint_v1(), url_prefix='/v1')


def register_plugin(app):
    db.init_app(app)
    with app.app_context():
        db.create_all()
4. Create the configuration files
Create the /app/config/ directory and add two files to it: settings.py and secure.py
# File: /app/config/settings.py
# -*- coding: utf-8 -*-
# TOKEN_EXPIRATION = 30 * 24 * 3600
DEBUG = True
TOKEN_EXPIRATION_DAYS = 30
TOKEN_EXPIRATION_SECONDS = 0

# bcrypt work factor; the library default is 12
BCRYPT_LOG_ROUNDS = 4
# File: /app/config/secure.py
# -*- coding: utf-8 -*-

SQLALCHEMY_DATABASE_URI = \
    'mysql+cymysql://root:root1234@localhost/flask-rest'

SECRET_KEY = '***'
SQLALCHEMY_TRACK_MODIFICATIONS = True

MINA_APP = {
    'AppID': '***',
    'AppSecret': '***'
}
5. Create manage.py in the project root

# File: /manage.py

# -*- coding: utf-8 -*-
from werkzeug.exceptions import HTTPException
from app import create_app
from app.libs.error import APIException
from app.libs.error_code import ServerError

app = create_app()


@app.errorhandler(Exception)
def framework_error(e):
    """Global exception handler."""
    if isinstance(e, APIException):
        return e
    if isinstance(e, HTTPException):
        code = e.code
        msg = e.description
        error_code = 1007
        return APIException(msg, code, error_code)
    # Outside debug mode, hide internals behind a generic JSON error;
    # in debug mode, re-raise so the traceback stays visible.
    if not app.config['DEBUG']:
        return ServerError()
    raise e


if __name__ == '__main__':
    app.run()
6. Configure global error handling
Create the folder /app/libs/
# File: /app/libs/error.py

# -*- coding: utf-8 -*-
"""
Custom error definitions
"""
from flask import request, json
from werkzeug.exceptions import HTTPException


class APIException(HTTPException):
    """Custom API error that is rendered as a JSON response."""
    code = 500
    msg = 'Sorry, something went wrong on the server (*~︶~)!'
    error_code = 999

    def __init__(self, msg=None, code=None, error_code=None, headers=None):
        if code:
            self.code = code
        if error_code:
            self.error_code = error_code
        if msg:
            self.msg = msg
        super(APIException, self).__init__(msg, None)

    def get_body(self, environ=None):
        body = dict(
            msg=self.msg,
            error_code=self.error_code,
            request=request.method + ' ' + self.get_url_no_param()
        )
        text = json.dumps(body)
        return text

    def get_headers(self, environ=None):
        return [('Content-Type', 'application/json')]

    @staticmethod
    def get_url_no_param():
        # Strip the query string from the request path
        full_path = str(request.full_path)
        main_path = full_path.split('?')
        return main_path[0]
# File: /app/libs/error_code.py
# -*- coding: utf-8 -*-
from app.libs.error import APIException


class Success(APIException):
    code = 201
    msg = 'success'
    error_code = 0


class DeleteSuccess(Success):
    code = 202
    error_code = 1


class ServerError(APIException):
    code = 500
    msg = 'Sorry, something went wrong on the server (*~︶~)!'
    error_code = 999


class ClientTypeError(APIException):
    code = 400
    msg = 'Client type not detected'
    error_code = 1006


class ParameterException(APIException):
    code = 400
    msg = 'Invalid parameter'
    error_code = 1000


class NotFound(APIException):
    code = 404
    msg = 'The requested resource was not found O__O...'
    error_code = 1001


class AuthFailed(APIException):
    code = 401
    error_code = 1005
    msg = 'Authentication failed'


class Forbidden(APIException):
    code = 403
    error_code = 1004
    msg = 'Forbidden: outside the permitted scope'


class SingleLogin(APIException):
    code = 400
    error_code = 2002
    msg = 'Please log in again'


class DuplicateAct(APIException):
    code = 400
    error_code = 2001
    msg = 'Please do not repeat the operation'
7. A custom Redprint
# File: /app/libs/redprint.py

# -*- coding: utf-8 -*-
class Redprint:
    def __init__(self, name):
        self.name = name
        self.mound = []

    def route(self, rule, **options):
        # Collect the view; it is attached to a blueprint later in register()
        def decorator(f):
            self.mound.append((f, rule, options))
            return f
        return decorator

    def register(self, bp, url_prefix=None):
        if url_prefix is None:
            url_prefix = '/' + self.name
        for f, rule, options in self.mound:
            # Endpoint name has the form "<redprint>+<view function>"
            endpoint = self.name + '+' + \
                options.pop("endpoint", f.__name__)
            bp.add_url_rule(url_prefix + rule, endpoint, f, **options)
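To see what a Redprint actually registers, the sketch below repeats the class and runs it against a stand-in blueprint that only records calls. RecordingBlueprint is a hypothetical helper for illustration, not a Flask class; with a real Blueprint named v1, Flask additionally prefixes the endpoint with the blueprint name, giving v1.user+super_get_user.

```python
class Redprint:
    def __init__(self, name):
        self.name = name
        self.mound = []

    def route(self, rule, **options):
        def decorator(f):
            self.mound.append((f, rule, options))
            return f
        return decorator

    def register(self, bp, url_prefix=None):
        if url_prefix is None:
            url_prefix = '/' + self.name
        for f, rule, options in self.mound:
            endpoint = self.name + '+' + options.pop('endpoint', f.__name__)
            bp.add_url_rule(url_prefix + rule, endpoint, f, **options)


class RecordingBlueprint:
    """Hypothetical stand-in for flask.Blueprint; it only records the rules."""

    def __init__(self):
        self.rules = []

    def add_url_rule(self, rule, endpoint, view_func, **options):
        self.rules.append((rule, endpoint, options.get('methods')))


api = Redprint('user')


@api.route('/<int:uid>', methods=['GET'])
def super_get_user(uid):
    return uid


@api.route('', methods=['GET'])
def get_user():
    return 'me'


bp = RecordingBlueprint()
api.register(bp)
for rule in bp.rules:
    print(rule)
```

The "+" separator matters later: scope.py splits the endpoint on it to recover the Redprint name.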
8. Create the /app/api/v1/ folder
# File: /app/api/v1/__init__.py

# -*- coding: utf-8 -*-
from flask import Blueprint
from app.api.v1 import user, client, token


def create_blueprint_v1():
    bp_v1 = Blueprint('v1', __name__)

    user.api.register(bp_v1)
    client.api.register(bp_v1)
    token.api.register(bp_v1)
    return bp_v1
9. Create the registration endpoint: client.py

# File: /app/api/v1/client.py

# -*- coding: utf-8 -*-
from app.libs.error_code import Success, ParameterException
from app.libs.redprint import Redprint
from app.models.user import User
from app.validators.forms import ClientForm, UserEmailForm, MinaForm
from app.libs.enums import ClientTypeEnum
from app.libs.get_openid import get_openid

api = Redprint('client')


@api.route('/register', methods=['POST'])
def create_client():
    form = ClientForm().validate_for_api()
    # Dispatch on the client type to the matching registration handler
    promise = {
        ClientTypeEnum.USER_EMAIL: __register_user_by_email,
        ClientTypeEnum.USER_MINA: __register_user_by_mina,
    }
    promise[form.type.data]()

    return Success()


def __register_user_by_email():
    form = UserEmailForm().validate_for_api()
    User.register_by_email(form.nickname.data,
                           form.account.data,
                           form.secret.data)


def __register_user_by_mina():
    form = MinaForm().validate_for_api()
    # Exchange the mini program login code for an openid
    account = get_openid(form.account.data)
    if account is None:
        raise ParameterException
    else:
        User.register_by_mina(form.nickname.data,
                              account,
                              form.sex.data,
                              form.avatar.data)
10. Login API: token.py

# File: /app/api/v1/token.py

# -*- coding: utf-8 -*-
import jwt
import datetime
from flask import current_app, jsonify
from app.libs.enums import ClientTypeEnum
from app.libs.error_code import AuthFailed
from app.libs.redprint import Redprint
from app.models.user import User
from app.validators.forms import ClientForm, TokenForm
from app.libs.format_time import get_format_timestamp

api = Redprint('token')


@api.route('', methods=['POST'])
def get_token():
    """Log in; return a token when authentication succeeds."""
    form = ClientForm().validate_for_api()
    promise = {
        ClientTypeEnum.USER_EMAIL: User.verify,
        ClientTypeEnum.USER_MINA: User.mina_login,
    }
    identity = promise[ClientTypeEnum(form.type.data)](
        form.account.data,
        form.secret.data
    )
    # Token
    token = generate_auth_token(identity['uid'],
                                form.type.data,
                                identity['login_time'],
                                identity['scope'])
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str
    if isinstance(token, bytes):
        token = token.decode('ascii')
    t = {'token': token}
    return jsonify(t), 201


@api.route('/secret', methods=['POST'])
def get_token_info():
    """Return the information stored in a token."""
    form = TokenForm().validate_for_api()
    auth_token = form.token.data
    try:
        # PyJWT 2.x requires the accepted algorithms to be listed explicitly
        data = jwt.decode(auth_token, current_app.config['SECRET_KEY'],
                          algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except jwt.InvalidTokenError:
        raise AuthFailed(msg='token is invalid', error_code=1002)
    r = {
        'scope': data['scope'],
        'create_at': get_format_timestamp(data['iat']),
        'expire_in': get_format_timestamp(data['exp']),
        'uid': data['uid'],
        'login_time': get_format_timestamp(data['login_time'])
    }
    return jsonify(r)


def generate_auth_token(uid, ac_type, login_time, scope=None):
    """Generate a token."""
    payload = {
        'exp': datetime.datetime.utcnow() + datetime.timedelta(
            days=current_app.config['TOKEN_EXPIRATION_DAYS'],
            seconds=current_app.config['TOKEN_EXPIRATION_SECONDS'],
        ),
        'iat': datetime.datetime.utcnow(),
        'uid': uid,
        'type': ac_type.value,
        'login_time': login_time,
        'scope': scope,
    }
    # Let encoding errors propagate to the global error handler instead of
    # returning the exception object as if it were a token
    return jwt.encode(
        payload, current_app.config['SECRET_KEY'],
        algorithm='HS256'
    )
11. User API: user.py

# File: /app/api/v1/user.py
# -*- coding: utf-8 -*-
from flask import jsonify, g

from app.libs.error_code import DeleteSuccess
from app.libs.redprint import Redprint
from app.libs.token_auth import auth
from app.models.base import db
from app.models.user import User

api = Redprint('user')


@api.route('/<int:uid>', methods=['GET'])
@auth.login_required
def super_get_user(uid):
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


@api.route('', methods=['GET'])
@auth.login_required
def get_user():
    uid = g.user.uid
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


@api.route('/<int:uid>', methods=['DELETE'])
@auth.login_required  # without this, anyone could delete any user
def super_delete_user(uid):
    with db.auto_commit():
        user = User.query.filter_by(id=uid).first_or_404()
        user.delete()
    return DeleteSuccess()


@api.route('', methods=['DELETE'])
@auth.login_required
def delete_user():
    uid = g.user.uid
    with db.auto_commit():
        user = User.query.filter_by(id=uid).first_or_404()
        user.delete()
    return DeleteSuccess()


@api.route('', methods=['PUT'])
def update_user():
    return 'update'
12. Create the user models

# File: /app/models/base.py

# -*- coding: utf-8 -*-
# Note: this code assumes Flask-SQLAlchemy 2.x, which exports BaseQuery.
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
from sqlalchemy import inspect, Column, Integer, SmallInteger, orm
from contextlib import contextmanager

from app.libs.error_code import NotFound


class SQLAlchemy(_SQLAlchemy):
    @contextmanager
    def auto_commit(self):
        # Commit on success, roll back and re-raise on any error
        try:
            yield
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise e


class Query(BaseQuery):
    def filter_by(self, **kwargs):
        # Only return active (not soft-deleted) rows by default
        if 'status' not in kwargs.keys():
            kwargs['status'] = 1
        return super(Query, self).filter_by(**kwargs)

    def get_or_404(self, ident):
        rv = self.get(ident)
        if not rv:
            raise NotFound()
        return rv

    def first_or_404(self):
        rv = self.first()
        if not rv:
            raise NotFound()
        return rv


db = SQLAlchemy(query_class=Query)


class Base(db.Model):
    __abstract__ = True
    create_time = Column(Integer)
    status = Column(SmallInteger, default=1)

    def __init__(self):
        self.create_time = int(datetime.now().timestamp())

    def __getitem__(self, item):
        return getattr(self, item)

    @property
    def create_datetime(self):
        if self.create_time:
            return datetime.fromtimestamp(self.create_time)
        else:
            return None

    def set_attrs(self, attrs_dict):
        for key, value in attrs_dict.items():
            if hasattr(self, key) and key != 'id':
                setattr(self, key, value)

    def delete(self):
        """Soft-delete (deactivate) the record."""
        self.status = 0

    def active(self):
        """Reactivate the record."""
        self.status = 1

    def update(self):
        """Commit pending changes to the database."""
        try:
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            return str(e)

    def keys(self):
        return self.fields

    def hide(self, *keys):
        for key in keys:
            self.fields.remove(key)
        return self

    def append(self, *keys):
        for key in keys:
            self.fields.append(key)
        return self


class MixinJSONSerializer:
    @orm.reconstructor
    def init_on_load(self):
        self._fields = []
        # self._include = []
        self._exclude = []
        self._set_fields()
        self.__prune_fields()

    def _set_fields(self):
        pass

    def __prune_fields(self):
        columns = inspect(self.__class__).columns
        if not self._fields:
            all_columns = set(columns.keys())
            self._fields = list(all_columns - set(self._exclude))

    def hide(self, *args):
        for key in args:
            self._fields.remove(key)
        return self

    def keys(self):
        return self._fields

    def __getitem__(self, key):
        return getattr(self, key)
# File: /app/models/user.py
# -*- coding: utf-8 -*-
from datetime import datetime
from flask import current_app
from sqlalchemy import Column, Integer, String, SmallInteger
from app import bcrypt
from app.libs.error_code import AuthFailed, ParameterException
from app.models.base import Base, db
from app.libs.format_time import get_current_timestamp
from app.libs.get_openid import get_openid


class User(Base):
    id = Column(Integer, primary_key=True)
    nickname = Column(String(24), unique=True)
    email = Column(String(24), unique=True)
    mobile = Column(String(11), unique=True)
    sex = Column(Integer, default=0)  # 1 male, 2 female
    avatar = Column(String(200))  # avatar URL
    register_ip = Column(String(100))  # registration IP
    auth = Column(SmallInteger, default=1)  # permission level
    openid = Column(String(80), unique=True)
    _password = Column('password', String(100))
    # Use a callable so the default is evaluated per row, not once at import
    login_time = Column(Integer,
                        default=lambda: int(datetime.now().timestamp()))

    @property
    def login_datetime(self):
        if self.login_time:
            return datetime.fromtimestamp(self.login_time)
        else:
            return None

    def keys(self):
        return ['id', 'nickname', 'email', 'auth']

    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, raw):
        self._password = bcrypt.generate_password_hash(
            raw, current_app.config['BCRYPT_LOG_ROUNDS']).decode('utf-8')

    @staticmethod
    def register_by_email(nickname, account, secret):
        """Register with an email address."""
        with db.auto_commit():
            user = User()
            user.nickname = nickname
            user.email = account
            user.password = secret
            db.session.add(user)

    @staticmethod
    def verify(email, password):
        """Log in with an email address."""
        user = User.query.filter_by(email=email).first_or_404()
        if not user.check_password(password):
            raise AuthFailed()
        scope = 'AdminScope' if user.auth == 2 else 'UserScope'
        login_time = get_current_timestamp()
        user.login_time = login_time
        user.update()
        return {'uid': user.id, 'scope': scope, 'login_time': login_time}

    def check_password(self, raw):
        if not self._password:
            return False
        return bcrypt.check_password_hash(self._password, raw)

    @staticmethod
    def register_by_mina(nickname, account, sex, avatar):
        """Register from the mini program."""
        with db.auto_commit():
            user = User()
            user.nickname = nickname
            user.openid = account
            user.sex = sex
            user.avatar = avatar
            db.session.add(user)

    @staticmethod
    def mina_login(account, secret):
        """Log in from the mini program."""
        openid = get_openid(account)  # exchange the login code for an openid
        if openid is None:
            raise ParameterException

        user = User.query.filter_by(openid=openid).first_or_404()
        scope = 'AdminScope' if user.auth == 2 else 'UserScope'
        login_time = get_current_timestamp()
        user.login_time = login_time
        user.update()
        return {'uid': user.id, 'scope': scope, 'login_time': login_time}
13. Add an enum of login types
# File: /app/libs/enums.py

# -*- coding: utf-8 -*-
from enum import Enum


class ClientTypeEnum(Enum):
    USER_EMAIL = 100
    USER_MOBILE = 101

    # WeChat mini program
    USER_MINA = 200
    # WeChat official account
    USER_WX = 201
14. Time helper functions
# File: /app/libs/format_time.py
# -*- coding: utf-8 -*-
import datetime


def get_current_date():
    """Return the current datetime."""
    return datetime.datetime.now()


def get_current_timestamp():
    """Return the current time as a Unix timestamp."""
    return int(datetime.datetime.now().timestamp())


def get_format_date(date=None, format_time="%Y-%m-%d %H:%M:%S"):
    """Format a datetime; defaults to the current time."""
    if date is None:
        date = datetime.datetime.now()
    return date.strftime(format_time)


def get_format_timestamp(date=None, format_time="%Y-%m-%d %H:%M:%S"):
    """Format a Unix timestamp; defaults to the current time."""
    if date is None:
        # fromtimestamp() needs a number, not a datetime object
        date = datetime.datetime.now().timestamp()
    return datetime.datetime.fromtimestamp(date).strftime(format_time)
15. Function that fetches the WeChat openid
# File: /app/libs/get_openid.py
# -*- coding: utf-8 -*-
import requests
import json
from flask import current_app


def get_openid(code):
    api = 'https://api.weixin.qq.com/sns/jscode2session'
    # Let requests build and escape the query string
    params = {
        'appid': current_app.config['MINA_APP']['AppID'],
        'secret': current_app.config['MINA_APP']['AppSecret'],
        'js_code': code,
        'grant_type': 'authorization_code',
    }
    response = requests.get(api, params=params)
    res = json.loads(response.text)
    # On failure the response carries errcode/errmsg instead of openid
    return res.get('openid')
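The request and the response handling can be tried without touching the network. The sketch below uses only the standard library; build_code2session_url and extract_openid are hypothetical helper names, and the sample payloads in the usage lines are made-up illustrations of the two response shapes, not captured WeChat output.

```python
import json
from urllib.parse import urlencode

API = 'https://api.weixin.qq.com/sns/jscode2session'


def build_code2session_url(app_id, app_secret, code):
    """Build the jscode2session URL with properly escaped parameters."""
    query = urlencode({
        'appid': app_id,
        'secret': app_secret,
        'js_code': code,
        'grant_type': 'authorization_code',
    })
    return API + '?' + query


def extract_openid(response_text):
    """Return the openid from a response body, or None when it is absent."""
    return json.loads(response_text).get('openid')


# Illustrative payloads (assumed shapes): success carries openid,
# failure carries errcode/errmsg.
print(extract_openid('{"openid": "o123", "session_key": "k"}'))
print(extract_openid('{"errcode": 40029, "errmsg": "invalid code"}'))
```

Escaping the parameters matters because a login code is opaque to the server; string formatting would pass characters such as "&" straight into the URL.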
16. scope.py: permission management
# File: /app/libs/scope.py
# -*- coding: utf-8 -*-
class Scope:
    allow_api = []
    allow_module = []
    forbidden = []

    def __add__(self, other):
        """Overload + to merge another scope into this one."""
        self.allow_api = self.allow_api + other.allow_api
        self.allow_api = list(set(self.allow_api))

        self.allow_module = self.allow_module + other.allow_module
        self.allow_module = list(set(self.allow_module))

        self.forbidden = self.forbidden + other.forbidden
        self.forbidden = list(set(self.forbidden))

        return self


class AdminScope(Scope):
    allow_module = ['v1.user']

    def __init__(self):
        pass


class UserScope(Scope):
    forbidden = ['v1.user+super_get_user',
                 'v1.user+super_delete_user']

    def __init__(self):
        self + AdminScope()


def is_in_scope(scope, endpoint):
    # Instantiate the scope class from its name
    scope = globals()[scope]()
    splits = endpoint.split('+')
    red_name = splits[0]
    if endpoint in scope.forbidden:
        return False
    if endpoint in scope.allow_api:
        return True
    if red_name in scope.allow_module:
        return True
    else:
        return False
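The lookup order in is_in_scope (forbidden first, then single endpoints, then whole modules) is easiest to follow with concrete endpoint names. Below is a condensed, self-contained sketch of the same rules; the SCOPES registry is an assumption introduced here in place of globals(), so that only known scope names can be instantiated.

```python
class Scope:
    allow_api = []
    allow_module = []
    forbidden = []

    def __add__(self, other):
        # Merge the other scope's rules into this instance
        self.allow_api = list(set(self.allow_api + other.allow_api))
        self.allow_module = list(set(self.allow_module + other.allow_module))
        self.forbidden = list(set(self.forbidden + other.forbidden))
        return self


class AdminScope(Scope):
    allow_module = ['v1.user']


class UserScope(Scope):
    forbidden = ['v1.user+super_get_user', 'v1.user+super_delete_user']

    def __init__(self):
        # Start from everything an admin may do, minus the forbidden list
        self + AdminScope()


# Assumption: an explicit registry instead of globals()
SCOPES = {'AdminScope': AdminScope, 'UserScope': UserScope}


def is_in_scope(scope_name, endpoint):
    scope = SCOPES[scope_name]()
    red_name = endpoint.split('+')[0]
    if endpoint in scope.forbidden:
        return False
    if endpoint in scope.allow_api:
        return True
    return red_name in scope.allow_module


print(is_in_scope('UserScope', 'v1.user+get_user'))
print(is_in_scope('UserScope', 'v1.user+super_get_user'))
print(is_in_scope('AdminScope', 'v1.user+super_get_user'))
```

Checking forbidden before allow_module is what lets UserScope inherit the whole v1.user module and still lose the two super_ endpoints.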
17. Verifying the JWT token
# File: /app/libs/token_auth.py

# -*- coding: utf-8 -*-
import jwt
from collections import namedtuple
from flask import current_app, g, request
from flask_httpauth import HTTPBasicAuth

from app.models.user import User as _User
from app.libs.scope import is_in_scope
from app.libs.error_code import AuthFailed, Forbidden, SingleLogin

auth = HTTPBasicAuth()
User = namedtuple('User', ['uid', 'ac_type', 'scope', 'login_time'])


@auth.verify_password
def verify_password(token, password):
    # The token arrives in the username slot of HTTP Basic auth
    user_info = verify_auth_token(token)
    if not user_info:
        return False
    else:
        g.user = user_info
        return True


def verify_auth_token(token):
    try:
        # PyJWT 2.x requires the accepted algorithms to be listed explicitly
        data = jwt.decode(token, current_app.config['SECRET_KEY'],
                          algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except jwt.InvalidTokenError:
        raise AuthFailed(msg='token is invalid', error_code=1002)
    uid = data['uid']
    ac_type = data['type']
    scope = data['scope']
    login_time = data['login_time']
    user = _User.query.filter_by(id=uid).first_or_404()
    # A newer login changes login_time and invalidates older tokens
    if login_time != user.login_time:
        raise SingleLogin()
    # request.endpoint identifies the view function being called
    allow = is_in_scope(scope, request.endpoint)
    if not allow:
        raise Forbidden()
    return User(uid, ac_type, scope, login_time)
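Since verify_password receives the token as its first argument, the client has to put the token in the username slot of the Basic credential. The following sketch shows one way to build and read such a header with the standard library, assuming an empty password; basic_auth_header and parse_basic_auth are hypothetical helpers, not part of Flask-HTTPAuth.

```python
import base64


def basic_auth_header(token):
    """Build an Authorization header with the token as the Basic username."""
    raw = '{}:'.format(token).encode('utf-8')  # "token:" with an empty password
    return {'Authorization': 'Basic ' + base64.b64encode(raw).decode('ascii')}


def parse_basic_auth(header_value):
    """Split a Basic Authorization header back into (username, password)."""
    scheme, _, encoded = header_value.partition(' ')
    if scheme != 'Basic':
        raise ValueError('not a Basic credential')
    decoded = base64.b64decode(encoded).decode('utf-8')
    username, _, password = decoded.partition(':')
    return username, password


headers = basic_auth_header('header.payload.signature')
print(parse_basic_auth(headers['Authorization']))
```

A JWT never contains a colon, so splitting at the first colon always recovers the full token.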
18. Validators

# File: /app/validators/base.py

# -*- coding: utf-8 -*-
from flask import request
from wtforms import Form

from app.libs.error_code import ParameterException


class BaseForm(Form):
    def __init__(self):
        # Accept both a JSON body and query-string arguments
        data = request.get_json(silent=True)
        args = request.args.to_dict()
        super(BaseForm, self).__init__(data=data, **args)

    def validate_for_api(self):
        # Raise a JSON error instead of returning False
        valid = super(BaseForm, self).validate()
        if not valid:
            raise ParameterException(msg=self.errors)
        return self
# File: /app/validators/forms.py
# -*- coding: utf-8 -*-
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired, length, Email, Regexp
from wtforms import ValidationError

from app.libs.enums import ClientTypeEnum
from app.models.user import User
from app.validators.base import BaseForm as Form


class ClientForm(Form):
    account = StringField(validators=[
        DataRequired(message='This field is required'),
        length(min=5, max=32)])
    secret = StringField()
    type = IntegerField(validators=[DataRequired()])

    def validate_type(self, value):
        # Convert the integer into a ClientTypeEnum member
        try:
            client = ClientTypeEnum(value.data)
        except ValueError as e:
            raise e
        self.type.data = client


class UserEmailForm(ClientForm):
    account = StringField(validators=[Email(message='invalid email')])
    secret = StringField(validators=[
        DataRequired(),
        # 6 to 22 characters: letters, digits, and _ * & $ # @
        Regexp(r'^[A-Za-z0-9_*&$#@]{6,22}$')
    ])
    nickname = StringField(validators=[DataRequired(),
                                       length(min=2, max=22)])

    def validate_account(self, value):
        if User.query.filter_by(email=value.data).first():
            raise ValidationError()


class TokenForm(Form):
    token = StringField(validators=[DataRequired()])


class MinaForm(Form):
    account = StringField(validators=[
        DataRequired(message='This field is required'),
        length(min=10, max=80)])
    nickname = StringField(validators=[DataRequired()])
    # Note: DataRequired rejects 0, so a gender of 0 (unknown) fails validation
    sex = IntegerField(validators=[DataRequired()])
    avatar = StringField(validators=[DataRequired()])
    type = IntegerField(validators=[DataRequired()])

    def validate_type(self, value):
        try:
            client = ClientTypeEnum(value.data)
        except ValueError as e:
            raise e
        self.type.data = client
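19. error_code.md

| error_code | msg |
| --- | --- |
| 0 | Created successfully |
| 1 | Deleted successfully |
| 999 | Unknown error: something went wrong on the server |
| 1000 | Invalid parameter |
| 1001 | The requested resource was not found |
| 1002 | token is invalid |
| 1003 | token is expired |
| 1004 | Forbidden: outside the permitted scope |
| 1005 | Authentication failed |
| 1006 | Client type not detected |
| 2001 | Please do not repeat the operation |
| 2002 | Please log in again |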

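20. JWT login
User authentication and authorization based on JSON Web Tokens with Flask + PyJWT

| endpoint | HTTP Method | Authenticated? | Result | JSON body |
| --- | --- | --- | --- | --- |
| /v1/client/register | POST | NO | Register a user | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
| /v1/token | POST | NO | Get a token | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
| /v1/user | GET | YES | User details | |
| /v1/user/2 | GET | YES | Admin fetches another user's details | |
| /v1/token/secret | POST | NO | Token details | {"token":"*"} |

The JWT payload carries login_time, which is compared with login_time in the User table. A mismatch means the user has logged in again since the token was issued, so only the most recent token stays valid (single active login).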
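The login_time check can be illustrated without Flask or PyJWT. The sketch below hand-rolls HS256 signing with the standard library purely to show the mechanics; it is not the PyJWT implementation, and sign_token and verify_token are hypothetical helpers. In the project itself, PyJWT performs the signing and expiry checks and verify_auth_token does the login_time comparison.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data):
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def _b64url_decode(text):
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def _signature(signing_input, secret):
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def sign_token(payload, secret):
    """Create a compact HS256 token from a JSON-serializable payload."""
    header = json.dumps({'alg': 'HS256', 'typ': 'JWT'}, separators=(',', ':'))
    body = json.dumps(payload, separators=(',', ':'))
    signing_input = _b64url(header.encode()) + '.' + _b64url(body.encode())
    return signing_input + '.' + _b64url(_signature(signing_input, secret))


def verify_token(token, secret, stored_login_time, now=None):
    """Return the payload, or raise ValueError naming the failed check."""
    parts = token.split('.')
    if len(parts) != 3:
        raise ValueError('token is invalid')
    expected = _signature(parts[0] + '.' + parts[1], secret)
    if not hmac.compare_digest(expected, _b64url_decode(parts[2])):
        raise ValueError('token is invalid')
    payload = json.loads(_b64url_decode(parts[1]))
    if payload['exp'] <= (time.time() if now is None else now):
        raise ValueError('token is expired')
    # A later login overwrote login_time in the database: reject the old token
    if payload['login_time'] != stored_login_time:
        raise ValueError('please log in again')
    return payload


token = sign_token({'uid': 1, 'login_time': 100, 'exp': 2000}, 's3cret')
print(verify_token(token, 's3cret', stored_login_time=100, now=1000)['uid'])
```

The token itself stays cryptographically valid after a second login; it is the database comparison that retires it, which is why the check needs one query per request.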
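Update the database
cd users
flask db migrate
flask db upgrade
Results
Registration succeeded

Registration failed

Login succeeded

Login failed

User information fetched successfully

After the user logs in again the token changes, and the old token can no longer fetch user information

A request without a token cannot fetch user information

An expired token cannot fetch user information

With permission, another user's information can be fetched

Without permission, another user's information cannot be fetched

Learning resources:

imooc: Python Flask 构建可扩展的 RESTful API (Building extensible RESTful APIs with Python Flask)
imooc: 掌握 Taro 多端框架 快速上手小程序 /H5 开发 (Mastering the Taro multi-platform framework for mini program and H5 development)
Taro 多端开发实现原理与项目实战 (Taro multi-platform development: implementation principles and project practice)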
