关于mysql:Mysql到TiDB迁移双写数据库兜底方案

84次阅读

共计 5670 个字符,预计需要花费 15 分钟才能阅读完成。

作者:京东批发 石磊

TiDB 作为开源 NewSQL 数据库的典型代表之一,同样反对 SQL,反对事务 ACID 个性。在通信协定上,TiDB 抉择与 MySQL 齐全兼容,并尽可能兼容 MySQL 的语法。因而,基于 MySQL 数据库开发的零碎,大多数能够平滑迁徙至 TiDB,而简直不必批改代码。对用户来说,迁徙老本极低,过渡天然。

然而,仍有一些 MySQL 的个性和行为,TiDB 目前临时不反对或体现与 MySQL 有差别。除此之外,TiDB 提供了一些扩大语法和性能,为用户提供更多的便当。

TiDB 仍处在疾速倒退的路线上,对 MySQL 性能和行为的反对方面,正按 路线图 的布局在前行。

兼容策略

先从总体上概括 TiDB 和 MySQL 兼容策略,如下表:

通信协定 SQL 语法 性能和行为
齐全兼容 兼容绝大多数 兼容大多数

截至 4.0 版本,TiDB 与 MySQL 的区别总结如下表:

 MySQL TiDB
隔离级别 反对读未提交、读已提交、可反复读、串行化,默认为可反复读 乐观事务反对快照隔离,乐观事务反对快照隔离和读已提交
锁机制 乐观锁 乐观锁、乐观锁
存储过程 反对 不反对
触发器 反对 不反对
事件 反对 不反对
自定义函数 反对 不反对
窗口函数 反对 局部反对
JSON 反对 不反对局部 MySQL 8.0 新增的函数
外键束缚 反对 疏忽外键束缚
字符集  只反对 ascii、latin1、binary、utf8、utf8mb4
减少 / 删除主键 反对 通过 alter-primary-key 配置开关提供
CREATE TABLE tblName AS SELECT stmt 反对 不反对
CREATE TEMPORARY TABLE 反对 TiDB 疏忽 TEMPORARY 关键字,依照一般表创立
DML affected rows 反对 不反对
AutoRandom 列属性 不反对 反对
Sequence 序列生成器 不反对 反对

三种计划比拟

双写计划:同时往 mysql 和 tidb 写入数据,两个数据库数据齐全放弃同步

•长处:此计划最平安,作为兜底计划不需放心数据库回滚问题,因为数据完全一致,能够无缝回滚到 mysql

•毛病:新计划,调研计划实现,老本较高

读写拆散:数据写入 mysql,从 tidb 读,具体计划是切换到线上当前,放弃读写拆散一周工夫左右,这一周工夫用来确定 tidb 数据库没有问题,再把写操作也切换到 tidb

•长处: 切换过程,mysql 和 tidb 数据放弃同步,满足数据回滚到 mysql 计划

•毛病:mysql 和 tidb 数据库同步存在延时,对局部写入数据要求实时查问的会导致查问失败,同时一旦整体切换到 tidb,无奈回切到 mysql

间接切换:间接一步切换到 tidb

•长处:切换过程最简略,老本最低

•毛病:此计划没有兜底计划,切换到 tidb,无奈再回切到 mysql 或者同步数据回 mysql 危险较大,无奈保证数据是否可用

Django 双写 mysql 与 tidb 策略

settings.py 中新增配置 

# Dev Database settings
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'name',
        'USER': 'root',
        'PASSWORD': '123456',
        'HOST': 'db',
    },
    'replica': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'name',
        'USER': 'root',
        'PASSWORD': '123456',
        'HOST': 'db',
    },
    'bak': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'name',
        'USER': 'root',
        'PASSWORD': '123456',
        'HOST': 'db',
    },
}
# 多重写入数据库配置
MULTI_WRITE_DB = "bak"

双写中间件 basemodel.py

import copy
import logging
import traceback
from django.db import models, transaction, router
from django.db.models.deletion import Collector
from django.db.models import sql
from django.db.models.sql.constants import CURSOR
from jcdp.settings import MULTI_WRITE_DB, DATABASES

multi_write_db = MULTI_WRITE_DB


