关于golang:Go-每日一库之-rxgo

46次阅读

共计 13370 个字符,预计需要花费 34 分钟才能阅读完成。

简介

ReactiveX,简称为 Rx,是一个异步编程的 API。与 callback(回调)、promise(JS 提供这种形式)和 deferred(Python 的 twisted 网络编程库就是应用这种形式)这些异步编程形式有所不同,Rx 是基于事件流的。这里的事件能够是零碎中产生或变动的任何货色,在代码中咱们个别用对象示意。在 Rx 中,事件流被称为 Observable(可察看的)。事件流须要被 Observer(观察者)解决才有意义。设想一下,咱们日常作为一个 Observer,一个重要的工作就是察看 BUG 的事件流。每次发现一个 BUG,咱们都须要去解决它。

Rx 仅仅只是一个 API 标准的定义。Rx 有多种编程语言实现,RxJava/RxJS/Rx.NET/RxClojure/RxSwift。RxGo 是 Rx 的 Go 语言实现。借助于 Go 语言简洁的语法和弱小的并发反对(goroutine、channel),Rx 与 Go 语言的联合十分完满。

pipelines (官网博客:https://blog.golang.org/pipelines)是 Go 根底的并发编程模型。其中蕴含,fan-in——多个 goroutine 产生数据,一个 goroutine 解决数据,fan-out——一个 goroutine 产生数据,多个 goroutine 解决数据,fan-inout——多个 goroutine 产生数据,多个 goroutine 解决数据。它们都是通过 channel 连贯。RxGo 的实现就是基于 pipelines 的理念,并且提供了不便易用的包装和弱小的扩大。

疾速应用

本文代码应用 Go Modules。

创立目录并初始化:

$ mkdir rxgo && cd rxgo
$ go mod init github.com/darjun/go-daily-lib/rxgo

装置 rxgo 库:

$ go get -u github.com/reactivex/rxgo/v2

编码:

package main

import (
  "fmt"

  "github.com/reactivex/rxgo/v2"
)

func main() {observable := rxgo.Just(1, 2, 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {fmt.Println(item.V)
  }
}

应用 RxGo 的个别流程如下:

  • 应用相干的 Operator 创立 ObservableOperator 就是用来创立 Observable 的。这些术语都比拟难贴切地翻译,而且英文也很好懂,就不强行翻译了;
  • 两头各个阶段能够应用过滤操作筛选出咱们想要的数据,应用转换操作对数据进行转换;
  • 调用 ObservableObserve() 办法,该办法返回一个 <- chan rxgo.Item。而后for range 遍历即可。

GitHub 上一张图很形象地描述了这个过程:

  • 首先应用 Just 创立一个仅有若干固定数据的 Observable
  • 应用 Map() 办法执行转换(将圆形转为方形);
  • 应用 Filter() 办法执行过滤(过滤掉黄色的方形)。

看懂了这张图片,就能理解 RxGo 工作的根本流程了。

下面是简略的示例,没有过滤、转换操作的应用。

运行:

$ go run main.go 
1
2
3
4
5

对于下面的示例,须要留神:

Just应用柯里化(currying)让它能够在第一个参数中承受多个数据,在第二个参数中承受多个选项定制行为。柯里化是函数化编程的思维,简略来说就是通过在函数中返回函数,以此来缩小每个函数的参数个数。例如:

func add(value int) func (int) int {return func (a int) int {return value + a}
}

fmt.Prinlnt(add(5)(10)) // 15

因为 Go 不反对多个可变参数,Just通过柯里化曲折地实现了这个性能:

// rxgo/factory.go
func Just(items ...interface{}) func(opts ...Option) Observable {return func(opts ...Option) Observable {
    return &ObservableImpl{iterable: newJustIterable(items...)(opts...),
    }
  }
}

实际上 rxgo.Item 还能够蕴含谬误。所以在应用时,咱们应该做一层判断:

func main() {observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {if item.Error() {fmt.Println("error:", item.E)
    } else {fmt.Println(item.V)
    }
  }
}

运行:

$ go run main.go 
1
2
error: unknown
3
4
5

咱们应用 item.Error() 查看是否呈现谬误。而后应用 item.V 拜访数据,item.E拜访谬误。

除了应用 for range 之外,咱们还能够调用 ObservableForEach() 办法来实现遍历。ForEach()承受 3 个回调函数:

  • NextFunc:类型为func (v interface {}),解决数据;
  • ErrFunc:类型为func (err error),处理错误;
  • CompletedFunc:类型为func ()Observable 实现时调用。

有点 Promise 那味了。应用ForEach(),能够将下面的示例改写为:

func main() {observable := rxgo.Just(1, 2, 3, 4, 5)()
  <-observable.ForEach(func(v interface{}) {fmt.Println("received:", v)
  }, func(err error) {fmt.Println("error:", err)
  }, func() {fmt.Println("completed")
  })
}

运行:

$ go run main.go 
received: 1
received: 2
received: 3
received: 4
received: 5
completed

ForEach()实际上是异步执行的,它返回一个接管告诉的 channel。当 Observable 数据发送结束时,该 channel 会敞开。所以如果要期待 ForEach() 执行实现,咱们须要应用<-。下面的示例中如果去掉<-,可能就没有输入了,因为主 goroutine 完结了,整个程序就退出了。

创立 Observable

下面应用最简略的形式创立 Observable:间接调用 Just() 办法传入一系列数据。上面再介绍几种创立 Observable 的形式。

Create

传入一个 []rxgo.Producer 的切片,其中 rxgo.Producer 的类型为 func(ctx context.Context, next chan<- Item)。咱们能够在代码中调用rxgo.Of(value) 生成数据,rxgo.Error(err)生成谬误,而后发送到 next 通道中:

func main() {observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {next <- rxgo.Of(1)
    next <- rxgo.Of(2)
    next <- rxgo.Of(3)
    next <- rxgo.Error(errors.New("unknown"))
    next <- rxgo.Of(4)
    next <- rxgo.Of(5)
  }})

  ch := observable.Observe()
  for item := range ch {if item.Error() {fmt.Println("error:", item.E)
    } else {fmt.Println(item.V)
    }
  }
}

