乐趣区

关于golang:gomachinery入门教程异步任务队列

前言

哈喽,大家好,我是 asong,这次给大家介绍一个 go 的异步工作框架 machinery。应用过 python 的同学们都晓得 Celery 框架,machinery框架就相似于 Celery 框架。上面咱们就来学习一下 machinery 的根本应用。

本人翻译一个粗略版的 machinery 中文文档,有须要的搭档们公众号自取无水印版:后盾回复:machinery 即可支付。

或者 github 下载:https://github.com/asong2020/…

抛砖引玉

咱们在应用某些 APP 时,登陆零碎后个别会收到一封邮件或者一个短信提醒咱们在某个工夫某某地点登陆了。而邮件或短信都是在咱们曾经登陆后才收到,这里就是采纳的异步机制。大家有没有想过这里为什么没有应用同步机制实现呢?咱们来剖析一下。假如咱们当初采纳同步的形式实现,用户在登录时,首先会去测验一下账号密码是否正确,验证通过后去给用户发送登陆提示信息,如果在这一步出错了,那么就会导致用户登陆失败,这样是大大影响用户的体验感的,一个登陆提醒的优先级别并不是很高,所以咱们齐全能够采纳异步的机制实现,即便失败了也不会影响用户的体验。后面说了这么多,那么异步机制该怎么实现呢?对,没错,就是 machinery 框架,据说你们还不会应用它,明天我就写一个小例子,咱们一起来学习一下他吧。

个性

下面只是简略举了个例子,工作队列有着宽泛的利用场景,比方大批量的计算工作,当有大量数据插入,通过拆分并分批插入工作队列,从而实现串行链式工作解决或者实现分组并行任务解决,进步零碎鲁棒性,进步零碎并发度;或者对数据进行预处理,定期的从后端存储将数据同步到到缓存零碎,从而在查问申请产生时,间接去缓存零碎中查问,进步查问申请的响应速度。实用工作队列的场景有很多,这里就不一一列举了。回归本文主题,既然咱们要学习machinery,就要先理解一下他都有哪些个性呢?

  • 工作重试机制
  • 提早工作反对
  • 工作回调机制
  • 工作后果记录
  • 反对 Workflow 模式:Chain,Group,Chord
  • 多 Brokers 反对:Redis, AMQP, AWS SQS
  • 多 Backends 反对:Redis, Memcache, AMQP, MongoDB

架构

工作队列,简而言之就是一个放大的生产者消费者模型,用户申请会生成工作,工作生产者一直的向队列中插入工作,同时,队列的处理器程序充当消费者一直的生产工作。基于这种框架设计思维,咱们来看下 machinery 的简略设计结构图例:

  • Sender:业务推送模块,生成具体任务,可依据业务逻辑中,按交互进行拆分;
  • Broker:存储具体序列化后的工作,machinery 中目前反对到 Redis, AMQP, 和 SQS;
  • Worker:工作过程,负责消费者性能,解决具体的工作;
  • Backend:后端存储,用于存储工作执行状态的数据;

e.g

学习一门新货色,我都习惯先写一个 demo,先学会了走,再学会跑。所以先来看一个例子,性能很简略,异步计算 1 到 10 的和。

先看一下配置文件代码:

broker: redis://localhost:6379

default_queue: "asong"

result_backend: redis://localhost:6379

redis:
  max_idle: 3
  max_active: 3
  max_idle_timeout: 240
  wait: true
  read_timeout: 15
  write_timeout: 15
  connect_timeout: 15
  normal_tasks_poll_period: 1000
  delayed_tasks_poll_period: 500
  delayed_tasks_key: "asong"

这里 brokerresult_backend来实现。

主代码,完整版 github 获取:


