1.Redis 缓存双写一致性
2. 数据库和缓存一致性的几种更新策略
2.1 先更新数据库,再更新缓存
2.2 先删除缓存,再更新数据库
2.3 先更新数据库,再删除缓存
2.4 先更新缓存,再更新数据库
3.Redis 与 MySQL 数据双写一致性工程落地案例之 canal
4. 总结
1.Redis 缓存双写一致性
咱们都晓得,只有咱们应用 redis,就会遇到 缓存与数据库的双存储双写 ,那么只有是双写, 就肯定会有数据一致性问题,为了保障双写一致性,咱们要先动 redis 还是 mysql?
通常地来说,有为了保证数据的一致性,会有以下两种状况:
1)如果 redis 中 有数据,咱们须要和数据库中的值雷同
2)如果 redis 中 无数据,数据库里的值是最新值
2. 数据库和缓存一致性的几种更新策略
咱们要保障数据库和缓存的一致性,然而这毕竟是两个工具,必然会造成肯定的提早,所以咱们要保障的是 最终一致性!
咱们能够 给缓存的 key 设置过期工夫,这是保障最终一致性的解决方案。
咱们对存入缓存的数据设置过期工夫,所有写操作以数据库为准,对缓存操作只用尽最大致力即可。如果数据库写胜利,缓存更新失败,只有达到过期工夫 ,前面的 读申请天然会从数据库中取新值,而后回写缓存,达到一致性。
上述的案例只是目前支流 + 成熟的做法,思考到每个公司的业务性质不同,请抉择适宜咱们本人公司的办法。
2.1 先更新数据库,再更新缓存
当咱们进行这种操作的时候,如果线程并发量足够大,个别会呈现两个问题,咱们用列表格的形式来进行形容:
异常情况 1:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 更新数据库的值 | |
t2 | 查问申请,缓存命中旧数据,导致查问脏数据 | |
t3 | 更新缓存的数据 |
异常情况 2:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 更新数据库的值 | |
t2 | 查问申请,缓存命中旧数据,导致查问脏数据 | |
t3 | 缓存更新失败,导致肯定工夫内查问的都为脏数据 |
2.2 先删除缓存,再更新数据库
异常情况 1:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 删除缓存 | |
t2 | 大量查问申请,间接导致缓存击穿 | |
t3 | 服务器宕机 |
异常情况 1 解决方案:
缓存击穿的解决方案,咱们在后面这篇博客曾经解释过了。
深刻了解 redis——缓存雪崩 / 缓存击穿 / 缓存穿透
异常情况 2:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 删除缓存 | |
t2 | 查问申请,缓存无数据,去数据库查问旧数据 | |
t3 | 更新 mysql 的值,导致和缓存中数据不统一 |
异常情况 2 解决方案:
采纳 延时双删策略
public void delayDoubleDeleteUser(TUser user) throws InterruptedException {
// 线程胜利删除 redis 缓存
redisTemplate.delete(CACHE_KEY_USER + user.getId());
// 线程再更新 mysql
userMapper.updateById(user);
// 休眠两秒钟,期待其它查问业务逻辑先执行结束,缓存中曾经齐全是旧值的时候
Thread.sleep(2000);
// 再删除一遍缓存
redisTemplate.delete(CACHE_KEY_USER + user.getId());
}
延时双删造成的问题 1:那么,这个延时双删,线程要休眠多久呢?
个别在业务我的项目运行的时候,先统计下线程的读和写操作的工夫,由此为根底,再依据写数据的休眠工夫在读数据业务逻辑的耗时根底上减少百毫秒即可。
延时双删造成的问题 2:这种同步策略造成吞吐量升高怎么办?
再开一个线程就能够了。
public void delayDoubleDeleteUser(TUser user) throws InterruptedException, ExecutionException {
// 线程胜利删除 redis 缓存
redisTemplate.delete(CACHE_KEY_USER + user.getId());
// 线程再更新 mysql
userMapper.updateById(user);
// 休眠两秒钟,期待其它业务逻辑先执行结束
// 开一个线程,再删除一遍缓存
CompletableFuture.supplyAsync(()->{
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}
return redisTemplate.delete(CACHE_KEY_USER + user.getId());
}).get();}
2.3 先更新数据库,再删除缓存
这种是业内提倡的解决方案:
异常情况 1:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 更新数据库 | |
t2 | 还未来得及删除,就从缓存里查出了旧数据 | |
t3 | 删除缓存 |
那么问题就来了,咱们是 先删除缓存,而后再更新数据库 ,还是先 更新数据库,再删缓存呢?
个别地来说,对于一个不能保障事务性的操作,肯定波及 ” 哪个工作先做,哪个工作后做 ” 的问题,解决这个问题的方向是:如果呈现不统一,谁先做对业务的影响较小,谁先执行。
先删缓存,再更新数据库:该办法的异样呈现在,删 除缓存后,在数据库还未更新实现,立马又把旧值刷新回缓存。
先更新数据库,再删缓存:该办法的异样呈现在,更新数据库后,缓存短暂的工夫还没有淘汰,而呈现的旧数据被读取的问题。
咱们能够很显著地得出,第一种计划,出现异常的概率更大 ,删除缓存后到写操作实现,两头的间隔时间, 远比更新数据库后到删除缓存中的间隔时间要长,所以咱们更应该抉择第二种。
不过如果你肯定要保障一致性怎么办?
没有方法做到相对的一致性,这是由 CAP 实践决定的,缓存零碎实用的场景就是非强一致性的场景,所以它属于 CAP 中的 AP。
所以,咱们得忍辱负重,能够去做到 BASE 实践中说的最终一致性。
此时就引出了咱们的canal 工具!(下文会有应用解说)
2.4 先更新缓存,再更新数据库
个别没人会这么做,不倡议缓存的数据比数据库超前。
3.Redis 与 MySQL 数据双写一致性工程落地案例之 canal
canal 是什么:
canal 次要用于mysql 数据库增量日志数据的订阅,生产和解析,是阿里巴巴开发并开源的,采纳 java 语言开发。
历史背景是晚期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需要,实现形式次要是基于业务 trigger(触发器)获取增量变更。从 2010 年开始,阿里巴巴逐渐尝试采纳解析数据库日志获取增量变更进行同步,由此衍生出了 canal 我的项目;
总结:Canal 是基于 Mysql 变更日志增量订阅和生产的组件 , 订阅 mysql 的增量日志,再同步到其它组件(mysql/redis/mq 等等),实现数据的最终一致性。
canal 的工作原理:
在看 canal 的工作原理前,咱们先理解一下 mysql 主从同步的工作原理:
Mysql 的主从同步流程:
1. 当 master 主服务器上的 数据产生扭转的时候 ,将其 扭转 写入 二进制事件日志文件中 ;
2.slave 从服务器会 在肯定工夫距离内 对 master 主服务器上的 二进制日志进行探测 ,探测其是否产生扭转。
如果 探测到其产生了扭转 ,则 开启一个 I /O Thread 申请 read master 的二进制日志。
3. 同时 master 主服务器 为每个 I /O Thread 启动一个 dump(转储) Thread,用于向该 I /O Thread 发送二进制事件日志。
4.slave 从服务器将 接管到的二进制事件日志保留至本人本地的中继日志文件中。
5.slave 从服务器将 启动 SQL Thread 从中级日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致。
6. 最初 I /O Thread 和 SQL Thread 执行结束后将进行休眠状态,期待下一次被唤醒。
而 canal 也是 模仿 mysql slave 的同步协定,假装本人成为 mysql Slave,向 mysql Master 发送 dump 协定,Mysql Master 收到 dump 申请,开始推送 binary log 给 slave (即 canal)canal 解析 binary log 对象(原始为 byte 流)
然而这种解决方案,也只能是做到最终一致性,无奈达到强一致性。
canal 的应用:
连贯 MySQL 键入命令:show variables like ‘log_%’; 查看 binlog 是否开启
若 log_bin 的值为 OFF, 则须要对配置文件 my.ini 进行批改,退出如下代码:
log-bin=log-bin
binlog-format=ROW
ROW:模式除了记录 sql 语句之外,还会记录每个字段的变动状况 ,可能分明的记录每行数据的变动历史,但会占用较多的空间。
STATEMENT: 模式只记录了 sql 语句,然而没有记录上下文信息 ,在进行数据恢复的时候可能会导致数据的失落状况;
MIX:模式比拟灵便的记录, 实践上说当遇到了表构造变更的时候,就会记录为 statement 模式。当遇到了数据更新或者删除状况下就会变为 row 模式;
批改 conf /canal.properties 配置文件,注册地址为本机数据库地址
批改 conf/example/instance.properties 配置文件,配置数据库信息
Windows 下应用 startup.bat
pom.xml 减少依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
package com.example.demo.redisDemo.config.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @author sulingfeng
* @title: RedisUtils
* @projectName demo
* @description: TODO
* @date 2022/2/16 16:01
*/
public class RedisUtils {
private static JedisPool jedisPool;
static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
}
public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();
}
throw new Exception("Jedispool was not init");
}
}
package com.example.demo.canalDemo;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.example.demo.redisDemo.config.utils.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @auther
* @create 2020-11-11 17:13
*/
public class RedisCanalClientExample {
public static final Integer _60SECONDS = 60;
public static void main(String args[]) {
// 创立链接 canal 服务端 example 为 canal 实例名字
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("199.16.1.135",
11111), "example", "","");
int batchSize = 1000;
int emptyCount = 0;
System.out.println("---------------- 程序启动,开始监听 mysql 的变动:");
try {connector.connect();
//connector.subscribe(".*\\..*");
//connector.subscribe("db2020.t_order");
// 订阅 mysql 这张表
connector.subscribe("test.t_user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
// 如果没数据批改
if (batchId == -1 || size == 0) {
emptyCount++;
try {TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {e.printStackTrace();
}
} else {
// 如果有数据批改
emptyCount = 0;
printEntry(message.getEntries());
System.out.println();}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 解决失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {connector.disconnect();
}
}
// 次要干活的办法
private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}
RowChange rowChage = null;
try {rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
// 如果是新增
if (eventType == EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());
// 如果是删除
} else if (eventType == EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
// 数据库减少的时候,redis 减少
private static void redisInsert(List<Column> columns) {JSONObject jsonObject = new JSONObject();
for (Column column : columns) {System.out.println(column.getName() + ":" + column.getValue() + "insert=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {try (Jedis jedis = RedisUtils.getJedis()) {jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
} catch (Exception e) {e.printStackTrace();
}
}
}
// 数据库删除的时候,redis 删除
private static void redisDelete(List<Column> columns) {JSONObject jsonObject = new JSONObject();
for (Column column : columns) {System.out.println(column.getName() + ":" + column.getValue() + "delete=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {try (Jedis jedis = RedisUtils.getJedis()) {jedis.del(columns.get(0).getValue());
} catch (Exception e) {e.printStackTrace();
}
}
}
// 数据库批改的时候,redis 也批改
private static void redisUpdate(List<Column> columns) {JSONObject jsonObject = new JSONObject();
for (Column column : columns) {System.out.println(column.getName() + ":" + column.getValue() + "update=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {try (Jedis jedis = RedisUtils.getJedis()) {jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
System.out.println("---------update after:" + jedis.get(columns.get(0).getValue()));
} catch (Exception e) {e.printStackTrace();
}
}
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis();
System.out.println("----costTime:" + (endTime - startTime) + "毫秒");
}
}
本来数据库里的数据:
本来 redis 里的数据:
mysql 减少了一条数据:
redis 也减少了一条数据:
控制台:
谨严一点的做法的流程图:
4. 总结
对于缓存的双写一致性,因为无奈做到强一致性,咱们还是以最终一致性为解决方案。