关于大数据:火山引擎ByteHouseClickHouse如何保证海量数据一致性

107次阅读

共计 8399 个字符,预计需要花费 21 分钟才能阅读完成。

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

背景

ClickHouse 是一个开源的 OLAP 引擎,不仅被寰球开发者宽泛应用,在字节各个利用场景中也能够看到它的身影。基于高性能、分布式特点,ClickHouse 能够满足大规模数据的剖析和查问需要,因而字节研发团队以开源 ClickHouse 为根底,推出火山引擎云原生数据仓库 ByteHouse。

在日常工作中,研发人员常常会遇到业务链路过长,导致流程稳定性和数据一致性难保障的问题,这在分布式、跨服务的场景中更为显著。本篇文章提出针对这一问题的解决思路:在火山引擎 ByteHouse 中构建轻量级流程引擎,来解决数据一致性问题。

应用轻量级流程引擎能够帮咱们应用对立的规范来解决简单业务链路的编排问题,不仅进步业务代码的可读性和复用性,还能更专一业务外围逻辑的开发,让整体流程更加标准化、规范化。

总结来说,应用流程引擎有以下劣势:

  • 轻量级,接入不便,内存操作,性能有保障
  • 易保护,流程配置与业务拆散,反对热更新
  • 易扩大,丰盛的执行策略及算子反对

大体思路

上图为 ByteHouse 企业版治理平台性能架构图。从该性能架构图能够看出,ByteHouse 外围能力都是依赖 ClickHouse 集群,对于集群节点多、数据计算量大的业务场景,容易呈现节点状态不统一的问题,因而保障 ClickHouse 集群间的状态一致性是咱们的外围诉求。

为了保证数据一致性,ByteHouse 提供了以下能力:

  1. event engine: 事件处理核心
  2. workflow engine:轻量级流程引擎
  3. 对账零碎

保障数据一致性最简略的形式是通过状态机来监听流程执行过程:

  • 首先,将所有的工作申请下发到 event engine,由 event engine 将工作散发对应的 handler 执行,对立治理所有下发工作的生命周期,并提供异步重试、回滚弥补等性能。流量汇总到 event engine 当前,会让服务后续的业务扩大更加便捷。
  • 其次,对于比较复杂的工作申请,咱们能够下发到 workflow engine 执行,由 workflow 生成实例,并编排工作队列,治理流程执行实例的生命周期,对立失败回滚,失败重试。
  • 最初,对于服务不可用等非凡场景产生的脏数据,由对账服务兜底。

架构设计

在流程监控的架构设计中,次要蕴含以下:

  • 流程管理层:次要负责流程配置的解析初始化,并实现编排策略的工作
  • 策略 behavior 层:编排执行节点,并下发执行工作到执行器
  • 执行器:治理执行节点执行
  • 执行节点:负责业务具体实现

实现计划

执行节点

流程引擎的外围为“责任链”,依照责任链上的节点程序顺次执行所有工作,所以咱们须要的三个根本单元别离为:

  • request:入参
  • processlist:流程执行节点 list
  • response:出参

在研发工作中,咱们时常会遇到以下问题:

  • 如果同时呈现了一个问题,node1、node2、node3 之间的数据交互如何实现?
  • 如果 node1 入参、出参加 node2,node3 不一样该如何解决?
  • 参数类型不同的 node 又该如何对立调度?

最简略的解决方法,是让 node 应用雷同的上下文信息,将整个执行 node 模版化。咱们让所有的执行节点 node 实现雷同的接口 Delegation,对立应用雷同的上下文 executionContext 作为执行办法的入参。

对于流程中的 request 和 response,咱们能够放入 executionContext 中,让每个执行节点都能够通过上下文操作 response。

// Delegation -
type Delegation interface {Execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
   TryExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
   ConfirmExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
   CancelExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError

   Code() string
   Type() value.DelegationType}

执行策略

如果确定好了最小的执行节点,咱们须要思考到,业务场景并不会永远程序执行 node,再返回后果,流程执行过程中跳转、循环、并发执行都是比拟常见的操作。思考不同业务场景复用性,咱们在执行节点之上加了一层执行策略,用策略 behaivor 来从新编排触发执行节点的工作。

  • 下图将流程分成了 behavior1 和 behavior2,别离对应不同的策略。
  • 简略的策略举例:按程序执行、并发执行、循环执行、条件跳转执行等。
  • 咱们能够依据本身业务理论须要定制,后续会有实例介绍。