当然,分成两个 rxgo.Producer 也是一样的成果:

observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {next <- rxgo.Of(1)
  next <- rxgo.Of(2)
  next <- rxgo.Of(3)
  next <- rxgo.Error(errors.New("unknown"))
  }, func(ctx context.Context, next chan<- rxgo.Item) {next <- rxgo.Of(4)
  next <- rxgo.Of(5)
}})

FromChannel

FromChannel能够间接从一个已存在的 <-chan rxgo.Item 对象中创立 Observable

func main() {ch := make(chan rxgo.Item)
    go func() {
        for i := 1; i <= 5; i++ {ch <- rxgo.Of(i)
        }
        close(ch)
    }()

    observable := rxgo.FromChannel(ch)
    for item := range observable.Observe() {fmt.Println(item.V)
    }
}

留神:

通道须要手动调用 close() 敞开,下面 Create() 办法外部 rxgo 主动帮咱们执行了这个步骤。

Interval

Interval以传入的工夫距离生成一个无穷的数字序列,从 0 开始:

func main() {observable := rxgo.Interval(rxgo.WithDuration(5 * time.Second))
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面的程序启动后,第 5s 输入 0,第 10s 输入 1,…,而且不会进行。

咱们能够用 time.Ticker 实现雷同的性能:

func main() {t := time.NewTicker(5 * time.Second)

  var count int
  for range t.C {fmt.Println(count)
    count++
  }
}

Range

Range能够生成一个范畴内的数字:

func main() {observable := rxgo.Range(0, 3)
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面代码顺次输入 0,1,2,3。

Repeat

在已存在的 Observable 对象上调用Repeat,能够实现每隔指定工夫,反复一次该序列,一共反复指定次数:

func main() {observable := rxgo.Just(1, 2, 3)().Repeat(3, rxgo.WithDuration(1*time.Second),
  )
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

运行下面的代码,立刻输入 1,2,3,而后期待 1s,又输入一次 1,2,3,而后又期待 1s,最初又输入一次 1,2,3。

Start

能够给 Start 办法传入 []rxgo.Supplier 作为参数,它能够蕴含任意数量的 rxgo.Supplier 类型。rxgo.Supplier的底层类型为:

// rxgo/types.go
var Supplier func(ctx context.Context) rxgo.Item

Observable 外部会顺次调用这些 rxgo.Supplier 生成rxgo.Item

func Supplier1(ctx context.Context) rxgo.Item {return rxgo.Of(1)
}

func Supplier2(ctx context.Context) rxgo.Item {return rxgo.Of(2)
}

func Supplier3(ctx context.Context) rxgo.Item {return rxgo.Of(3)
}

func main() {observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3})
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

Observable 分类

依据数据在何处生成,Observable 被分为 HotCold 两种类型(类比热启动和冷启动)。数据在其它中央生成的被成为 Hot Observable。相同,在 Observable 外部生成数据的就是 Cold Observable

应用下面介绍的办法创立的实际上都是 Hot Observable

func main() {ch := make(chan rxgo.Item)
  go func() {
    for i := 0; i < 3; i++ {ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() {fmt.Println(item.V)
  }

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面创立的是 Hot Observable。然而有个问题,第一次 Observe() 耗费了所有的数据,第二个就没有数据输入了。

Cold Observable 就不会有这个问题,因为它创立的流是独立于每个观察者的。即每次调用 Observe() 都创立一个新的 channel。咱们应用 Defer() 办法创立 Cold Observable,它的参数与 Create() 办法一样。

func main() {observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {ch <- rxgo.Of(i)
    }
  }})

  for item := range observable.Observe() {fmt.Println(item.V)
  }

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

输入:

$ go run main.go
0
1
2
0
1
2

可连贯的 Observable

可连贯的(Connectable)Observable 对一般的 Observable 进行了一层组装。调用它的 Observe() 办法时并不会立即产生数据。应用它,咱们能够等所有的观察者都准备就绪了(即调用了 Observe() 办法)之后,再调用其 Connect() 办法开始生成数据。咱们通过两个示例比拟应用一般的 Observable 和可连贯的 Observable 有何不同。

一般的:

func main() {ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface{}) {fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {fmt.Printf("Second observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
}

上例中咱们应用 DoOnNext() 办法来注册观察者。因为 DoOnNext() 办法是异步执行的,所以为了期待后果输入,在最初减少了一行time.Sleep。运行:

$ go run main.go
First observer: 1
First observer: 2
First observer: 3
before subscribe second observer

由输入能够看出,注册第一个观察者之后就开始产生数据了。

咱们通过在创立 Observable 的办法中指定 rxgo.WithPublishStrategy() 选项就能够创立可连贯的 Observable

func main() {ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

  observable.DoOnNext(func(i interface{}) {fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {fmt.Printf("Second observer: %d\n", i)
  })

  observable.Connect(context.Background())
  time.Sleep(3 * time.Second)
}

运行输入:

$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3

下面是等两个观察者都注册之后,并且手动调用了 Observable 的 Connect() 办法才产生数据。而且可连贯的 Observable 有一个个性:它是冷启动的!!!,即每个观察者都会收到一份雷同的拷贝。

转换 Observable

rxgo 提供了很多转换函数,能够批改通过它的rxgo.Item,而后再发送给下一个阶段。

Map

Map()办法简略批改它收到的 rxgo.Item 而后发送到下一个阶段(转换或过滤)。Map()承受一个类型为 func (context.Context, interface{}) (interface{}, error) 的函数。第二个参数就是 rxgo.Item 中的数据,返回转换后的数据。如果出错,则返回谬误。

func main() {observable := rxgo.Just(1, 2, 3)()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {return i.(int)*2 + 1, nil
  }).Map(func(_ context.Context, i interface{}) (interface{}, error) {return i.(int)*3 + 2, nil
  })

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

上例中每个数字通过两个 Map,第一个Map 执行 2 * i + 1,第二个Map 执行3 * i + 2。即对于每个数字来说,最终进行的变换为3 * (2 * i + 1) + 2。运行:

$ go run main.go
11
17
23

Marshal

Marshal对通过它的数据进行一次 Marshal。这个Marshal 能够是 json.Marshal/proto.Marshal,甚至咱们本人写的Marshal 函数。它承受一个类型为 func(interface{}) ([]byte, error) 的函数用于对数据进行解决。

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    User{
      Name: "dj",
      Age:  18,
    },
    User{
      Name: "jw",
      Age:  20,
    },
  )()

  observable = observable.Marshal(json.Marshal)

  for item := range observable.Observe() {fmt.Println(string(item.V.([]byte)))
  }
}

因为 Marshal 操作返回的是 []byte 类型,咱们须要进行类型转换之后再输入。

Unmarshal

既然有 Marshal,也就有它的相同操作UnmarshalUnmarshal 用于将一个 []byte 类型转换为相应的构造体或其余类型。与 Marshal 不同,Unmarshal须要晓得转换的指标类型,所以须要提供一个函数用于生成该类型的对象。而后将 []byte 数据 Unmarshal 到该对象中。Unmarshal承受两个参数,参数一是类型为 func([]byte, interface{}) error 的函数,参数二是 func () interface{} 用于生成理论类型的对象。咱们拿下面的例子中生成的 JSON 字符串作为数据,将它们从新 UnmarshalUser对象:

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(`{"name":"dj","age":18}`,
    `{"name":"jw","age":20}`,
  )()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {return []byte(i.(string)), nil
  }).Unmarshal(json.Unmarshal, func() interface{} {return &User{}
  })

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

因为 Unmarshaller 承受 []byte 类型的参数,咱们在 Unmarshal 之前加了一个 Map 用于将 string 转为[]byte。运行:

$ go run main.go
&{dj 18}
&{jw 20}

Buffer

Buffer依照肯定的规定收集接管到的数据,而后一次性发送进来(作为切片),而不是收到一个发送一个。有 3 种类型的Buffer

  • BufferWithCount(n):每收到 n 个数据发送一次,最初一次可能少于 n 个;
  • BufferWithTime(n):发送在一个工夫距离 n 内收到的数据;
  • BufferWithTimeOrCount(d, n):收到 n 个数据,或通过 d 工夫距离,发送以后收到的数据。

BufferWithCount

func main() {observable := rxgo.Just(1, 2, 3, 4)()

  observable = observable.BufferWithCount(3)

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

运行:

$ go run main.go
[1 2 3]
[4]

留神,最初一组只有一个。

BufferWithTime

func main() {ch := make(chan rxgo.Item, 1)

  go func() {
    i := 0
    for range time.Tick(time.Second) {ch <- rxgo.Of(i)
      i++
    }
  }()

  observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(3 * time.Second))

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

每 3s 发送一次:

$ go run main.go
[0 1 2]
[3 4 5]
[6 7 8]
...

BufferWithTimeOrCount

func main() {ch := make(chan rxgo.Item, 1)

  go func() {
    i := 0
    for range time.Tick(time.Second) {ch <- rxgo.Of(i)
      i++
    }
  }()

  observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(3*time.Second), 2)

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面 3s 能够收集 3 个数据,然而设置了收集 2 个就发送。所以,运行输入为:

$ go run main.go
[0 1]
[2 3]
[4 5]
...

GroupBy

GroupBy依据传入一个 Hash 函数,为每个不同的后果别离创立新的 Observable。换句话说,GroupBy生成一个数据类型为 ObservableObservable

func main() {
  count := 3
  observable := rxgo.Range(0, 10).GroupBy(count, func(item rxgo.Item) int {return item.V.(int) % count
  }, rxgo.WithBufferedChannel(10))

  for subObservable := range observable.Observe() {fmt.Println("New observable:")

    for item := range subObservable.V.(rxgo.Observable).Observe() {fmt.Printf("item: %v\n", item.V)
    }
  }
}

下面依据每个数模 3 的余数将整个流分为 3 组。运行:

$ go run main.go 
New observable:
item: 0
item: 3
item: 6
item: 9
New observable:
item: 1
item: 4
item: 7
item: 10
New observable:
item: 2
item: 5
item: 8

留神 rxgo.WithBufferedChannel(10) 的应用,因为咱们的数字是间断生成的,顺次为 0->1->2->…->9->10。而 Observable 默认是惰性的,即由 Observe() 驱动。内层的 Observe() 在返回一个 0 之后就期待下一个数,然而下一个数 1 不在此 Observable 中。所以会陷入死锁。应用rxgo.WithBufferedChannel(10),设置它们之间的连贯 channel 缓冲区大小为 10,这样即便咱们未取出 channel 外面的数字,上游还是能发送数字进来。

并行操作

默认状况下,这些转换操作都是串行的,即只有一个 goroutine 负责执行转换函数。咱们也能够应用 rxgo.WithPool(n) 选项设置运行 n 个 goroutine,或者 rxgo.WitCPUPool() 选项设置运行与逻辑 CPU 数量相等的 goroutine。

func main() {observable := rxgo.Range(1, 100)

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {time.Sleep(time.Duration(rand.Int31()))
    return i.(int)*2 + 1, nil
  }, rxgo.WithCPUPool())

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

因为是并行,所以 输入程序就不确定了。为了让不确定性更显著一点,我在代码中加了一行time.Sleep

过滤 Observable

Observable 中发送过去的数据并不一定都是咱们须要的,咱们要把不想要的过滤掉。

Filter

Filter()承受一个类型为 func (i interface{}) bool 的参数,通过的数据应用这个函数断言,返回 true 的将发送给下一个阶段。否则,抛弃。

func main() {observable := rxgo.Range(1, 10)

  observable = observable.Filter(func(i interface{}) bool {return i.(int)%2 == 0
  })

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面过滤掉奇数,最初只剩下偶数:

$ go run main.go
2
4
6
8
10

ElementAt

ElementAt()只发送指定索引的数据,如 ElementAt(2) 只发送索引为 2 的数据,即第 3 个数据。

func main() {observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)

  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面代码输入 2。

Debounce

Debounce()比拟有意思,它收到数据后还会期待指定的工夫距离,后续距离内没有收到其余数据才会发送刚开始的数据。

func main() {ch := make(chan rxgo.Item)

  go func() {ch <- rxgo.Of(1)
    time.Sleep(2 * time.Second)
    ch <- rxgo.Of(2)
    ch <- rxgo.Of(3)
    time.Sleep(2 * time.Second)
    close(ch)
  }()

  observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

下面示例,先收到 1,而后 2s 内没收到数据,所以发送 1。接着收到了数据 2,因为马上又收到了 3,所以 2 不会发送。收到 3 之后 2s 内没有收到数据,发送了 3。所以最初输入为 1,3。

Distinct

Distinct()会记录它发送的所有数据,它不会发送反复的数据。因为数据格式多样,Distinct()要求咱们提供一个函数,依据原数据返回一个惟一标识码(有点相似哈希值)。基于这个标识码去重。

func main() {observable := rxgo.Just(1, 2, 2, 3, 3, 4, 4)().
    Distinct(func(_ context.Context, i interface{}) (interface{}, error) {return i, nil})
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

顺次输入 1,2,3,4,没有反复。

Skip

Skip能够跳过前若干个数据。

func main() {observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

Take

Take只取前若干个数据。

func main() {observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
  for item := range observable.Observe() {fmt.Println(item.V)
  }
}

选项

rxgo 提供的大部分办法的最初一个参数是一个可变长的选项类型。这是 Go 中特有的、经典的选项设计模式。咱们后面曾经应用了:

  • rxgo.WithBufferedChannel(10):设置 channel 的缓存大小;
  • rxgo.WithPool(n)/rxgo.WithCpuPool():应用多个 goroutine 执行转换操作;
  • rxgo.WithPublishStrategy():应用公布策略,即创立可连贯的 Observable

除此之外,rxgo 还提供了很多其余选项。留待大家自行摸索了。

总结

rxgo 让基于 pipelines 的并发编程变得更容易!

大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue????

参考

  1. rxgo GitHub:https://github.com/jordan-wright/rxgo
  2. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~

正文完
 0