1 什么是Stream?

Stream是Dart用来解决异步的API,和同样用来解决异步的Future不同的是,Stream能够异步的返回多个后果,而Future只能返回一个,如果你对Future有疑难,能够参考作者的上一篇文章,Dart根底——Dart异步Future与事件循环Event Loop。

2 如何创立Stream?

1.1应用Stream的构造方法

Stream periodicStream = Stream.periodic(Duration(seconds: 2), (num) {  return num;}); 

periodic构造方法次要有两个参数,第一个参数类型为Duration(工夫距离),第二个参数类型为Function,Function每隔一个Duration(工夫距离)会被调用一次,参数num为事件调用的次数,从0开始顺次递增。

翻阅源码 Stream.periodic是应用Timer.periodic加_SyncStreamController实现的

1.2将办法的返回值申明为Stream

Stream<String> timedCounter(Duration interval, [int maxCount]) async* {  int i = 0;  while (true) {   //提早interval(工夫距离)执行一次    await Future.delayed(interval);    //返回i  i++    yield "stream返回${i++}";    if (i == maxCount) break;  }} 

看到这里你可能会有一些疑难什么是async*和yield?

yield为一个用async *_润饰返回值为Stream的函数返回一个值,它就像return,不过他不会完结函数_

Stream asynchronousNaturalsTo(n) async* {  int k = 0;  while (k < n) yield k++;} 

这里波及到了Dart的生成器函数概念,在这里你只须要简略了解yield的作用就能够了

1.3应用StreamController

 var _controller = StreamController<int>();  var _count = 1;  createStream() {  //函数每隔一秒调用一次    Timer.periodic(Duration(seconds: 1), (t) {      _controller.sink.add(_count);      _count++;    });  } 

咱们次要应用_controller的两个属性,应用_controller.Stream获取流,应用_controller.sink.add向流中增加数据,下面的例子应用定时器,每隔一秒向流中增加数据_count

3 Stream的罕用办法

接下来介绍一下Stream的罕用办法

PS:以下Stream罕用办法的展现都是用上面代码创立的流

Stream periodicStream = Stream.periodic(Duration(seconds: 1), (num) {  return num;}); 

3.1 listen

listen作为应用Stream最重要的办法,次要用于监听流的数据变动,每当流的数据变动时,listen中的办法都会被调用。

 periodicStream.listen((event) {      print(event);    }); 

listen办法默认参数为Function,参数中的event为下面示例中返回的num,每当流返回新的数据时,listen办法都会被调用。

控制台输入如下

01234 

打印流返回的数据

listen的onError参数当流呈现谬误时调用。

listen的onDone参数当流敞开时调用。

还有一个cancelOnError属性,默认状况下为true,能够将其设置为false以使订阅在产生谬误后也能持续进行。

3.2 map

Stream.periodic(Duration(seconds: 1), (num) {    return num;  }).map((num) => num * 2) 

应用map将流返回的数据进行转换

控制台输入如下

0246 

3.3 asBroadcastStream()&broadcast

通过Stream的asBroadcastStream()或StreamController的broadcast将单订阅的流转换为多订阅流

什么是单订阅流和多订阅流?

3.3.1 单订阅流

单订阅流顾名思义,此流只能有一个订阅者,也就是单订阅流的listen办法只能被调用一次,当第二次调用单订阅流的listen时会报错,值得一提的是,当咱们创立流时,默认创立的就是单订阅流。

3.3.2 多订阅流

顾名思义,此流能够有多个订阅者,也就是多订阅流的listen办法能够被屡次调用,通过Stream的asBroadcastStream()或StreamController的broadcast将单订阅流转换为多订阅流。

创立多订阅流
Stream broadcastStream = Stream.periodic(Duration(seconds: 5), (num) {  return num;}).asBroadcastStream(); 
var _controller = StreamController<int>.broadcast() 

3.3.3 单订阅流与多订阅流的区别

第一个区别

第一个区别就是下面提到的订阅者数量的区别

第二个区别

咱们重点要议论一下两种流的第二个区别

第二个区别就是单订阅流会持有本人的数据,当订阅者呈现时将本身持有的数据全副返回给订阅者,而多订阅流不会持有任何数据,如果多订阅流没有订阅者,多订阅流会把数据抛弃。

上面咱们用两端代码来展现两种流解决数据上的差异

单订阅流代码展现

创立流

 var _controller = StreamController<int>.broadcast();  var _count = 1;  createStream() {    Timer.periodic(Duration(seconds: 1), (t) {      _controller.sink.add(_count);      _count++;    });  } 

订阅流

createStream();Future.delayed(Duration(seconds: 5), () {  _controller.stream.listen((event) {    print("单订阅流$event");    });}); 

控制台输入如下

能够看到,单订阅流即便前五秒咱们没有订阅,但单订阅流还是在持有数据,当订阅者呈现时将持有的所有数据发送给订阅者。

多订阅流代码展现

创立流

 var _controller = StreamController<int>.broadcast();  var _count = 1;  createStream() {    Timer.periodic(Duration(seconds: 1), (t) {      _controller.sink.add(_count);      _count++;    });  } 

订阅流

 createStream();    Future.delayed(Duration(seconds: 5), () {      _controller.stream.listen((event) {        print("多订阅流$event");      });    });    Future.delayed(Duration(seconds: 10), () {      _controller.stream.listen((event) {        print("多订阅流二$event");      });    }); 

控制台输入

能够看到多订阅流产生的前五条数据都被抛弃了,只有订阅者呈现后生成的数据被发送给了订阅者。

代码看完想必你曾经了解了单订阅流与多订阅流的第二种区别,我制作了两种流程图帮忙你了解

3.4 其余办法

解决 Stream 的办法

上面这些 Stream 类中的办法能够对 Stream 进行解决并返回后果:

Future<T> get first;Future<bool> get isEmpty;Future<T> get last;Future<int> get length;Future<T> get single;Future<bool> any(bool Function(T element) test);Future<bool> contains(Object needle);Future<E> drain<E>([E futureValue]);Future<T> elementAt(int index);Future<bool> every(bool Function(T element) test);Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);Future forEach(void Function(T element) action);Future<String> join([String separator = ""]);Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});Future pipe(StreamConsumer<T> streamConsumer);Future<T> reduce(T Function(T previous, T element) combine);Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});Future<List<T>> toList();Future<Set<T>> toSet(); 

