乐趣区

搭建node服务二操作MySQL

为了进行复杂信息的存储和查询,服务端系统往往需要数据库操作。数据库分为关系型数据库和非关系型数据库,关系型数据库有 MySQL、Oracle、SQL Server 等,非关系型数据库有 Redis(常用来做缓存)、MongoDB 等。MySQL 是目前很流行的数据库,本文将要介绍如何在 node 服务中进行 MySQL 数据库操作。

一、安装依赖

npm install mysql --save

或者

yarn add mysql

二、建立连接

要想进行数据库操作就需要和数据库建立连接,然后通过连接进行数据库的操作。MySQL 的数据库连接方式有以下几种:

  • mysql.createConnection() 每次请求建立一个连接
  • mysql.createPool() 创建连接池,从连接池中获取连接
  • mysql.createPoolCluster() 创建连接池集群,连接池集群可以提供多个主机连接

mysqljs 文档中推荐使用第一种方式:每次请求建立一个连接,但是由于频繁的建立、关闭数据库连接,会极大的降低系统的性能,所以我选择了使用连接池的方式,如果对性能有更高的要求,安装了 MySQL 集群,可以选择使用连接池集群。

1. 数据库配置

将数据库相关的配置添加到公用的配置文件中,方便项目的初始化。

  • config.js
module.exports = {
    …
    // mysql 数据库配置
    mysql: {
        // 主机
        host: 'localhost',
        // 端口
        port: 3306,
        // 用户名
        user: 'root',
        // 密码
        password: '123456',
        // 数据库名
        database: 'server-demo',
        // 连接池允许创建的最大连接数,默认值为 10
        connectionLimit: 50,
        // 允许挂起的最大连接数, 默认值为 0, 代表挂起的连接数无限制
        queueLimit: 0
    }
};

connectionLimit 和 queueLimit 是数据连接池特有的配置项。

  • connectionLimit 是指连接池允许创建的最大连接数,默认值为 10。当获取连接时,如果连接池中有空闲的连接则直接返回一个空闲连接。如果所有连接都被占用,则判断连接池中的连接数是否达到了允许的最大数,如果未达到则创建新的连接,如果已达到则获取连接的请求挂起,等待其他请求完成操作后释放的连接。
  • queueLimit 是指允许挂起的最大连接数,默认值为 0,代表挂起的连接数无限制。当连接池中允许创建的所有连接都被占用时,获取连接的请求挂起,等待可用的连接,所有挂起的请求形成一个队列,queueLimit 则是指这个队列的最大长度。需要注意的是,当 queueLimit 为 0 时并不表示不允许挂起,而是表示对挂起的数目没有限制。

2. 创建连接池

  • db/pool.js
/**
 * 数据库连接池
 */
const mysql = require('mysql');
const config = require('../config');

// 创建数据库连接池
const pool = mysql.createPool(config.mysql);

pool.on('acquire', function (connection) {console.log(` 获取数据库连接 [${connection.threadId}]`);
});
pool.on('connection', function (connection) {console.log(` 创建数据库连接 [${connection.threadId}]`);
});
pool.on('enqueue', function () {console.log('正在等待可用数据库连接');
});
pool.on('release', function (connection) {console.log(` 数据库连接 [${connection.threadId}] 已释放 `);
});

module.exports = pool;

创建数据库连接池 pool 后,就可以通过 pool 获取数据库连接了,另外通过监听连接池的事件可以了解连接池中连接的使用情况。
如果将 connectionLimit 设为 2,queueLimit 设为 0,当同时有 5 个请求获取数据库连接时,线程池的事件日志如下:

 正在等待可用数据库连接
正在等待可用数据库连接
正在等待可用数据库连接
创建数据库连接 [1011]
获取数据库连接 [1011]
数据库连接 [1011] 已释放
获取数据库连接 [1011]
创建数据库连接 [1012]
获取数据库连接 [1012]
数据库连接 [1011] 已释放
获取数据库连接 [1011]
数据库连接 [1012] 已释放
获取数据库连接 [1012]
数据库连接 [1011] 已释放
数据库连接 [1012] 已释放 

由于线程池允许的最大连接数是 2,5 个请求中会有 2 个请求能够得到连接,另外 3 个请求挂起等待可用连接。由于创建数据库连接的代价比较大,线程池在创建连接时采用懒汉式,也就是,用到时才创建。先得到连接的请求在完成操作后释放连接,放回到连接池,然后挂起的请求从线程池取出空闲的连接进行操作。

三、执行操作

由于 mysql 模块的接口都为回调方式的,为了操作方便简单地将接口封装为 Promise,相关方法封装如下:

const pool = require('./pool');

// 获取连接
function getConnection () {return new Promise((resolve, reject) => {pool.getConnection((err, connection) => {if (err) {console.error('获取数据库连接失败!', err)
                reject(err);
            } else {resolve(connection);
            }
        });
    });
}

// 开始数据库事务
function beginTransaction (connection) {return new Promise((resolve, reject) => {
        connection.beginTransaction(err => {if (err) {reject(err);
            } else {resolve();
            }
        });
    });
}