// ActivityBehavior -
type ActivityBehavior interface {Enter(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError
   Execute(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError
   Leave(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError
   Code() value.ActivityBehaviorCode}

策略 behavior 提供有 Enter,Execute,Leave 三个接口,Enter 负责生成执行节点工作 instance,Execute 负责编排并触发执行工作 instance 操作,Leave 负责跳转到下一个 behavior。

能够看进去策略 behaivor 的跳转形式相似于链表,一直执行 next 办法,所以编码过程中须要留神不要呈现死循环,小心 stackoverflow。

Executor

执行器 Executor 的次要作用是串联执行策略和执行节点,策略 behavior 将执行的命令下发给 Executor,由 Executor 对执行节点的触发操作。这里会依据执行节点的 type,映射到三种执行节点的执行形式,蕴含 tcc,执行一次,重试屡次。

// DelegationExecutor -
type DelegationExecutor interface {execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
   postExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
}

func (de *DefaultDelegationExecutor) execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError {delegationCode := executionContext.GetExecutionInstance().GetDelegationCode()
   if len(delegationCode) == 0 || de.DelegationMap[delegationCode] == nil {logger.Info(ctx, "DefaultDelegationExecutor delegation code not found,use default delegation", zap.String("delegationCode", delegationCode))

      delegationCode = string(value.DefaultDelegation)
      executionContext.GetExecutionInstance().SetDelegationCode(delegationCode)
   }

   return de.dumpExecute(ctx, executionContext, delegationCode)
}

func (de *DefaultDelegationExecutor) dumpExecute(ctx context.Context, executionContext ExecutionContextInterface, delegationCode string) apperror.AppError {FireEvent(ctx, executionContext, value.ExecutionStart)

   var err apperror.AppError
   delegation := de.DelegationMap[delegationCode]
   switch delegation.Type() {
   case value.TccDelegation:
      err = tccExecute(ctx, executionContext, delegation)
   case value.SingleDelegation:
      err = singleExecute(ctx, executionContext, delegation)
   case value.RetryDelegation:
      err = retryExecute(ctx, executionContext, delegation)
   }

   if err != nil {logger.Error(ctx, "delegation.Execute_err", zap.Error(err))

      return apperror.Trace(err)
   }

   FireEvent(ctx, executionContext, value.ExecutionEnd)

   return nil
}

ExecutionContext

ExecutionContext 上下文是用来记录了流程执行的所有细节,蕴含以下:

  • ProcessEngineConfigurationInterface: 流程定义信息
  • ExecutionInstanceInterface: 执行节点实例
  • ActivityInstanceInterface: 执行策略实例
  • ProcessInstanceInterface: 流程实例
  • request:入参
  • response:返回值

为了保障整个流程执行的稳定性,这里除了 response 之外,所以其余的实例参数都不倡议凋谢写接口,response 能够用来存储流程实例执行过程中会产生的变量信息。

对于整个流程的定义 ProcessEngineConfiguration,咱们能够抉择最简略的形式,即在数据库里,将配置信息映射成 json 字符串。当然也能够抉择读取配置文件,只有能满足读取不便,数据不丢即可。

// ExecutionContextInterface -
type ExecutionContextInterface interface {GetProcessEngineConfiguration() ProcessEngineConfigurationInterface
   SetProcessEngineConfiguration(processEngineConfiguration ProcessEngineConfigurationInterface)
   GetExecutionInstance() instance.ExecutionInstanceInterface
   SetExecutionInstance(executionInstance instance.ExecutionInstanceInterface)
   GetActivityInstance() instance.ActivityInstanceInterface
   SetActivityInstance(activityInstance instance.ActivityInstanceInterface)
   GetProcessInstance() instance.ProcessInstanceInterface
   SetProcessInstance(processInstance instance.ProcessInstanceInterface)
   SetNeedPause(needPause bool)
   IsNeedPause() bool

   SetActivityIndex(activityIndex int)
   GetActivityIndex() int
   SetActivityBehaviorCode(activityBehaviorCode value.ActivityBehaviorCode)
   GetActivityBehaviorCode() value.ActivityBehaviorCode
   SetBizUniqueKey(bizUniqueKey string)
   GetBizUniqueKey() string

   GetRequest() map[string]interface{}
   SetRequest(request map[string]interface{})
   GetResponse() map[string]string
   SetResponse(response map[string]string)
   AtomicAddResponse(key string, value string)
}

Listener

监听器的次要作用是用来监听流程执行中的重要参数信息。从上述 executor 接口能够看到 fireEvent,它的作用是发送音讯 event,让 listener 监听到对应的 event 类型,实现一些定制化的行为。

相似于面向切面编程,咱们能够在执行节点的前后减少定制化的逻辑,如打日志、监听节点执行工夫,长久化流程中产生的 response 信息、减少链路追踪等。

API

最初,咱们将上述的内容拼接串联起来,次要提供三个接口:

  • Start: 启动流程
  • Signal: 暂停或是异样退出后,继续执行流程
  • Abort: 强制中断流程

<!—->

process start(){
    //1.get and create ProcessEngineConfigurationInterface 解析流程定义
    
    //2.create processInstance 创立流程实例
    
    //3.create ExecutionContext 创立执行上下文
    
    //4. lockstrategy trylock 
    
    //5. invoke process start 
    processinstance.start()
    //6. persist processInstance and return

    //7. lockstrategy unlock 
}

processinstance start(){
    // get behavior
    
    // behavior enter
    behavior.Enter(ctx, executionContext)
    //behavior execute
    behavior.Execute(ctx, executionContext)
    //behavior leave
    behavior.Leave(ctx, executionContext)
}

相比于 start,signal 须要读取执行的细节信息,找到之前失败的执行节点地位,并加载到上下文中,再继续执行。

对于失败节点信息的长久化有两种形式:第一,能够抉择在流程执行完结长久化;第二,能够通过 listener 在每个执行节点完结长久化。具体依据理论业务场景对于性能、数据一致性的要求做出抉择。

并发场景思考

  1. behavior 策略中必定会呈现定制、并发、解决多个执行节点到场景的问题,如果同时批改必定会造成数据错乱。简略的办法举荐应用带锁的容器存储,能够被批改的信息 (response),此处应用的是 github.com/bytedance/gopkg 包外面封装的 skipmap。
  2. lockstrategy 能够本人定义最适配业务场景的,最简略的计划是 redis 锁,同时也思考到零碎异样退出后的复原问题。能够参考 redis 官网解决非凡状况下的锁异样解决方案:https://redis.io/commands/setnx/

后续的工作

轻量级流程引擎的基本功能到此曾经实现,后续的扩大优化能够围绕以下方向进行:

  1. 界面化展现,能够将链路执行状况展现进去
  2. 策略 behavior 维度扩大,适配各种业务场景
  3. 减少子流程的维度,能够复用原先的执行逻辑

Demo 示例

以下为简略的 processconfiguration 的配置信息,此处应用 DefaultBehavior,即同步程序执行策略。

{
    "ProcessContentList":[
        {
            "Behavior":"DefaultBehavior",
            "DelegationList":[
                {"Code":"sample1"},{"Code":"sample2"},
                {"Code":"sample3"}
            ]
        },
        {
            "Behavior":"DefaultBehavior",
            "DelegationList":[
                {"Code":"sample4"},
                {"Code":"sample5"}
            ]
        }
    ]
}

在 listener 外面退出日志,这样能够追溯出整个流程的执行流程,以便更好的监控整个流程的运行状态。

理论应用

以 ClickHouse 集群缩容为例:

{
    "ProcessContentList":[
        // 查问所有须要重散布的 table
        {
            "Behavior":"DefaultBehavior",// 程序执行
            "DelegationList":[
                {"Code":"hor_reshard_table_loop"}
            ]
        },
        // 遍历所有 table 进行数据的重散布 
        {
            "LoopKey":"reshard_table_loop_key",
            "Behavior":"NonBlockLoopBehavior",// 非阻塞循环解决
            "DelegationList":[
                {"Code":"hor_reshard_table"}
            ]
        },
        // 进行删除节点操作
        {
            "Behavior":"DefaultBehavior",
            "DelegationList":[
                {"Code":"hor_start_remove_node"},
                {
                    "Code":"hor_prepare_node_vcloud",
                    "PostCode":"hor_rollback_remove_node_vcloud"// 对立失败回滚解决
                },
                {
                    "Code":"hor_update_config_vcloud",
                    "PostCode":"hor_rollback_remove_node_vcloud"
                },
                {
                    "Code":"hor_set_cluster_running",
                    "PostCode":"hor_rollback_remove_node_vcloud"
                },
                {"Code":"hor_release_node"},
                {"Code":"hor_callback_bill"}
            ]
        }
    ]
}

总结

一个流程引擎适配所有的业务场景简直是不可能,除非承受简单的方案设计,而第三方流程引擎对于日常的业务开发显得太轻便。轻量级流程引擎则会简化接入形式,缩小了过多 http 申请带来的性能损耗,更加灵便多变,追述问题也变得简略。

在 ByteHouse 中退出流程引擎的能力,能以较小的代价给业务更多重试的可能性,而不须要重复回滚,特地对于耗时很长的工作,能带来更好用户应用体验。除此之外,流程引擎还能将业务流程模版化,减少接口服务的复用性,使得业务代码的可读性、扩展性失去晋升,不便前期保护。

火山引擎云原生数据仓库 ByteHouse 是火山引擎旗下的一款云原生数据仓库,为用户提供极速剖析体验,可能撑持实时数据分析和海量数据离线剖析,同时还具备便捷的弹性扩缩容能力,极致剖析性能和丰盛的企业级个性,助力客户数字化转型。

点击跳转火山引擎 ByteHouse 理解更多

正文完
 0