1.Redis缓存双写一致性
2.数据库和缓存一致性的几种更新策略
2.1先更新数据库,再更新缓存
2.2先删除缓存,再更新数据库
2.3先更新数据库,再删除缓存
2.4先更新缓存,再更新数据库
3.Redis与MySQL数据双写一致性工程落地案例之canal
4.总结
1.Redis缓存双写一致性
咱们都晓得,只有咱们应用redis,就会遇到缓存与数据库的双存储双写,那么只有是双写,就肯定会有数据一致性问题,为了保障双写一致性,咱们要先动redis还是mysql?
通常地来说,有为了保证数据的一致性,会有以下两种状况:
1)如果redis中有数据,咱们须要和数据库中的值雷同
2)如果redis中无数据,数据库里的值是最新值
2.数据库和缓存一致性的几种更新策略
咱们要保障数据库和缓存的一致性,然而这毕竟是两个工具,必然会造成肯定的提早,所以咱们要保障的是最终一致性!
咱们能够给缓存的key设置过期工夫,这是保障最终一致性的解决方案。
咱们对存入缓存的数据设置过期工夫,所有写操作以数据库为准,对缓存操作只用尽最大致力即可。如果数据库写胜利,缓存更新失败,只有达到过期工夫,前面的读申请天然会从数据库中取新值,而后回写缓存,达到一致性。
上述的案例只是目前支流+成熟的做法,思考到每个公司的业务性质不同,请抉择适宜咱们本人公司的办法。
2.1先更新数据库,再更新缓存
当咱们进行这种操作的时候,如果线程并发量足够大,个别会呈现两个问题,咱们用列表格的形式来进行形容:
异常情况1:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 更新数据库的值 | |
t2 | 查问申请,缓存命中旧数据,导致查问脏数据 | |
t3 | 更新缓存的数据 |
异常情况2:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 更新数据库的值 | |
t2 | 查问申请,缓存命中旧数据,导致查问脏数据 | |
t3 | 缓存更新失败,导致肯定工夫内查问的都为脏数据 |
2.2先删除缓存,再更新数据库
异常情况1:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 删除缓存 | |
t2 | 大量查问申请,间接导致缓存击穿 | |
t3 | 服务器宕机 |
异常情况1解决方案:
缓存击穿的解决方案,咱们在后面这篇博客曾经解释过了。
深刻了解redis——缓存雪崩/缓存击穿/缓存穿透
异常情况2:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 删除缓存 | |
t2 | 查问申请,缓存无数据,去数据库查问旧数据 | |
t3 | 更新mysql的值,导致和缓存中数据不统一 |
异常情况2解决方案:
采纳延时双删策略
public void delayDoubleDeleteUser(TUser user) throws InterruptedException { //线程胜利删除redis缓存 redisTemplate.delete(CACHE_KEY_USER + user.getId()); //线程再更新mysql userMapper.updateById(user); //休眠两秒钟,期待其它查问业务逻辑先执行结束,缓存中曾经齐全是旧值的时候 Thread.sleep(2000); //再删除一遍缓存 redisTemplate.delete(CACHE_KEY_USER + user.getId()); }
延时双删造成的问题1:那么,这个延时双删,线程要休眠多久呢?
个别在业务我的项目运行的时候,先统计下线程的读和写操作的工夫,由此为根底,再依据写数据的休眠工夫在读数据业务逻辑的耗时根底上减少百毫秒即可。
延时双删造成的问题2:这种同步策略造成吞吐量升高怎么办?
再开一个线程就能够了。
public void delayDoubleDeleteUser(TUser user) throws InterruptedException, ExecutionException { //线程胜利删除redis缓存 redisTemplate.delete(CACHE_KEY_USER + user.getId()); //线程再更新mysql userMapper.updateById(user); //休眠两秒钟,期待其它业务逻辑先执行结束 //开一个线程,再删除一遍缓存 CompletableFuture.supplyAsync(()->{ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return redisTemplate.delete(CACHE_KEY_USER + user.getId()); }).get(); }
2.3先更新数据库,再删除缓存
这种是业内提倡的解决方案:
异常情况1:
工夫 | 线程 A | 线程 B |
---|---|---|
t1 | 更新数据库 | |
t2 | 还未来得及删除,就从缓存里查出了旧数据 | |
t3 | 删除缓存 |
那么问题就来了,咱们是先删除缓存,而后再更新数据库,还是先更新数据库,再删缓存呢?
个别地来说,对于一个不能保障事务性的操作,肯定波及"哪个工作先做,哪个工作后做"的问题,解决这个问题的方向是:如果呈现不统一,谁先做对业务的影响较小,谁先执行。
先删缓存,再更新数据库:该办法的异样呈现在,删除缓存后,在数据库还未更新实现,立马又把旧值刷新回缓存。
先更新数据库,再删缓存:该办法的异样呈现在,更新数据库后,缓存短暂的工夫还没有淘汰,而呈现的旧数据被读取的问题。
咱们能够很显著地得出,第一种计划,出现异常的概率更大,删除缓存后到写操作实现,两头的间隔时间,远比更新数据库后到删除缓存中的间隔时间要长,所以咱们更应该抉择第二种。
不过如果你肯定要保障一致性怎么办?
没有方法做到相对的一致性,这是由CAP实践决定的,缓存零碎实用的场景就是非强一致性的场景,所以它属于CAP中的AP。
所以,咱们得忍辱负重,能够去做到BASE实践中说的最终一致性。
此时就引出了咱们的canal工具!(下文会有应用解说)
2.4先更新缓存,再更新数据库
个别没人会这么做,不倡议缓存的数据比数据库超前。
3.Redis与MySQL数据双写一致性工程落地案例之canal
canal是什么:
canal次要用于mysql数据库增量日志数据的订阅,生产和解析,是阿里巴巴开发并开源的,采纳java语言开发。
历史背景是晚期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需要,实现形式次要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐渐尝试采纳解析数据库日志获取增量变更进行同步,由此衍生出了canal我的项目;
总结:Canal是基于Mysql变更日志增量订阅和生产的组件,订阅mysql的增量日志,再同步到其它组件(mysql/redis/mq等等),实现数据的最终一致性。
canal的工作原理:
在看canal的工作原理前,咱们先理解一下mysql主从同步的工作原理:
Mysql的主从同步流程:
1.当master主服务器上的数据产生扭转的时候,将其扭转写入二进制事件日志文件中;
2.slave从服务器会在肯定工夫距离内对master主服务器上的二进制日志进行探测, 探测其是否产生扭转。
如果探测到其产生了扭转,则开启一个I/O Thread 申请read master的二进制日志。
3.同时master主服务器为每个I/O Thread启动一个dump(转储) Thread,用于向该I/O Thread发送二进制事件日志。
4.slave从服务器将接管到的二进制事件日志保留至本人本地的中继日志文件中。
5.slave从服务器将启动SQL Thread从中级日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致。
6.最初I/O Thread和SQL Thread执行结束后将进行休眠状态,期待下一次被唤醒。
而canal也是模仿mysql slave的同步协定,假装本人成为mysql Slave,向mysql Master发送dump协定,Mysql Master 收到dump申请,开始推送 binary log 给 slave (即 canal )canal 解析 binary log 对象(原始为 byte 流)
然而这种解决方案,也只能是做到最终一致性,无奈达到强一致性。
canal的应用:
连贯MySQL 键入命令:show variables like 'log_%'; 查看binlog是否开启
若log_bin的值为OFF,则须要对配置文件my.ini进行批改,退出如下代码:
log-bin=log-binbinlog-format=ROW
ROW:模式除了记录sql语句之外,还会记录每个字段的变动状况,可能分明的记录每行数据的变动历史,但会占用较多的空间。
STATEMENT: 模式只记录了sql语句,然而没有记录上下文信息,在进行数据恢复的时候可能会导致数据的失落状况;
MIX: 模式比拟灵便的记录,实践上说当遇到了表构造变更的时候,就会记录为statement模式。当遇到了数据更新或者删除状况下就会变为row模式;
批改conf /canal.properties配置文件,注册地址为本机数据库地址
批改conf/example/instance.properties配置文件,配置数据库信息
Windows下应用startup.bat
pom.xml减少依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
package com.example.demo.redisDemo.config.utils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;/** * @author sulingfeng * @title: RedisUtils * @projectName demo * @description: TODO * @date 2022/2/16 16:01 */public class RedisUtils { private static JedisPool jedisPool; static { JedisPoolConfig jedisPoolConfig=new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(20); jedisPoolConfig.setMaxIdle(10); jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379); } public static Jedis getJedis() throws Exception { if(null!=jedisPool){ return jedisPool.getResource(); } throw new Exception("Jedispool was not init"); }}
package com.example.demo.canalDemo;import com.alibaba.fastjson.JSONObject;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import com.example.demo.redisDemo.config.utils.RedisUtils;import redis.clients.jedis.Jedis;import java.net.InetSocketAddress;import java.util.List;import java.util.concurrent.TimeUnit;/** * @auther * @create 2020-11-11 17:13 */public class RedisCanalClientExample { public static final Integer _60SECONDS = 60; public static void main(String args[]) { // 创立链接canal服务端 example为canal实例名字 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("199.16.1.135", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; System.out.println("----------------程序启动,开始监听mysql的变动:"); try { connector.connect(); //connector.subscribe(".*\\..*"); //connector.subscribe("db2020.t_order"); //订阅mysql这张表 connector.subscribe("test.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); //如果没数据批改 if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { //如果有数据批改 emptyCount = 0; printEntry(message.getEntries()); System.out.println(); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 解决失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } //次要干活的办法 private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { //如果是新增 if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); //如果是删除 } else if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else {//EventType.UPDATE redisUpdate(rowData.getAfterColumnsList()); } } } } //数据库减少的时候,redis减少 private static void redisInsert(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " insert=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(), jsonObject.toJSONString()); } catch (Exception e) { e.printStackTrace(); } } } //数据库删除的时候,redis删除 private static void redisDelete(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " delete=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0).getValue()); } catch (Exception e) { e.printStackTrace(); } } } //数据库批改的时候,redis也批改 private static void redisUpdate(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(), jsonObject.toJSONString()); System.out.println("---------update after: " + jedis.get(columns.get(0).getValue())); } catch (Exception e) { e.printStackTrace(); } } long startTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis(); System.out.println("----costTime: " + (endTime - startTime) + " 毫秒"); }}
本来数据库里的数据:
本来redis里的数据:
mysql减少了一条数据:
redis也减少了一条数据:
控制台:
谨严一点的做法的流程图:
4.总结
对于缓存的双写一致性,因为无奈做到强一致性,咱们还是以最终一致性为解决方案。