本篇承接上一篇《Mysql 数据库的批量插入或更新(Upsert)》的内容,来看看在 Python 中,怎么实现大量数据的 upsert(记录存在就更新,不存在则插入)。

因为不同数据库对这种 upsert 的实现机制不同,Sqlalchemy 也就不再试图做一致性的封装了,而是提供了各自的方言 API,具体到 Mysql,就是给 insert statement ,减少了 on_duplicate_key_update 办法。

根本用法

假如表数据模型如下:

class TableA(db.Model):    __tablename__ = 'table_a'    __table_args__ = (db.UniqueConstraint('a', 'b', name='table_a_a_b_unique'))    id = db.Column(db.Integer, primary_key=True)    a = db.Column(db.Integer)    b = db.Column(db.Integer)    c = db.Column(db.Integer)

其中 id 是自增主键,a, b 组成了惟一索引。那么对应的 upsert 语句如下:
from sqlalchemy.dialects.mysql import insert

insert(TableA).values(a=1, b=2, c=3).on_duplicate_key_update(c=3)

复用数值

跟 SQL 语句相似,咱们能够不必每次都反复填写 insert 和 update 的数值:

update_keys = ['c']insert_stmt = insert(table_cls).values(a=1, b=2, c=3)update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)db.session.execute(upsert_stmt)

留神,最初一句 on_duplicate_key_update 的参数是须要开展的,不承受 dict 作为参数

批量解决

同样,insert 语句是反对传一组数据作为参数的:

records = {[    'a':1,    'b':2,    'c':3],[    'a':10,    'b':20,    'c':4],[    'a':20,    'b':30,    'c':5]}update_keys = ['c']insert_stmt = insert(table_cls).values(records)update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)db.session.execute(upsert_stmt)

就能够实现整体的 upsert。

封装

察看下面的代码,实际上 upsert 的局部是业务无关的,那么就能够封装一个更不便调用的通用函数了:

from sqlalchemy.dialects.mysql import insertdef upsert(table_cls, records, except_cols_on_update=[]):    update_keys = [key for key in records[0].keys() if                   key not in except_cols_on_update]    insert_stmt = insert(table_cls).values(chunk)    update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}    upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)    db.session.execute(upsert_stmt)

分批次生成

以上的封装,还能够做一些改良:为防止records 数据集过大,能够分批执行 sql 语句,并通过参数决定是否要提交:

from sqlalchemy.dialects.mysql import insertdef upsert(table_cls, records, chunk_size=10000, commit_on_chunk=True, except_cols_on_update=[]):    update_keys = [key for key in records[0].keys() if                   key not in except_cols_on_update]    for i in range(0, len(records), chunk_size):        chunk = records[i:i + chunk_size]        insert_stmt = insert(table_cls).values(chunk)        update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}        upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)        db.session.execute(upsert_stmt)        if commit_on_chunk:            db.session.commit()

调用形式如下 :

upsert(TableA, records,                       chunk_size=50000,                       commit_on_chunk=True,                       except_cols_on_update=['id', 'a', 'b'])

这时 records 能够数量很大,比方1千万条,调用后每 5 万条生成一条 sql 语句,并且执行后就commit(如果参数 commit_on_chunk = False,那么函数内就始终不提交,能够完结后自行对立提交),update 语句中,防止更新 'id', 'a', 'b' 这三个字段。

我的语雀原文链接