关于golang:Go实战-基于有向无环图的并发执行流的实现

81次阅读

共计 6648 个字符,预计需要花费 17 分钟才能阅读完成。

大家好,我是 Go 学堂的渔夫子。明天跟大家聊聊基于有向无环图的工作流的实现.

01 工作流(workflow)概述

工作流,是对工作流程中的工作按肯定的规定组织在一起并按其进行执行的一种模型。比方常见的行政零碎中的加班申请、销假申请;工作流要解决的问题就是为了实现某个特定的指标,让多个参与者之间按某种预订的规定主动的传递信息。

本文介绍了一种基于有向无环图实现的工作流,通过有向无环图,能够解决两个问题:从逻辑上,对各个节点的依赖关系进行了组织;从技术上,有依赖关系的节点须要期待执行,无依赖关系的能够并发执行。

该工作流的理论利用是在程序化广告中从接管申请到广告响应之间的流程:接管申请、获取地理位置、获取用户画像、广告召回、广告排序、算法预估、返回响应等各个节点之间的依赖执行。

但本文的指标是介绍其实现思维,所以在示例局部会以穿衣服的流程为例进行解说。

02 工作流的实现

上面咱们以早上起床穿衣所产生的事件为例来解说有向无环图的实现。穿衣流程中蕴含的事件有穿内裤、穿裤子、穿袜子、穿鞋、戴手表、穿衬衣、穿外套等。这些事件中有的必须要先穿某些衣服,能力再穿其余衣服(如先穿袜子后能力穿鞋)。有些事件则能够以任意程序穿上(如袜子和裤子之间能够以任意词序进行穿戴)。如图所示:

由上图能够看到,穿内裤、穿袜子、穿衬衣、戴手表之间没有相互依赖,能够并发执行。而穿鞋子则必须期待所依赖的裤子和袜子穿完后能力执行。上面咱们就来看看如何实现这样的有向无环图的工作流。

2.1 定义工作流构造

依据上图,咱们能够看出一个绝对残缺的工作流蕴含开始节点(从哪里开始)、边(通过哪些节点)、完结节点(到哪里完结)。由此,咱们定义工作流的构造如下:

type WorkFlow struct {done chan struct{} // 完结标识, 该标识由完结节点写入
  doneOnce *sync.Once // 保障并发时只写入一次
  alreadyDone bool // 有节点出错时终止流程标记
    root *Node // 开始节点
    End *Node // 完结节点
    edges []*Edge // 所有通过的边,边连贯了节点}

2.2 定义工作流中边

边用来示意两个节点之间的依赖关系。有向图中的边还能表明两个节点哪个是前置节点,哪个是后置节点。后置节点须要期待前置节点的工作执行实现后能力执行。如下图所示:

内裤和裤子两个节点阐明只有等穿上内裤后,能力穿裤子,那么穿内裤节点就是穿裤子节点的前置节点;而鞋子只有等裤子和袜子都穿上后能力穿鞋子,那么裤子和袜子就是鞋子的前置节点。

所以边的示意是从哪个节点到哪个节点。定义如下:

type Edge struct {
    FromNode *Node
    ToNode *Node
}

2.3 定义工作流中的节点

节点即要具体执行逻辑的工作单元。同时每个节点都有所关联的边。因为咱们应用的是有向图,所以关联的边又分为入边(即终止于该顶点的边)和出边(即从该顶点开始的边)。如下图:

边 1 是内裤节点的出边,同时也是裤子节点的入边。边 2 和边 3 都是鞋子节点的入边。本文将入边称为该节点的依赖边,定义为 Dependency,示意只有这些边连贯的节点工作执行完后,该节点能力开始执行。将该节点的出边称为孩子边,定义 Children,示意该节点执行完后,能够继续执行的子节点。由此,节点的构造定义如下:

type Node struct {Dependency []*Edge // 依赖的边
    DepCompleted int32 // 示意依赖的边有多少个已执行实现,用于判断该节点是否能够执行了
    Task Runnable // 工作执行
    Children []*Edge // 节点的字边}

在节点的定义中,咱们看到有一个 Runnable 类型。该类型是一个接口,只有一个 Run(i interface{})办法,用来对节点执行业务逻辑的形象。以便在节点执行的时候,对立调用各个节点的 Run 办法即可。该接口的定义如下:

type Runnable interface {Run(i interface{})
}

咱们以穿鞋子工作为例来实现该接口,定义如下:

type WearShoesAction struct {}

func (a *WearShoesAction) Run(i interface{}) {fmt.Println("我正在穿鞋子...")
}

有了具体的执行工作,咱们就能够将该工作构建到一个节点上:

func NewNode(Task Runnable) *Node {
    return &Node{Task: Task,}
}

// 构建穿鞋子的节点
shoesNode := NewNode(&WearShoesAction{})

有了节点,咱们须要构建边,因为边是连贯两个节点的,所以咱们再定义一个穿袜子的节点:

type WearSocksAction struct { }

func(a *WearSocksAction) Run(i interface{}) {fmt.Println("我正在穿袜子...")
}

好了,有了两个节点了,咱们定义边的函数,以便将两个节点构建成边:

func AddEdge(from *Node, to *Node) *Edge {
    edg := &Edge{
        FromNode: from,
        ToNode: to,
    }
    
    // 该条边是 from 节点的出边
    from.Children = append(from.Children, edge)
    // 该条边是 to 节点的入边
    to.Dependency = append(to.Dependency, edge)
    
    return edg
}

2.4 工作流中的非凡节点 — 开始节点和完结节点

另外,咱们在工作流中看到有一个开始节点和完结节点。开始节点是工作流的根节点,是整个工作流开始执行的触发点。它的工作就是触发子节点,所有该节点中没有具体的业务逻辑要执行,也就是说该节点中的 Task 是 nil。

完结节点代表整个工作流的工作都执行实现了,通过信号的形式告知调用方流程完结。所以,该节点的工作逻辑就是给工作流的 done 通道写入一个音讯,让调用方接管到该音讯,以解除阻塞:

type EndWorkFlowAction struct {done chan struct{} // 节点执行实现,往该 done 写入音讯,和 workflow 中的 done 共用
  s *sync.Once // 并发管制,确保只往 done 中写入一次
}


// 完结节点的具体执行工作
func (end *EndWorkFlowAction) Run(i interface{}) {end.s.Do(func() {end.Done <- struct{} })
}

2.5 构建工作流

好了,咱们来看下基于以上各个元素的构造体定义,如何构建一个残缺的工作流,并让工作流可能工作。

首先,当然是要实例化一个工作流。

func NewWorkFlow() *WorkFlow {
    wf := &WorkFlow{root: &Node{Task: nil},// 开始节点,所有具体的节点都是它的子节点,没有具体的执行逻辑,只为登程其余节点的执行
        done: make(chan struct{}, 1),
        doneOnce: &sync.Once{},}
    
    // 退出完结节点
    EndNode = &EndWorkFlow{
        done: wf.done,
        s: wf.doneOnce,
    }
    wf.End = NewTaskNode
    
    return wf
}

因为每个工作流实例都必须要有开始节点和完结节点,所以咱们在初始化的时候就指定了开始节点和完结节点。

其次,构建节点之间的边。构建边分三类,一类是跟节点和两头节点之间形成的边,这类的特点是根节点只有出边,没有入边。一类是两头节点和两头节点之间形成的边。最初一类是两头节点和完结节点之间形成的边。这类边的特点是完结节点只有入边,没有出边。

func (wf *WorkFlow) AddStartNode(node *Node) {wf.edges = append(wf.edges, AddEdge(wf.root, node))
}

func (wf *WorkFlow) AddEdge(from *Node, to *Node) {wf.edges = append(wf.edges, AddEdge(from, to))
}

func (wf *WokFlow) ConnectToEnd(node *Node) {wf.edges = append(wf.edges, AddEdge(node, wf.End))
}

通过以上 3 个函数,咱们就能够结构出工作流中各个节点之间的关系图了。有了关系图,咱们须要让
这个关系图流转起来。所以,咱们再来看看工作流中相干的执行行为的定义。

func (wf *WorkFlow) StartWithContext(ctx context.Context, i interface{}) {wf.root.ExecuteWithContext(ctx, wf, i)
}

func(wf *WorkFlow) WaitDone() {
    <-wf.done
    close(wf.done)
}

func(wf *WorkFlow)  interrupDone() {
    wf.alreadyDone = true
    wf.s.Do(func() {wf.done <- struct{} })
}

2.6 节点的具体执行逻辑实现

在工作流执行函数中,咱们看到调用了根节点的 ExecuteWithContext 函数。咱们来看下该函数的具体实现。

func (n *Node) ExecuteWithContext(ctx context.Context, wf *WorkFlow, i interface{}) {
    // 所依赖的前置节点没有运行实现,则间接返回
    if !n.dependencyHasDone() {return}
    // 有节点运行出错,终止流程的执行
    if ctx.Err() != nil {wf.interruptDone()
        return
    }
    
    // 节点具体的运行逻辑
    if n.Task != nil {n.Task.Run(i)
    }
    
    // 运行子节点
    if len(n.Children) > 0 {for idex := 1; idx < len(n.Children); idx++ {go func(child *Edge) {child.Next.Execute(ctx, wf, i)
            }(n.Children[idx])
        }
        
        n.Children[0].Next.Execute(ctx, wf, i)
    }
    
}

大部分逻辑都很简略,咱们重点看下 dependencyHasDone()函数。该函数是用来查看一个节点所依赖的边是否都执行完了。这里是通过计数来实现的。其具体实现如下:

func (n *Node) dependencyHasDone() bool {
    // 该节点没有依赖的前置节点,不须要期待,间接返回 true
    if n.Dependency == nil {return true}
    
    // 如果该节点只有一个依赖的前置节点,也间接返回
    if len(n.Dependency) == 1 {return true}
    
    // 这里将依赖的节点加 1,阐明有一个依赖的节点实现了
    atomic.AddInt32(&n.DepCompleted, 1)
    
    // 判断以后依赖的节点数量是否和依赖的节点相等,相等,阐明都运行完了
    return n.DepCompleted == int32(len(n.Dependency))
}

咱们以下图为例,来阐明 end 节点是如何被查看所依赖的前置节点都执行完了的。

先是 root 节点开始执行本人的 ExecuteWithContext 办法。在该办法中,先通过 dependencyHasDone 函数来判断该节点所依赖的前置节点是否都执行完了,但该节点没有前置节点,即满足 n.Dependency == nil,返回 true,所以 root 节点能够继续执行 ExecuteWithContext 中前面的逻辑。

执行 root 的 Task 工作,因为 root 的 Task 为 nil,所以继续执行到子节点中。发现 root 的 Children 中有两条边,开始循环让子节点同时执行。即执行裤子节点和袜子节点。

咱们晓得,袜子节点和裤子节点所依赖的前置节点只有 root,也就是只有一条入边。当执行到本人的时候,阐明 root 节点曾经执行完了,所以执行到了上面的逻辑:

    // 如果该节点只有一个依赖的前置节点,也间接返回
    if len(n.Dependency) == 1 {return true}

假如当初裤子节点先执行完了,但袜子节点还没执行完。当裤子节点执行完之后,就会找和本人治理的子节点继续执行,也就是鞋子节点,这时,鞋子节点在执行 dependencyHasDone 逻辑时,命中了第三局部的逻辑:

// 这里将依赖的节点加 1,阐明有一个依赖的节点实现了
    atomic.AddInt32(&n.DepCompleted, 1)
    
// 判断以后依赖的节点数量是否和依赖的节点相等,相等,阐明都运行完了
return n.DepCompleted == int32(len(n.Dependency))

发现鞋子依赖的边有 2 条。当初只实现了 1 条,所以是不可执行状态,就间接返回不再执行了。如下图所示:

这时,袜子节点也执行完结了。同样会执行本人的子节点,也就是鞋子节点。这时发现鞋子节点的实现数是 2,发现和本人依赖边相等,这时,鞋子节点编程可执行状态。如下图:

2.7 残缺示例

咱们当初来看下穿衣服流程的残缺示例。

wf := NewWorkFlow()

// 构建节点
UnderpantsNode := NewNode(&WearUnderpantsAction{})
SocksNode := NewNode(&WearSocksAction{})
ShirtNode := NewNode(&ShirtNodeAction{})
WatchNode := NewNode(&WatchNodeAction{})
TrousersNode := NewNode(&WearTrouserNodeAction{})
ShoesNode := NewNode(&WearShoesNodeAction{})
CoatNode := NewNode(&WearCoatNodeAction{})

// 构建节点之间的关系
wf.AddStartNode(UnserpatnsNode)
wf.AddStartNode(SocksNode)
wf.AddStartNode(ShirtNode)
wf.AddStartNode(WatchNode)

wf.AddEdge(UnserpatnsNode, TrousersNode)
wf.AddEdge(TrousersNode, ShoesNode)
wf.AddEdge(SocksNode, ShoesNode)
wf.AddEdge(ShirtNode, CoatNode)
wf.AddEdge(WatchNode, CoatNode)

wf.ConnectToEnd(ShoesNode)
wf.ConnectToEnd(CoatNode)

var completedAction []string

wf.StartWithContext(ctx, completedAction)
wf.WaitDone()

fmt.Println("执行其余逻辑")

下面代码中的各个 Action 的构造体实现 Runnable 接口即可,这里就不再反复了。到此,咱们就构建胜利了在开始时就看到的穿衣流程图。

03 一些其余问题

3.1 在各个节点之间,如何传递数据

咱们看到 WorkFlow 的 StartWithContext 函数中的第二个参数是一个 interface{}类型,这个就是用来给各个节点传递参数的。同时在 Node 的 ExecuteWithContext 函数中的参数也是 interface{}类型。咱们能够将该参数定义为指针类型。这样在各个节点执行过程中就能够扭转指针指向的内容了。

3.2 如果有节点执行谬误,如何终止流程

在一个流程中,有任何一个节点执行出错,咱们的解决形式是终止整个流程。即在下面节点的 ExecuteWithContext 函数中有如下代码的判断:

    if ctx.Err() != nil {wf.interruptDone()
        return
    }

咱们来看下 WorkFlow 的 interruptDone 的实现:

wf.alreadyDone = true
wf.s.Do(func() {wf.done <- struct{}{}})

总结

有向无环图是一种解决节点依赖关系的利器。在解决了依赖之间的问题同时,也解决了互相独立节点的并发问题。

正文完
 0