共计 870 个字符，预计需要花费 3 分钟才能阅读完成。
- 安装依赖：pip3 install flask-redis
import os
import time

from flask import Flask
from flask_redis import FlaskRedis
# Flask application object; the route decorators below register against it.
app = Flask(__name__)
# Connection URL handed to flask-redis. The REDIS_URL environment variable
# takes precedence so real credentials do not have to live in the source; the
# literal below is the original (redacted) placeholder, kept as the fallback.
app.config['REDIS_URL'] = os.environ.get(
    'REDIS_URL', 'redis://:XxxxU@10.2x.5.36:6x9/0'
)
# Redis client bound to the app; used by the helpers below.
redis_client = FlaskRedis(app)
@app.route('/')
def hello_world():
    """Serve the index route with a fixed greeting."""
    greeting = 'Hello World!'
    return greeting
def mark_dyn_data(keys):
    """Increment the hit counter stored in Redis under ``keys``.

    The first hit creates the counter with value 1 and a lifetime of 3600
    seconds; later hits only increment it and leave the expiry untouched.

    Args:
        keys: Counter key; converted with ``str()`` and UTF-8 encoded.
    """
    keys_str = str(keys).encode('utf-8')
    # Both commands are queued on one pipeline and sent together, replacing
    # the original EXISTS-then-SET/INCRBY sequence. In that sequence two
    # concurrent first hits could both write 1 (losing a count), and a key
    # expiring between the check and the INCRBY was recreated without a TTL.
    p = redis_client.pipeline()
    # SET ... NX EX seeds a missing counter at 0 with its expiry and is a
    # no-op (value and TTL untouched) when the counter already exists.
    p.set(keys_str, 0, ex=3600, nx=True)
    p.incrby(keys_str, 1)
    p.execute()
def get_dyn_data(keys):
    """Return the value stored in Redis under ``keys`` as text.

    Args:
        keys: Key to read; converted with ``str()`` and UTF-8 encoded.

    Returns:
        The stored value as ``str``, or ``None`` when the lookup yields a
        falsy result (missing key or empty value).
    """
    keys_str = str(keys).encode('utf-8')
    data = redis_client.get(keys_str)
    if not data:
        return None
    if isinstance(data, bytes):
        # str() on bytes yields the repr ("b'1'") rather than the text ("1"),
        # so bytes replies are decoded instead. errors='replace' keeps this
        # path non-raising, as the original str() call was.
        return data.decode('utf-8', errors='replace')
    return str(data)
@app.route('/test_redis/<path:keys>')
def test_redis1(keys):
    """Record one hit for ``keys`` and respond with the stored value.

    Args:
        keys: Path captured from the URL, used as the Redis key.

    Returns:
        ``str()`` of whatever ``get_dyn_data`` returns for ``keys``.
    """
    mark_dyn_data(keys)
    return str(get_dyn_data(keys))
# Call app.run() only when this file is executed directly, not when imported.
if __name__ == "__main__":
    app.run()
正文完