# 重写 QuerySet
class BaseQuerySet(models.QuerySet):

    def create(self, **kwargs):
        return super().create(**kwargs)

    def update(self, **kwargs):
        try:
            rows = super().update(**kwargs)
            if multi_write_db in DATABASES:
                self._for_write = True
                query = self.query.chain(sql.UpdateQuery)
                query.add_update_values(kwargs)
                with transaction.mark_for_rollback_on_error(using=multi_write_db):
                    query.get_compiler(multi_write_db).execute_sql(CURSOR)
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return rows

    def delete(self):
        try:
            deleted, _rows_count = super().delete()
            if multi_write_db in DATABASES:
                del_query = self._chain()
                del_query._for_write = True
                del_query.query.select_for_update = False
                del_query.query.select_related = False
                collector = Collector(using=multi_write_db)
                collector.collect(del_query)
                collector.delete()
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return deleted, _rows_count

    def raw(self, raw_query, params=None, translations=None, using=None):
        try:
            qs = super().raw(raw_query, params=params, translations=translations, using=using)
            if multi_write_db in DATABASES:
                super().raw(raw_query, params=params, translations=translations, using=multi_write_db)
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return qs

    def bulk_create(self, objs, batch_size=None, ignore_conflicts=False):
        try:
            for obj in objs:
                obj.save()
        except Exception:
            logging.error(traceback.format_exc())
            raise
        # objs = super().bulk_create(objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts)
        # if multi_write_db in DATABASES:
        #     self._db = multi_write_db
        #     super().bulk_create(objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts)
        return objs

    def bulk_update(self, objs, fields, batch_size=None):
        try:
            super().bulk_update(objs, fields, batch_size=batch_size)
            if multi_write_db in DATABASES:
                self._db = multi_write_db
                super().bulk_update(objs, fields, batch_size=batch_size)
        except Exception:
            logging.error(traceback.format_exc())
            raise


class BaseManager(models.Manager):
    _queryset_class = BaseQuerySet


class BaseModel(models.Model):
    objects = BaseManager()

    class Meta:
        abstract = True

    def delete(self, using=None, *args, **kwargs):
        try:
            instance = copy.deepcopy(self)
            super().delete(using=using, *args, **kwargs)
            if multi_write_db in DATABASES:
                super(BaseModel, instance).delete(using=multi_write_db, *args, **kwargs)
        except Exception:
            logging.error(traceback.format_exc())
            raise

    def save_base(self, raw=False, force_insert=False,
                  force_update=False, using=None, update_fields=None):
        try:
            using = using or router.db_for_write(self.__class__, instance=self)
            assert not (force_insert and (force_update or update_fields))
            assert update_fields is None or update_fields
            cls = self.__class__
            # Skip proxies, but keep the origin as the proxy model.
            if cls._meta.proxy:
                cls = cls._meta.concrete_model
            meta = cls._meta
            # A transaction isn't needed if one query is issued.
            if meta.parents:
                context_manager = transaction.atomic(using=using, savepoint=False)
            else:
                context_manager = transaction.mark_for_rollback_on_error(using=using)
            with context_manager:
                parent_inserted = False
                if not raw:
                    parent_inserted = self._save_parents(cls, using, update_fields)
                self._save_table(
                    raw, cls, force_insert or parent_inserted,
                    force_update, using, update_fields,
                )
            if multi_write_db in DATABASES:
                super().save_base(raw=raw,
                                  force_insert=raw,
                                  force_update=force_update,
                                  using=multi_write_db,
                                  update_fields=update_fields)
            # Store the database on which the object was saved
            self._state.db = using
            # Once saved, this is no longer a to-be-added instance.
            self._state.adding = False
        except Exception:
            logging.error(traceback.format_exc())
            raise

上述配置实现当前,在每个利用的 models.py 中援用新的 BaseModel 类作为模型基类即可实现双写目标

class DirectoryStructure(BaseModel):
    """目录构造"""
    view = models.CharField(max_length=128, db_index=True)  # 视图名称 eg:部门视图 我的项目视图
    sub_view = models.CharField(max_length=128, unique=True, db_index=True)  # 子视图名称
    sub_view_num = models.IntegerField()  # 子视图顺序号 

注:目前该办法尚不反对多对多模型的双写情景,如有业务需要,还需重写 ManyToManyField 类,办法参考猴子补丁形式

迁徙数据库过程踩坑记录

TIDB 配置项差别:确认数据库配置:ONLY_FULL_GROUP_BY 禁用(mysql 默认禁用)

TIDB 不反对事务 savepoint,代码中须要显式敞开 savepoint=False

TIDB 因为是分布式数据库,对于自增主键字段的自增策略与 mysq 有差别,若业务代码会与主键 id 关联,须要留神。

正文完
 0