我的项目代码基于:MySql 数据,开发框架为:SpringBoot、Mybatis
开发语言为:Java8
前言
公司业务中遇到一个需要,须要同时批改最多约 5 万条数据,而且还不反对批量或异步批改操作。于是只能写个 for 循环操作,但操作耗时太长,只能一步一步寻找其余解决方案。
具体操作如下:
一、循环操作的代码
先写一个最简略的 for 循环代码,看看耗时状况怎么样。
/***
* 一条一条顺次对 50000 条数据进行更新操作
* 耗时:2m27s,1m54s
*/
@Test
void updateStudent() {List<Student> allStudents = studentMapper.getAll();
allStudents.forEach(s -> {
// 更新老师信息
String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
}
循环批改整体耗时约 1 分 54 秒,且代码中没有手动事务管制应该是主动事务提交,所以每次操作事务都会提交所以操作比较慢,咱们先对代码中增加手动事务管制,看查问效率怎么。
最新面试题整顿:https://www.javastack.cn/mst/
二、应用手动事务的操作代码
批改后的代码如下:
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
* 因为心愿更新操作 一次性实现,须要手动管制增加事务
* 耗时:24s
* 从测试后果能够看出,增加事务后插入数据的效率有显著的晋升
*/
@Test
void updateStudentWithTrans() {List<Student> allStudents = studentMapper.getAll();
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
allStudents.forEach(s -> {
// 更新老师信息
String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
dataSourceTransactionManager.commit(transactionStatus);
} catch (Throwable e) {dataSourceTransactionManager.rollback(transactionStatus);
throw e;
}
}
增加手动事务操管制后,整体耗时约 24 秒,这绝对于主动事务提交的代码,快了约 5 倍,对于大量循环数据库提交操作,增加手动事务能够无效进步操作效率。
三、尝试多线程进行数据批改
增加数据库手动事务后操作效率有明细进步,但还是比拟长,接下来尝试多线程提交看是不是可能再快一些。
先增加一个 Service 将批量批改操作整合一下,具体代码如下:
StudentServiceImpl.java
@Service
public class StudentServiceImpl implements StudentService {
@Autowired
private StudentMapper studentMapper;
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Override
public void updateStudents(List<Student> students, CountDownLatch threadLatch) {TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
System.out.println("子线程:" + Thread.currentThread().getName());
try {
students.forEach(s -> {
// 更新老师信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
dataSourceTransactionManager.commit(transactionStatus);
threadLatch.countDown();} catch (Throwable e) {e.printStackTrace();
dataSourceTransactionManager.rollback(transactionStatus);
}
}
}
批量测试代码,咱们采纳了多线程进行提交,批改后测试代码如下:
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Autowired
private StudentService studentService;
/**
* 对用户而言,27s 任是一个较长的工夫,咱们尝试用多线程的形式来经行批改操作看是否放慢处理速度
* 预计创立 10 个线程,每个线程进行 5000 条数据批改操作
* 耗时统计
* 1 线程数:1 耗时:25s
* 2 线程数:2 耗时:14s
* 3 线程数:5 耗时:15s
* 4 线程数:10 耗时:15s
* 5 线程数:100 耗时:15s
* 6 线程数:200 耗时:15s
* 7 线程数:500 耗时:17s
* 8 线程数:1000 耗时:19s
* 8 线程数:2000 耗时:23s
* 8 线程数:5000 耗时:29s
*/
@Test
void updateStudentWithThreads() {
// 查问总数据
List<Student> allStudents = studentMapper.getAll();
// 线程数量
final Integer threadCount = 100;
// 每个线程解决的数据量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 创立多线程解决工作
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
// 每个线程解决的数据
List<Student> threadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {studentService.updateStudents(threadDatas, threadLatchs);
});
}
try {
// 倒计时锁设置超时工夫 30s
threadLatchs.await(30, TimeUnit.SECONDS);
} catch (Throwable e) {e.printStackTrace();
}
System.out.println("主线程实现");
}
多线程提交批改时,咱们尝试了不同线程数对提交速度的影响,具体能够看上面表格,
多线程批改 50000 条数据时 不同线程数耗时比照(秒)
依据表格,咱们线程数增大提交速度并非始终增大,在当前情况下约在 2 - 5 个线程数时,提交速度最快(理论线程数还是须要依据服务器配置理论测试)。
另外,MySQL 系列面试题和答案全副整顿好了,微信搜寻 Java 技术栈,在后盾发送:面试,能够在线浏览。
四、基于两个 CountDownLatch 管制多线程事务提交
因为多线程提交时,每个线程事务时独自的,无奈保障一致性,咱们尝试给多线程增加事务管制,来保障每个线程都是在插入数据实现后在提交事务,
这里咱们应用两个 CountDownLatch 来管制主线程与子线程事务提交,并设置了超时工夫为 30 秒。咱们对代码进行了一点批改:
@Override
public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
System.out.println("子线程:" + Thread.currentThread().getName());
try {
students.forEach(s -> {
// 更新老师信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
} catch (Throwable e) {taskStatus.setIsError();
} finally {threadLatch.countDown(); // 切换到主线程执行
}
try {mainLatch.await(); // 期待主线程执行
} catch (Throwable e) {taskStatus.setIsError();
}
// 判断是否有谬误,如有谬误 就回滚事务
if (taskStatus.getIsError()) {dataSourceTransactionManager.rollback(transactionStatus);
} else {dataSourceTransactionManager.commit(transactionStatus);
}
}
/**
* 因为每个线程都是独自的事务,须要增加对线程事务的对立管制
* 咱们这边应用两个 CountDownLatch 对子线程的事务进行管制
*/
@Test
void updateStudentWithThreadsAndTrans() {
// 查问总数据
List<Student> allStudents = studentMapper.getAll();
// 线程数量
final Integer threadCount = 4;
// 每个线程解决的数据量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 创立多线程解决工作
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程工作是否有谬误
for (int i = 0; i < threadCount; i++) {
// 每个线程解决的数据
List<Student> threadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength)
.collect(Collectors.toList());
studentThreadPool.execute(() -> {studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
});
}
try {
// 倒计时锁设置超时工夫 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
if (!await) { // 期待超时,事务回滚
taskStatus.setIsError();}
} catch (Throwable e) {e.printStackTrace();
taskStatus.setIsError();}
mainLatch.countDown(); // 切换到子线程执行
studentThreadPool.shutdown(); // 敞开线程池
System.out.println("主线程实现");
}
本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过 10 个时,执行时就报错。具体谬误内容如下:
Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
... 7 more
谬误的大抵意思时,不能为数据库事务关上 jdbc Connection
,连贯在 30s 的时候超时了。因为后面启动的十个线程须要期待主线程实现后能力提交,所以始终占用连贯未开释,造成前面的过程创立连贯超时。
看谬误日志中谬误的起源是 HikariPool
,咱们来重新配置一下这个连接池的参数,将最大连接数批改为 100,具体配置如下:
# 连接池中容许的最小连接数。缺省值:10
spring.datasource.hikari.minimum-idle=10
# 连接池中容许的最大连接数。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
# 主动提交
spring.datasource.hikari.auto-commit=true
# 一个连贯 idle 状态的最大时长(毫秒),超时则被开释(retired),缺省:10 分钟
spring.datasource.hikari.idle-timeout=30000
# 一个连贯的生命时长(毫秒),超时而且没被应用则被开释(retired),缺省:30 分钟,倡议设置比数据库超时时长少 30 秒
spring.datasource.hikari.max-lifetime=1800000
# 期待连接池调配连贯的最大时长(毫秒),超过这个时长还没可用的连贯则产生 SQLException,缺省:30 秒
再次执行测试发现没有报错,批改线程数为 20 又执行了一下,同样执行胜利了。另外,关注公众号 Java 技术栈,在后盾回复:面试,能够获取我整顿的 Java 系列面试题和答案,十分齐全。
五、基于 TransactionStatus 汇合来管制多线程事务提交
在共事举荐下咱们应用事务汇合来进行多线程事务管制,次要代码如下
@Service
public class StudentsTransactionThread {
@Autowired
private StudentMapper studentMapper;
@Autowired
private StudentService studentService;
@Autowired
private PlatformTransactionManager transactionManager;
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentWithThreadsAndTrans() throws InterruptedException {
// 查问总数据
List<Student> allStudents = studentMapper.getAll();
// 线程数量
final Integer threadCount = 2;
// 每个线程解决的数据量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 创立多线程解决工作
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
AtomicBoolean isError = new AtomicBoolean(false);
try {for (int i = 0; i < threadCount; i++) {
// 每个线程解决的数据
List<Student> threadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {
try {
try {studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
} catch (Throwable e) {e.printStackTrace();
isError.set(true);
}finally {threadLatchs.countDown();
}
} catch (Exception e) {e.printStackTrace();
isError.set(true);
}
});
}
// 倒计时锁设置超时工夫 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
// 判断是否超时
if (!await) {isError.set(true);
}
} catch (Throwable e) {e.printStackTrace();
isError.set(true);
}
if (!transactionStatuses.isEmpty()) {if (isError.get()) {transactionStatuses.forEach(s -> transactionManager.rollback(s));
} else {transactionStatuses.forEach(s -> transactionManager.commit(s));
}
}
System.out.println("主线程实现");
}
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
// 应用这种形式将事务状态都放在同一个事务外面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比拟平安些。TransactionStatus status = transactionManager.getTransaction(def); // 取得事务状态
transactionStatuses.add(status);
students.forEach(s -> {
// 更新老师信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
System.out.println("子线程:" + Thread.currentThread().getName());
}
因为这个中形式去后面形式雷同,须要期待线程执行实现后才会提交事务,所有任会占用 Jdbc 连接池,如果线程数量超过连接池最大数量会产生连贯超时。所以在应用过程中任要控制线程数量,
六、应用 union 连贯多个 select 实现批量 update
有些状况写不反对,批量 update,但反对 insert 多条数据,这个时候可尝试将须要更新的数据拼接成多条 select 语句,而后应用 union 连接起来,再应用 update 关联这个数据进行 update,具体代码演示如下:
update student,((select 1 as id,'teacher_A' as teacher) union
(select 2 as id,'teacher_A' as teacher) union
(select 3 as id,'teacher_A' as teacher) union
(select 4 as id,'teacher_A' as teacher)
/* ....more data ... */
) as new_teacher
set
student.teacher=new_teacher.teacher
where
student.id=new_teacher.id
这种形式在 Mysql 数据库没有配置 allowMultiQueries=true
也能够实现批量更新。
总结
- 对于大批量数据库操作,应用手动事务提交能够很多水平上进步操作效率
- 多线程对数据库进行操作时,并非线程数越多操作工夫越快,按上述示例大概在 2 - 5 个线程时操作工夫最快。
- 对于多线程阻塞事务提交时,线程数量不能过多。
- 如果能有方法实现批量更新那是最好
版权申明:本文为 CSDN 博主「圣心」的原创文章,遵循 CC 4.0 BY-SA 版权协定,转载请附上原文出处链接及本申明。原文链接:https://blog.csdn.net/qq27376…
近期热文举荐:
1.1,000+ 道 Java 面试题及答案整顿 (2022 最新版)
2. 劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!
5.《Java 开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞 + 转发哦!