共计 6042 个字符,预计需要花费 16 分钟才能阅读完成。
1 什么是 Stream?
Stream 是 Dart 用来解决异步的 API,和同样用来解决异步的 Future 不同的是,Stream 能够异步的返回多个后果,而 Future 只能返回一个,如果你对 Future 有疑难,能够参考作者的上一篇文章,Dart 根底——Dart 异步 Future 与事件循环 Event Loop。
2 如何创立 Stream?
1.1 应用 Stream 的构造方法
Stream periodicStream = Stream.periodic(Duration(seconds: 2), (num) {return num;});
periodic 构造方法次要有两个参数,第一个参数类型为 Duration(工夫距离),第二个参数类型为 Function,Function 每隔一个 Duration(工夫距离)会被调用一次, 参数 num 为事件调用的次数,从 0 开始顺次递增。
翻阅源码 Stream.periodic 是应用 Timer.periodic 加_SyncStreamController 实现的
1.2 将办法的返回值申明为 Stream
Stream<String> timedCounter(Duration interval, [int maxCount]) async* {
int i = 0;
while (true) {
// 提早 interval(工夫距离)执行一次
await Future.delayed(interval);
// 返回 i i++
yield "stream 返回 ${i++}";
if (i == maxCount) break;
}
}
看到这里你可能会有一些疑难什么是 async* 和 yield?
yield 为一个用 async *_润饰返回值为 Stream 的函数返回一个值,它就像 return, 不过他不会完结函数_
Stream asynchronousNaturalsTo(n) async* {
int k = 0;
while (k < n) yield k++;
}
这里波及到了 Dart 的生成器函数概念,在这里你只须要简略了解 yield 的作用就能够了
1.3 应用 StreamController
var _controller = StreamController<int>();
var _count = 1;
createStream() {
// 函数每隔一秒调用一次
Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
_count++;
});
}
咱们次要应用 _controller
的两个属性,应用 _controller.Stream
获取流,应用 _controller.sink.add
向流中增加数据,下面的例子应用定时器,每隔一秒向流中增加数据_count
。
3 Stream 的罕用办法
接下来介绍一下 Stream 的罕用办法
PS: 以下 Stream 罕用办法的展现都是用上面代码创立的流
Stream periodicStream = Stream.periodic(Duration(seconds: 1), (num) {return num;});
3.1 listen
listen 作为应用 Stream 最重要的办法,次要用于监听流的数据变动,每当流的数据变动时,listen 中的办法都会被调用。
periodicStream.listen((event) {print(event);
});
listen 办法默认参数为 Function,参数中的 event 为下面示例中返回的 num,每当流返回新的数据时,listen 办法都会被调用。
控制台输入如下
0
1
2
3
4
打印流返回的数据
listen 的 onError 参数当流呈现谬误时调用。
listen 的 onDone 参数当流敞开时调用。
还有一个 cancelOnError
属性,默认状况下为 true,能够将其设置为 false 以使订阅在产生谬误后也能持续进行。
3.2 map
Stream.periodic(Duration(seconds: 1), (num) {return num;}).map((num) => num * 2)
应用 map 将流返回的数据进行转换
控制台输入如下
0
2
4
6
3.3 asBroadcastStream()&broadcast
通过 Stream 的 asBroadcastStream()或 StreamController 的 broadcast 将单订阅的流转换为多订阅流
什么是单订阅流和多订阅流?
3.3.1 单订阅流
单订阅流顾名思义,此流只能有一个订阅者,也就是单订阅流的 listen 办法只能被调用一次,当第二次调用单订阅流的 listen 时会报错,值得一提的是,当咱们创立流时,默认创立的就是单订阅流。
3.3.2 多订阅流
顾名思义,此流能够有多个订阅者,也就是多订阅流的 listen 办法能够被屡次调用,通过 Stream 的 asBroadcastStream()或 StreamController 的 broadcast 将单订阅流转换为多订阅流。
创立多订阅流
Stream broadcastStream = Stream.periodic(Duration(seconds: 5), (num) {return num;}).asBroadcastStream();
var _controller = StreamController<int>.broadcast()
3.3.3 单订阅流与多订阅流的区别
第一个区别
第一个区别就是下面提到的订阅者数量的区别
第二个区别
咱们重点要议论一下两种流的第二个区别
第二个区别就是单订阅流会持有本人的数据,当订阅者呈现时将本身持有的数据全副返回给订阅者,而多订阅流不会持有任何数据,如果多订阅流没有订阅者,多订阅流会把数据抛弃。
上面咱们用两端代码来展现两种流解决数据上的差异
单订阅流代码展现
创立流
var _controller = StreamController<int>.broadcast();
var _count = 1;
createStream() {Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
_count++;
});
}
订阅流
createStream();
Future.delayed(Duration(seconds: 5), () {_controller.stream.listen((event) {print("单订阅流 $event");
});
});
控制台输入如下
能够看到,单订阅流即便前五秒咱们没有订阅,但单订阅流还是在持有数据,当订阅者呈现时将持有的所有数据发送给订阅者。
多订阅流代码展现
创立流
var _controller = StreamController<int>.broadcast();
var _count = 1;
createStream() {Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
_count++;
});
}
订阅流
createStream();
Future.delayed(Duration(seconds: 5), () {_controller.stream.listen((event) {print("多订阅流 $event");
});
});
Future.delayed(Duration(seconds: 10), () {_controller.stream.listen((event) {print("多订阅流二 $event");
});
});
控制台输入
能够看到多订阅流产生的前五条数据都被抛弃了,只有订阅者呈现后生成的数据被发送给了订阅者。
代码看完想必你曾经了解了单订阅流与多订阅流的第二种区别,我制作了两种流程图帮忙你了解
3.4 其余办法
解决 Stream 的办法
上面这些 Stream 类中的办法能够对 Stream 进行解决并返回后果:
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
4 治理流订阅
咱们能够应用 StreamSubscription 对象来对流的订阅进行治理,listen 办法的返回值就是 StreamSubscription 对象
StreamSubscription subscription =
Stream.periodic(Duration(seconds: 1), (num) {return num;}).listen((num) {print(num);
});
4.1 暂停订阅
subscription.pause();
4.2 复原订阅
subscription.resume();
4.3 勾销订阅
subscription2.cancel();
当不须要监听流时记得调用这个办法,否则会造成内存透露
4.4 操作流订阅的例子
以下示例用来展现如何操作流订阅
创立流
static var _controller = StreamController<int>();
var _count = 1;
createStream() {Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
_count++;
});
}
创立监听及监听治理对象
StreamSubscription subscription2 = _controller.stream.listen((event) {print("单订阅流 $event");
});
操作流订阅的办法
createStream();
Future.delayed(Duration(seconds: 3), () {print("暂停");
subscription2.pause();});
Future.delayed(Duration(seconds: 5), () {print("持续");
subscription2.resume();});
Future.delayed(Duration(seconds: 7), () {print("勾销");
subscription2.cancel();});
输入如下
5 在 Flutter 中应用 StreamBuilder 组件
5.1 StreamBuilder 组件介绍
StreamBuilder 组件次要有两个参数
第一个参数 stream,要订阅的流
第二个参数 builder,widget 构建函数
能够应用 builder 函数的 snapshot.connectionState 属性依据流的不同状态返回不同的组件
每当 StreamBuilder 监听的 stream 有数据变动时,builder 函数就会被调用,组件从新构建。
5.2 示例代码
import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:flutter_demo/util/util.dart';
/// Copyright (C), 2020-2020, flutter_demo
/// FileName: streamBuilder_demo
/// Author: Jack
/// Date: 2020/12/27
/// Description:
class StreamBuilderDemo extends StatelessWidget {
// 创立流
Stream<int> _stream() {Duration interval = Duration(seconds: 1);
Stream<int> stream = Stream<int>.periodic(interval, (num) {return num;});
stream = stream.take(59);
return stream;
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Stream Demo'),
),
body: Center(
child: StreamBuilder(stream: _stream(),
builder: (BuildContext context, AsyncSnapshot<int> snapshot) {if (snapshot.connectionState == ConnectionState.done) {
return Text(
'1 Minute Completed',
style: TextStyle(fontSize: 30.0,),
);
} else if (snapshot.connectionState == ConnectionState.waiting) {
return Text(
'Waiting For Stream',
style: TextStyle(fontSize: 30.0,),
);
}
return Text('00:${snapshot.data.toString().padLeft(2,'0')}',
style: TextStyle(fontSize: 30.0,),
);
},
),
),
);
}
}