4 治理流订阅

咱们能够应用StreamSubscription对象来对流的订阅进行治理,listen办法的返回值就是StreamSubscription对象

 StreamSubscription subscription =    Stream.periodic(Duration(seconds: 1), (num) {    return num;  }).listen((num) {    print(num);  }); 

4.1 暂停订阅

subscription.pause(); 

4.2 复原订阅

subscription.resume(); 

4.3 勾销订阅

subscription2.cancel(); 

当不须要监听流时记得调用这个办法,否则会造成内存透露

4.4 操作流订阅的例子

以下示例用来展现如何操作流订阅

创立流

 static var _controller = StreamController<int>();  var _count = 1;  createStream() {    Timer.periodic(Duration(seconds: 1), (t) {      _controller.sink.add(_count);      _count++;    });  } 

创立监听及监听治理对象

 StreamSubscription subscription2 = _controller.stream.listen((event) {    print("单订阅流$event");  }); 

操作流订阅的办法

createStream();Future.delayed(Duration(seconds: 3), () {  print("暂停");  subscription2.pause();});Future.delayed(Duration(seconds: 5), () {  print("持续");  subscription2.resume();});Future.delayed(Duration(seconds: 7), () {  print("勾销");  subscription2.cancel();}); 

输入如下

5 在Flutter中应用StreamBuilder组件

5.1 StreamBuilder组件介绍

StreamBuilder组件次要有两个参数

第一个参数stream,要订阅的流

第二个参数builder,widget构建函数

能够应用builder函数的snapshot.connectionState属性依据流的不同状态返回不同的组件

每当StreamBuilder监听的stream有数据变动时,builder函数就会被调用,组件从新构建。

5.2示例代码

import 'package:flutter/cupertino.dart';import 'package:flutter/material.dart';import 'package:flutter_demo/util/util.dart';/// Copyright (C), 2020-2020, flutter_demo/// FileName: streamBuilder_demo/// Author: Jack/// Date: 2020/12/27/// Description:class StreamBuilderDemo extends StatelessWidget {  //创立流  Stream<int> _stream() {    Duration interval = Duration(seconds: 1);    Stream<int> stream = Stream<int>.periodic(interval, (num) {      return num;    });    stream = stream.take(59);    return stream;  }  @override  Widget build(BuildContext context) {    return Scaffold(      appBar: AppBar(        title: Text('Stream Demo'),      ),      body: Center(        child: StreamBuilder(          stream: _stream(),          builder: (BuildContext context, AsyncSnapshot<int> snapshot) {            if (snapshot.connectionState == ConnectionState.done) {              return Text(                '1 Minute Completed',                style: TextStyle(                  fontSize: 30.0,                ),              );            } else if (snapshot.connectionState == ConnectionState.waiting) {              return Text(                'Waiting For Stream',                style: TextStyle(                  fontSize: 30.0,                ),              );            }            return Text(              '00:${snapshot.data.toString().padLeft(2, '0')}',              style: TextStyle(                fontSize: 30.0,              ),            );          },        ),      ),    );  }}