// 提交数据库操作
function commit (connection) {return new Promise((resolve, reject) => {
        connection.commit(err => {if (err) {reject(err);
            } else {resolve();
            }
        });
    })
}

// 回滚数据库操作
function rollback (connection) {return new Promise((resolve, reject) => {
        connection.rollback(err => {if (err) {reject(err);
            } else {resolve();
            }
        });
    })
}

1. 执行普通操作

对于不需要使用事务的普通操作,获取数据库连接 connection 后,使用 connection 进行数据库操作,完成后释放连接到连接池,则执行完成一次操作。

  • db/execute.js

/**
 * 执行数据库操作【适用于不需要事务的查询以及单条的增、删、改操作】* 示例:* let func = async function(conn, projectId, memberId) {...};
 * await execute(func, projectId, memberId);
 * @param func 具体的数据库操作异步方法(第一个参数必须为数据库连接对象 connection)* @param params func 方法的参数(不包含第一个参数 connection)* @returns {Promise.<*>} func 方法执行后的返回值
 */
async function execute (func, ...params) {
    let connection = null;
    try {connection = await getConnection()
        let result = await func(connection, ...params);
        return result
    } finally {connection && connection.release && connection.release();
    }
}

2. 执行事务操作

对于很多业务都需要执行事务操作,例如:银行转账,A 账户转账给 B 账户 100 元,这个业务操作需要执行两步,从 A 账户减去 100 元,然后给 B 账户增加 100 元。两个子操作必须全部执行成功才能完成完整的业务操作,如果任意子操作执行失败就需要撤销之前的操作,进行回滚。

对于需要使用事务的操作,获取数据库连接 connection 后,首先需要调用 connection.beginTransaction() 开始事务,然后使用 connection 进行多步操作,完成后执行 connection.commit() 进行提交,则执行完成一次事务操作。如果在执行过程中出现了异常,则执行 connection.rollback() 进行回滚操作。

  • db/execute.js
/**
 * 执行数据库事务操作【适用于增、删、改多个操作的执行,如果中间数据操作出现异常则之前的数据库操作全部回滚】* 示例:* let func = async function(conn) {...};
 * await executeTransaction(func);
 * @param func 具体的数据库操作异步方法(第一个参数必须为数据库连接对象 connection)* @returns {Promise.<*>} func 方法执行后的返回值
 */
async function executeTransaction(func) {const connection = await getConnection();
    await beginTransaction(connection);

    let result = null;
    try {result = await func(connection);
        await commit(connection);
        return result
    } catch (err) {console.error('事务执行失败,操作回滚');
        await rollback(connection);
        throw err;
    } finally {connection && connection.release && connection.release();
    }
}

四、增删改查

增删改查是处理数据的基本原子操作,将这些操作根据操作的特点进行简单的封装。

  • db/curd.js
/**
 * 查询操作
 * @param connection 连接
 * @param sql SQL 语句
 * @param val SQL 参数
 * @returns {Promise} resolve 查询到的数据数组
 */
function query (connection, sql, val) {// console.info('sql 执行 query 操作:\n', sql, '\n', val);
    return new Promise((resolve, reject) => {connection.query(sql, val, (err, rows) => {if (err) {console.error('sql 执行失败!', sql, '\n', val);
                reject(err);
            } else {let results = JSON.parse(JSON.stringify(rows));
                resolve(results);
            }
        });
    });
}

/**
 * 查询单条数据操作
 * @param connection 连接
 * @param sql SQL 语句
 * @param val SQL 参数
 * @returns {Promise} resolve 查询到的数据对象
 */
function queryOne (connection, sql, val) {return new Promise((resolve, reject) => {query(connection, sql, val).then(
            results => {let result = results.length > 0 ? results[0] : null;
                resolve(result);
            },
            err => reject(err)
        )
    });
}

/**
 * 新增数据操作
 * @param connection 连接
 * @param sql SQL 语句
 * @param val SQL 参数
 * @param {boolean} skipId 跳过自动添加 ID, false: 自动添加 id,true: 不添加 id
 * @returns {Promise} resolve 自动生成的 id
 */
function insert (connection, sql, val, skipId) {
    let id = val.id;
    if (!id && !skipId) {id = uuid();
        val = {id, ...val};
    }
    return new Promise((resolve, reject) => {// console.info('sql 执行 insert 操作:\n', sql, '\n', val);
        connection.query(sql, val, (err, results) => {if (err) {console.error('sql 执行失败!', sql, '\n', val);
                reject(err);
            } else {resolve(id);
            }
        });
    });
}

/**
 * 更新操作
 * @param connection 连接
 * @param sql SQL 语句
 * @param val SQL 参数
 * @returns {Promise} resolve 更新数据的行数
 */
function update (connection, sql, val) {// console.info('sql 执行 update 操作:\n', sql, '\n', val);
    return new Promise((resolve, reject) => {connection.query(sql, val, (err, results) => {if (err) {console.error('sql 执行失败!', sql, '\n', val);
                reject(err);
            } else {resolve(results.affectedRows);
            }
        });
    });
}

