@[toc]
一、 线程锁与分布式锁
线程锁 单体我的项目
单体我的项目
步骤
代码如下
//定义动态全局锁 private readonly static object _lock = new object();// 控制器中增加代码 lock (_lock) { Stock sto = new Stock(); sto = demoDbContext.stock.Where(p => p.ID == 1).FirstOrDefault(); if (sto.count == 0) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId + "---秒杀完结,无库存"); return Ok("秒杀完结,无库存"); } Console.WriteLine(Thread.CurrentThread.ManagedThreadId + "--秒杀胜利;"); //库存减1 sto.count = sto.count - 1; demoDbContext.SaveChanges(); } return Ok("秒杀完结");
- 数据库数量为10
如图: - 用jmeter并发10个线程
如图:
- 运行后果如下: 
分布式锁
条件
- 启动两个实例 5000/5001
- Nginx
- jmeter
步骤
外围代码
public class RedisLock { public readonly ConnectionMultiplexer connectionMultiplexer; private IDatabase database = null; public RedisLock() { connectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:6380"); database = connectionMultiplexer.GetDatabase(0); } /// <summary> /// 加锁 /// </summary> public void Lock() { while (true) // { //redis_lock 锁名称 // Thread.CurrentThread.ManagedThreadId 线程名称 // TimeSpan.FromSeconds(10) 设置过期工夫 避免死锁 bool flag = database.LockTake("redis_lock", Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(10)); //true :加锁胜利 false:加锁失败 if (flag) { break; } Thread.Sleep(10);//避免死锁 等待时间 开释资源。 } } /// <summary> /// 开释锁 /// </summary> public void UnLock() { database.LockRelease("redis_lock", Thread.CurrentThread.ManagedThreadId); connectionMultiplexer.Close(); } }
控制器中应用
[HttpGet] [Route("SubStock")] public IActionResult SubStock() { RedisLock redisLock = new RedisLock(); redisLock.Lock(); Stock sto = new Stock(); sto = demoDbContext.stock.Where(p => p.ID == 1).FirstOrDefault(); if (sto.count == 0) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId + "---秒杀完结,无库存"); //redisLock.UnLock(); return Ok("秒杀完结,无库存"); } Console.WriteLine(Thread.CurrentThread.ManagedThreadId + "--秒杀胜利;"); //库存减1 sto.count = sto.count - 1; demoDbContext.SaveChanges(); redisLock.UnLock(); return Ok("秒杀完结"); }
- 运行两个实例
如图: - 启动Nginx
如图: - 数据库库存
如图: - jmeter并发10个线程
- 运行后果如下
- 分布式锁的应用场景
当集群零碎中批改某个字段值时应用分布式锁。 - 分布式锁的设计思路
比方并发两个过程,当第一个过程加锁后,第二个过程加锁会失败,会休眠(10毫秒),直到第一个过程执行完业务代码并开释锁,如果第一个过程解决业务代码超过10毫秒,redis的过期工夫也是10毫秒,那么第二个过程进行加锁执行业务代码并开释锁。
备注:休眠的毫秒数可依据本人业务代码定义,毫秒数最好和redis过期工夫统一。
## 二、Redis集群
第一代集群 主从集群
- 如图:
- 毛病
只有一个master,当maset宕机后,整个redis集群零碎无奈应用。
- 如图:
第二代集群 哨兵集群
如图
第二代集群比第一代集群多了一个sentinel监督的角色,当主节点宕机后,sentinel会从多个从节点中抉择一个为主节点。
毛病
- 只有一个master,无奈解决高并发写的问题。
- 无奈存储海量数据。
第三代集群
- 如图:
长处与毛病
长处
- 解决高并发写。
- 存储海量数据。
毛病
- 耗费资源比拟大。
实现
条件
- windows 环境
Redis
- 网盘下载地址
链接:https://pan.baidu.com/s/1-rde...
提取码:liiz
- 网盘下载地址
Ruby
- 网盘下载地址
链接:https://pan.baidu.com/s/1NEnV...
提取码:lf10
- 网盘下载地址
Ruby 驱动
- 网盘下载地址
链接:https://pan.baidu.com/s/1LkpT...
提取码:7wn6
- 网盘下载地址
调配主从工具
- 网盘下载地址
链接:https://pan.baidu.com/s/18ah0...
提取码:0e02
- 网盘下载地址
步骤
配置集群文件 (6个实例) 配置6个配置文件【并将6个配置拷贝到redis根目录下】 ==配置不能有中文正文也不行!!!!==
port 6380 #端口 bind 127.0.0.1 #IP地址 appendonly yes #数据保留格局为aof appendfilename "appendonly.6380.aof" #数据保留文件 cluster-enabled yes #是否开启集群 cluster-config-file nodes.6380.conf #集群节点配置文件 cluster-node-timeout 15000 #节点超时工夫 cluster-slave-validity-factor 10 #验证slaver节点次数 cluster-migration-barrier 1 # cluster-require-full-coverage yes #master节点和slaver节点之间是否全量复制
执行所有实例
redis-server.exe redis.6380.conf redis-server.exe redis.6381.conf redis-server.exe redis.6382.conf redis-server.exe redis.6383.conf redis-server.exe redis.6384.conf redis-server.exe redis.6385.conf
如图:
装置 ruby
ruby --version #验证是否装置胜利
redis-cluster 驱动装置命令
#进入 ruby装置目录bin文件下执行装置命令 ruby gem install --local D:\Assembly\redis\Windows\redis-cluster\redis-3.2.2.gem #驱动文件门路
执行调配主从工具脚本
ruby redis-trib.rb create --replicas 1 127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6384 127.0.0.1:6385 #写入所有的实例地址和端口号# --replicas 1:是否调配3主3从 一个主节点和从节点
如图:
- 查看是否调配胜利
当6个实例不停输入日志,阐明曾经调配胜利。
redis集群外部关系结构图
如图:在redis集群中,每个节点都是互相通信的,用的协定是Gossip协定。
redis集群外部数据存储原理
Slot槽 主节点有槽[平均分配] 从节点没有槽 总共有16384个槽
ruby redis-trib.rb check 127.0.0.1:6380
- Hash算法
- 取模算法
当客户端将数据写入节点中时,节点会将key应用hash算法取到一个固定长度数值,而后对槽总数【16384】用取模算法进行取模,失去的数值后在到各个主节点中查看数值在哪个节点的槽数数值之间,在将数据写到那个主节点中。
- 如图: