共计 3122 个字符,预计需要花费 8 分钟才能阅读完成。
就我之前因为在解决 jpa 长久化对象上下文
(文:https://segmentfault.com/a/1190000043581830)
时,parallelStream 并行流给我的印象就是会读不到父线程的上下文的,所以应该在父线程里的事务和在 parallelStream 里的事务应该是辨别的,而不是共用同一个事务的,然而明天因为一个锁超时的问题,发现并没有那么简略,上面咱们一步一步来验证。
首先说下我锁超时的场景:具体的业务我不讲了,就说下伪代码
@PostMapping("/saveUser") | |
@Transactional | |
public void saveUser(@RequestBody List<Complex> list) {list.parallelStream().forEach(complex->{Integer appId = complex.getAppId(); | |
Integer userId = complex.getUserId(); | |
GeneratedKeyHolder keyHolder = new GeneratedKeyHolder(); | |
String sql = "insert ignore into open_app_user (app_id, open_id, user_status, creator, modifier, create_time, modify_time, status, version) values ("+appId+","+userId+",0,1,1,now(),now(),1,1)"; | |
int id = jdbcTemplate.update(con -> con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS), keyHolder); | |
}); | |
//todo 业务逻辑... | |
} |
这里我有个批量保留的逻辑,须要先保留一个两头表 open_app_user 表(该表 app_id 和 open_id 是联结惟一键)取得 id,拿到用户的 open_app_user_id 后再进行其余业务逻辑,这里按我原来的了解是尽管我在 controller 的办法上加了 @Transactional 注解,然而 parallelStream 里的事务应该都是独立的,不会是同一个事务,所以即便有数据反复,第一个线程插入后,第二个线程也只会插入失败(不会报错,因为我加了 ignore),所以即便并行也不会有问题的,然而却产生了锁超时的问题。
查看锁超时以及定位的操作能够看我后面的文章,通过查找 mysql 的
select * from information_schema.INNODB_TRX; | |
select * from performance_schema.data_lock_waits; | |
select * from performance_schema.data_locks; |
定位到了这里,然而我也百思不得其解,为啥会锁超时呢,这里应该都是马上执行就马上开释了啊,难道是其中的事务没有提交?
因为当初都是 spring 的申明式事务管理,spring 是在有 @Transactional 注解的状况下,执行完了才提交事务,在没有 @Transactional 注解的状况下,每个办法都差不多能够了解成原子,比方我下面的 jdbcTemplate.update()这个办法就是一个事务,执行完了就间接提交事务了。
因为 spring 是把事务上下文放在 ThreadLocal 里了,次要是用 TransactionSynchronizationManager 这个类来治理,所以我写了一个 demo 来进行验证
@GetMapping("/get") | |
@Transactional | |
public String get() {List<Complex> list = new ArrayList<>(); | |
for (int i = 0; i < 10; i++) {list.add(new Complex(1, 1)); | |
} | |
list.parallelStream().forEach(complex->{Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap(); | |
System.err.println("count:"+resourceMap.size()); | |
Integer appId = complex.getAppId(); | |
Integer userId = complex.getUserId(); | |
String sql = "insert ignore into open_app_user (app_id, open_id, user_status, creator, modifier, create_time, modify_time, status, version) values ("+appId+","+userId+",0,1,1,now(),now(),1,1)"; | |
int update = jdbcTemplate.update(sql); | |
}); | |
return "hello, world!"; | |
} |
乏味的事件产生了,我在正文掉 @Transactional 注解时,代码里 resourceMap.size()返回的内容是居然不一样,因为我的 list 有 10 条记录,差不多就是 10 个并行,然而我的输入却是:
count:1 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 |
没有正文掉 @Transactional 注解时,输入是:
count:2 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 | |
count:0 |
并且还会呈现锁超时的景象,奇怪的中央就是为啥我用的 parallelStream 会有线程上下文里的值,我并没有做什么操作,而且 10 个并行里只有一个(这里并不是阐明固定只有一次,上面会阐明)取得了线程上下文的信息,我又进一步测试,伪代码改成:
@GetMapping("/get") | |
public void get() {List<Complex> list = new ArrayList<>(); | |
for (int i = 0; i < 10; i++) {list.add(new Complex(1, 1)); | |
} | |
ThreadLocal local = new ThreadLocal(); | |
local.set("parent_set_value"); | |
list.parallelStream().forEach(complex->{System.err.println(local.get()); | |
}); | |
} |
后果如我所料,输入为:
parent_set_value | |
null | |
null | |
null | |
null | |
null | |
null | |
null | |
null | |
null |
应用 parallelStream 并不齐全都是另开了线程,其中有一个是属于主线程的,能够应用 System.err.println(Thread.currentThread().getName()); 查看以后线程的名称,我发现 parallelStream 会把以后主线程也作为一个执行线程去执行工作
前面我再去理解了一下 parallelStream 的实现,原来参加并行处理的线程有主线程以及 ForkJoinPool 中的 worker 线程,而我这里导致锁超时,就是因为用到了主线程,所以在并行插入的时候,有个解决有事务上下文,导致始终没有提交事务(@Transactional 正文办法的办法没有跑完,这里也不可能跑完),所以其余线程的插入就始终期待这个,产生了锁超时报错