func main()  {cnf,err := config.NewFromYaml("./config.yml",false)
    if err != nil{log.Println("config failed",err)
        return
    }

    server,err := machinery.NewServer(cnf)
    if err != nil{log.Println("start server failed",err)
        return
    }

    // 注册工作
    err = server.RegisterTask("sum",Sum)
    if err != nil{log.Println("reg task failed",err)
        return
    }

    worker := server.NewWorker("asong", 1)
    go func() {err = worker.Launch()
        if err != nil {log.Println("start worker error",err)
            return
        }
    }()

    //task signature
    signature := &tasks.Signature{
        Name: "sum",
        Args: []tasks.Arg{
            {Type:  "[]int64",
                Value: []int64{1,2,3,4,5,6,7,8,9,10},
            },
        },
    }

    asyncResult, err := server.SendTask(signature)
    if err != nil {log.Fatal(err)
    }
    res, err := asyncResult.Get(1)
    if err != nil {log.Fatal(err)
    }
    log.Printf("get res is %v\n", tasks.HumanReadableResults(res))

}

运行后果:

INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55

好啦,当初咱们开始讲一讲下面的代码流程,

  • 读取配置文件,这一步是为了配置 brokerresult_backend,这里我都抉择的是redis,因为电脑正好有这个环境,就间接用了。
  • Machinery 库必须在应用前实例化。实现办法是创立一个 Server 实例。ServerMachinery 配置和注册工作的根本对象。
  • 在你的 workders 能生产一个工作前,你须要将它注册到服务器。这是通过给任务分配一个惟一的名称来实现的。
  • 为了生产工作,你需有有一个或多个 worker 正在运行。运行 worker 所须要的只是一个具备已注册工作的 Server 实例。每个 worker 将只应用已注册的工作。对于队列中的每个工作,Worker.Process()办法将在一个 goroutine 中运行。能够应用 server.NewWorker 的第二参数来限度并发运行的 worker.Process()调用的数量(每个 worker)。
  • 能够通过将 Signature 实例传递给 Server 实例来调用工作。
  • 调用 HumanReadableResults 这个办法能够解决反射值,获取到最终的后果。

多功能

1. 延时工作

下面的代码只是一个简略 machinery 应用示例,其实 machiney 也反对延时工作的,能够通过在工作 signature 上设置 ETA 工夫戳字段来提早工作。

eta := time.Now().UTC().Add(time.Second * 20)
    signature.ETA = &eta

2. 重试工作

在将工作申明为失败之前,能够设置多次重试尝试。斐波那契序列将用于在一段时间内分隔重试申请。这里能够应用两种办法,第一种间接对 tsak signature 中的 retryTimeoutRetryCount字段进行设置,就能够,重试工夫将依照斐波那契数列进行叠加。

//task signature
    signature := &tasks.Signature{
        Name: "sum",
        Args: []tasks.Arg{
            {Type:  "[]int64",
                Value: []int64{1,2,3,4,5,6,7,8,9,10},
            },
        },
        RetryTimeout: 100,
        RetryCount: 3,
    }

或者,你能够应用return.tasks.ErrRetryTaskLater 返回工作并指定重试的持续时间。

func Sum(args []int64) (int64, error) {sum := int64(0)
    for _, arg := range args {sum += arg}

    return sum, tasks.NewErrRetryTaskLater("我说他错了", 4 * time.Second)

}

3. 工作流

下面咱们讲的都是运行一个异步工作,然而咱们往往做我的项目时,一个需要是须要多个异步工作以编排好的形式执行的,所以咱们就能够应用 machinery 的工作流来实现。

3.1 Groups

Group 是一组工作,它们将互相独立地并行执行。还是画个图吧,这样看起来更明了:

一起来看一个简略的例子:

    // group
    group,err :=tasks.NewGroup(signature1,signature2,signature3)
    if err != nil{log.Println("add group failed",err)
    }

    asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
    if err != nil {log.Println(err)
    }
    for _, asyncResult := range asyncResults{results,err := asyncResult.Get(1)
        if err != nil{log.Println(err)
            continue
        }
        log.Printf(
            "%v  %v  %v\n",
            asyncResult.Signature.Args[0].Value,
            tasks.HumanReadableResults(results),
        )
    }