/**
 * 删除操作
 * @param connection 连接
 * @param sql SQL 语句
 * @param val SQL 参数
 * @returns {Promise} resolve 删除数据的行数
 */
function del (connection, sql, val) {// console.info('sql 执行 delete 操作:\n', sql, '\n', val);
    return new Promise((resolve, reject) => {connection.query(sql, val, (err, results) => {if (err) {console.error('sql 执行失败!', sql, '\n', val);
                reject(err);
            } else {// console.log('delete result', results);
                resolve(results.affectedRows);
            }
        });
    });
}

五、代码分层

将代码分层可以降低代码的耦合度,提高可复用性、可维护性,这里将代码分成了 3 层:Dao 层、Service 层和 Controller 层。

  • DAO 层: 主要负责数据持久化工作;
  • Service 层: 主要负责业务模块的逻辑设计,此层的业务实现,可以调用 DAO 层的接口;
  • Controller 层: 负责具体的业务模块流程的控制,在此层可以调用 Service 层的接口。

1.DAO 层

  • dao/userDao.js
const {query, queryOne, update, insert, del} = require('../db/curd');

class UserDao {static async queryUserById (connection, id) {
        const sql = `SELECT user.id, user.account, user.name, user.email, user.phone,
                          user.birthday, user.enable, user.deleteFlag, user.creator,
                          user.createTime, user.updater, user.updateTime
                   FROM sys_user user
                   WHERE user.id = ?`;
        const user = await queryOne(connection, sql, id);
        return user;
    }
    …
}

module.exports = UserDao;

2.Service 层

  • service/userService.js

简单调用一个 DAO 层方法:

const {execute, executeTransaction} = require('../db/execute');
const UserDao = require('../dao/userDao');

class UserService {static async findUserById (id) {return await execute(UserDao.queryUserById, id);
}
…
}

module.exports = UserService;

对于复杂些的业务逻辑可以使用匿名函数来实现:

static async findUserWithRoles (id) {
    return await execute (async connection => {const user = await UserDao.queryUserById(connection, id);
        if (user) {user.roles = await RoleDao.queryRolesByUserId(connection, id);
        }
        return user;
    });
}

如果要执行事务操作,则需要使用 executeTransaction 方法:

static async updateUserRoleRelations (userId, roleIds) {
    return await executeTransaction(async connection => {const relations = await UserDao.queryUserRoleRelations(connection, userId);
        const oldRoleIds = relations.map(item => item.roleId);
        const newRoleIds = roleIds || [];
        // 新增的角色数组
        const addList = [];
        // 移除的角色数组
        const removeList = [];
        newRoleIds.forEach(roleId => {if (oldRoleIds.indexOf(roleId) === -1) {addList.push(roleId);
            }
        });
        oldRoleIds.forEach(roleId => {if (newRoleIds.indexOf(roleId) === -1) {removeList.push(roleId);
            }
        });

        if (addList.length > 0) {await UserDao.insertUserRoleRelations(connection, userId, addList);
        }
        if (removeList.length > 0) {await UserDao.deleteUserRoleRelations(connection, userId, removeList);
        }
    });
}

3.Controller 层

  • controler/userController.js
const UserService = require('../service/userService');

class UserControler {static async getUserById (ctx) {
        // 用户 ID
        const id = ctx.params.id;
        // 是否包含用户角色信息,如果 withRoles 为 "1" 表示需要包含角色信息
        const withRoles = ctx.query.withRoles;

        let user;
        if (withRoles === '1') {user = await UserService.findUserWithRoles(id);
        } else {user = await UserService.findUserById(id);
        }
        if (user) {ctx.body = user;} else {
            ctx.body = {
                code: 1004,
                msg: '用户不存在!'
            }
        }
    }
    …
}

module.exports = UserControler;

此示例基于 Koa 框架,controller 层实现完成后需要添加路由:

const router = new KoaRouter();
const UserController = require('./controler/userControler');

// 获取指定 ID 的用户
router.get('/users/:id', UserController.getUserById);

// 获取所有用户
router.get('/users', UserControler.getUsers);

对于 Koa 框架如何使用,这里不再介绍,路由添加完毕后,启动服务,即可使用这些接口,如果本地服务启动的端口为 3000,接口请求地址如下:

  • http://localhost:3000/users/3571a123-0454-49b4-a2bc-8b30a37f0b14
  • http://localhost:3000/users/3571a123-0454-49b4-a2bc-8b30a37f0b14?withRoles=1
  • http://localhost:3000/users/

六、说明

本文介绍了 mysql 模块的基本使用,对其进行了简单封装,并提供了使用示例。除了使用 mysql 模块来操作数据库,也可以使用 mysql2 模块,mysql2 的基本用法与 mysql 一致,另外 mysql2 还支持 Promise,使用起来更方便。本文相关的代码已提交到 GitHub 以供参考,项目地址:https://github.com/liulinsp/node-server-typeorm-demo。

作者:刘琳

退出移动版