关于java:百万数据模拟插入数据库

38次阅读

共计 10112 个字符,预计需要花费 26 分钟才能阅读完成。

模拟出 300 万的数量插入数据库,次要思维是通过线程池和 atomic 来实现,借鉴分片思维。
先疾速过根本的配置。

数据库

/*
 Navicat Premium Data Transfer

 Source Server         :
 Source Server Type    : MySQL
 Source Server Version : 50735
 Source Host           : :3306
 Source Schema         : high_concurrency

 Target Server Type    : MySQL
 Target Server Version : 50735
 File Encoding         : 65001

 Date: 07/08/2021 10:52:33
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (`id` int(11) NOT NULL,
  `name` varchar(45) DEFAULT NULL,
  `createdTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `updatedTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `index` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户测试表';

SET FOREIGN_KEY_CHECKS = 1;

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.concurrency</groupId>
    <artifactId>currency01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>currency01</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml(其中数据库明码和 ip 本人配置)

server:
  port: 8080
mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml
logging:
  level:
    com.concurrency.currency01.mapper: debug

spring:
  # 数据源配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://xxxx:3306/high_concurrency?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
    username: root
    password: xxx
    type: com.alibaba.druid.pool.DruidDataSource
    platform: mysql
    # 连接池配置
    druid:
      # 初始化大小,最小,最大
      initial-size: 5
      max-active: 100
      min-idle: 1
      # 配置获取连贯期待超时的工夫
      max-wait: 60000
      # 配置距离多久才进行一次检测,检测须要敞开的闲暇连贯,单位毫秒
      time-between-eviction-runs-millis: 60000
      # 配置一个连贯在池中最小生存工夫
      min-evictable-idle-time-millis: 300000
      # 用来检测连贯是否无效的 sql 必须是一个查问语句 留神没有此语句以下三个属性不会失效
      validation-query: SELECT 1 FROM DUAL
      # 偿还连贯时会执行 validationQuery 检测连贯是否无效, 开启会升高性能, 默认为 true
      test-on-return: false
      # 申请连贯时会执行 validationQuery 检测连贯是否无效, 开启会升高性能, 默认为 true
      test-on-borrow: true
      # 申请连贯的时候检测,如果闲暇工夫大于 timeBetweenEvictionRunsMillis,执行 validationQuery 检测连贯是否无效。test-while-idle: true
      # 配置监控统计拦挡的 Filter,去掉后监控界面 SQL 无奈统计,wall 用于防火墙
      filters: stat,wall
      # 通过 connection-properties 属性关上 mergeSql 性能;慢 SQL 记录
      connection-properties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      # 配置 DruidStatFilter
      web-stat-filter:
        enabled: true
        url-pattern: /*
        exclusions: .js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*
      # 配置 DruidStatViewServlet
      stat-view-servlet:
        url-pattern: /druid/*
        # IP 白名单,没有配置或者为空,则容许所有拜访
        #allow: 127.0.0.1
        # IP 黑名单,若白名单也存在,则优先应用
        #deny: 192.168.31.253
        # 禁用 HTML 中 Reset All 按钮
        reset-enable: true
        # 登录用户名 / 明码
        login-username: root
        login-password: 123456
        # 留神 此处必须开启,否则无法访问 druid 监控面板
        enabled: true
      use-global-data-source-stat: true

用户实体类

package com.concurrency.currency01.domain;

import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
    * 用户测试表
    */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private Integer id;

    private String name;

    private Date createdtime;

    private Date updatedtime;
}

usermapper

package com.concurrency.currency01.mapper;

import com.concurrency.currency01.domain.User;

import java.util.List;

public interface UserMapper {int deleteByPrimaryKey(Integer id);

    int insert(User record);

    int insertSelective(User record);

    User selectByPrimaryKey(Integer id);

    int updateByPrimaryKeySelective(User record);

    int updateByPrimaryKey(User record);

    int batchSave(List list);

}

usermapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.concurrency.currency01.mapper.UserMapper">
  <resultMap id="BaseResultMap" type="com.concurrency.currency01.domain.User">
    <!--@mbg.generated-->
    <!--@Table `user`-->
    <id column="id" jdbcType="INTEGER" property="id" />
    <result column="name" jdbcType="VARCHAR" property="name" />
    <result column="createdTime" jdbcType="TIMESTAMP" property="createdtime" />
    <result column="updatedTime" jdbcType="TIMESTAMP" property="updatedtime" />
  </resultMap>
  <sql id="Base_Column_List">
    <!--@mbg.generated-->
    id, `name`, createdTime, updatedTime
  </sql>
  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
    <!--@mbg.generated-->
    select 
    <include refid="Base_Column_List" />
    from `user`
    where id = #{id,jdbcType=INTEGER}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
    <!--@mbg.generated-->
    delete from `user`
    where id = #{id,jdbcType=INTEGER}
  </delete>
  <insert id="insert" parameterType="com.concurrency.currency01.domain.User">
    <!--@mbg.generated-->
    insert into `user` (id, `name`, createdTime, 
      updatedTime)
    values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{createdtime,jdbcType=TIMESTAMP}, 
      #{updatedtime,jdbcType=TIMESTAMP})
  </insert>
  <insert id="insertSelective" parameterType="com.concurrency.currency01.domain.User">
    <!--@mbg.generated-->
    insert into `user`
    <trim prefix="(" suffix=")" suffixOverrides=",">
      <if test="id != null">
        id,
      </if>
      <if test="name != null">
        `name`,
      </if>
      <if test="createdtime != null">
        createdTime,
      </if>
      <if test="updatedtime != null">
        updatedTime,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
      <if test="id != null">
        #{id,jdbcType=INTEGER},
      </if>
      <if test="name != null">
        #{name,jdbcType=VARCHAR},
      </if>
      <if test="createdtime != null">
        #{createdtime,jdbcType=TIMESTAMP},
      </if>
      <if test="updatedtime != null">
        #{updatedtime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.concurrency.currency01.domain.User">
    <!--@mbg.generated-->
    update `user`
    <set>
      <if test="name != null">
        `name` = #{name,jdbcType=VARCHAR},
      </if>
      <if test="createdtime != null">
        createdTime = #{createdtime,jdbcType=TIMESTAMP},
      </if>
      <if test="updatedtime != null">
        updatedTime = #{updatedtime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where id = #{id,jdbcType=INTEGER}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.concurrency.currency01.domain.User">
    <!--@mbg.generated-->
    update `user`
    set `name` = #{name,jdbcType=VARCHAR},
      createdTime = #{createdtime,jdbcType=TIMESTAMP},
      updatedTime = #{updatedtime,jdbcType=TIMESTAMP}
    where id = #{id,jdbcType=INTEGER}
  </update>

  <insert id="batchSave">
    insert into user (id, `name`) values
      <foreach collection="list" item="item" index="index" separator=",">
         (#{item.id,jdbcType=INTEGER}, #{item.name,jdbcType=VARCHAR} )
    </foreach>

  </insert>

</mapper>

用户管制层

package com.concurrency.currency01.controller;

import com.concurrency.currency01.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
    @Autowired
    private UserService userService;

    @PostMapping("/insert")
    public String ok() {return userService.hanlder();
    }
}

接下来是最外围的代码
这里借鉴分片思维,将三百万的数据按每次分片 20000 条数据进行插入,首先开启 100 的定长线程池,

ExecutorService pool = Executors.newFixedThreadPool(100);

通过 runnable,存入 20000 条数据,由 atomic 计算插入的数量,最初通过执行线程池的命令进行数据插入

 atomicInt.addAndGet(userMapper.batchSave(userList));
pool.execute(run);
package com.concurrency.currency01.service.impl;

import com.concurrency.currency01.domain.User;
import com.concurrency.currency01.mapper.UserMapper;
import com.concurrency.currency01.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class UserServiceImpl implements UserService {
    @Autowired
    private UserMapper userMapper;
    /**
     * 百万数据插入
     * @return
     */
    @Override
    public String hanlder() {ExecutorService pool = Executors.newFixedThreadPool(100);
        int totalPageNo = 150;
        int pageSize = 20000;
        long start = System.currentTimeMillis();
        AtomicInteger atomicInt = new AtomicInteger();
        for(int currentPageNo = 0; currentPageNo < totalPageNo; currentPageNo++){
           int finalCurrentPageNo = currentPageNo;
          Runnable run = new Runnable() {
             @Override
             public void run() {List userList=new ArrayList<>();
               for(int i=1;i<=pageSize;i++){
                 int id=i+ finalCurrentPageNo *pageSize;
                 User user =new User();
                 user.setId(id);
                 user.setName("zhouzhou:"+id);
                 userList.add(user);
               }
               atomicInt.addAndGet(userMapper.batchSave(userList));
               if(atomicInt.get() ==(totalPageNo*pageSize)){System.out.println("sync data to db, it  has spent" +(System.currentTimeMillis()-start)+"ms");
               }
             }
          };
          try {Thread.sleep(5);
          } catch (InterruptedException e) {e.printStackTrace();
          }
          pool.execute(run);
        }
        return  "ok";
    }
}




源码地址:GitHub

正文完
 0