group中的工作是并行执行的。

3.2 chrods

咱们在做我的项目时,往往会有一些回调场景,machiney也为咱们思考到了这一点,Chord容许你定一个回调工作在 groups 中的所有工作执行完结后被执行。

来看一段代码:

callback := &tasks.Signature{Name: "call",}



    group, err := tasks.NewGroup(signature1, signature2, signature3)
    if err != nil {log.Printf("Error creating group: %s", err.Error())
        return
    }

    chord, err := tasks.NewChord(group, callback)
    if err != nil {log.Printf("Error creating chord: %s", err)
        return
    }

    chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
    if err != nil {log.Printf("Could not send chord: %s", err.Error())
        return
    }

    results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
    if err != nil {log.Printf("Getting chord result failed with error: %s", err.Error())
        return
    }
    log.Printf("%v\n", tasks.HumanReadableResults(results))

下面的例子并行执行 task1、task2、task3,聚合它们的后果并将它们传递给 callback 工作。

3.3 chains

chain就是一个接一个执行的工作集,每个胜利的工作都会触发 chain 中的下一个工作。

看这样一段代码:

//chain
    chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
    if err != nil {log.Printf("Error creating group: %s", err.Error())
        return
    }
    chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
    if err != nil {log.Printf("Could not send chain: %s", err.Error())
        return
    }

    results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
    if err != nil {log.Printf("Getting chain result failed with error: %s", err.Error())
    }
    log.Printf("%v\n", tasks.HumanReadableResults(results))

下面的例子执行 task1,而后是 task2,而后是 task3。当一个工作胜利实现时,后果被附加到 chain 中下一个工作的参数列表的开端,最终执行 callback 工作。

文中代码地址:https://github.com/asong2020/…

总结

这一篇文章到这里就完结了,machinery还有很多用法,比方定时工作、定时工作组等等,就不在这一篇文章介绍了。更多应用办法解锁能够观看 machinery 文档。因为 machiney 没有中文文档,所以我在学习的过程本人翻译了一篇中文文档,须要的小伙伴们自取。

获取步骤:关注公众号【Golang 梦工厂】,后盾回复:machiney 即可获取无水印版~~~

好啦,这一篇文章到这就完结了,咱们下期见~~。心愿对你们有用,又不对的中央欢送指出,可增加我的 golang 交换群,咱们一起学习交换。

结尾给大家发一个小福利吧,最近我在看 [微服务架构设计模式] 这一本书,讲的很好,本人也收集了一本 PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang 梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份 GIN 中文文档,会定期进行保护,有须要的小伙伴后盾回复 [gin] 即可下载。

我是 asong,一名普普通通的程序猿,让 gi 我一起缓缓变强吧。我本人建了一个 golang 交换群,有须要的小伙伴加我vx, 我拉你入群。欢送各位的关注,咱们下期见~~~

举荐往期文章:

  • 手把手教姐姐写音讯队列
  • 常见面试题之缓存雪崩、缓存穿透、缓存击穿
  • 详解 Context 包,看这一篇就够了!!!
  • go-ElasticSearch 入门看这一篇就够了(一)
  • 面试官:go 中 for-range 应用过吗?这几个问题你能解释一下起因吗
  • 学会 wire 依赖注入、cron 定时工作其实就这么简略!
  • 据说你还不会 jwt 和 swagger- 饭我都不吃了带着实际我的项目我就来了
  • 把握这些 Go 语言个性,你的程度将进步 N 个品位(二)
  • go 实现多人聊天室,在这里你想聊什么都能够的啦!!!
  • grpc 实际 - 学会 grpc 就是这么简略
  • go 规范库 rpc 实际
  • 2020 最新 Gin 框架中文文档 asong 又捡起来了英语,用心翻译
  • 基于 gin 的几种热加载形式
退出移动版