乐趣区

关于前端:Dart基础如何在DartFlutter中使用Stream

1 什么是 Stream?

Stream 是 Dart 用来解决异步的 API,和同样用来解决异步的 Future 不同的是,Stream 能够异步的返回多个后果,而 Future 只能返回一个,如果你对 Future 有疑难,能够参考作者的上一篇文章,Dart 根底——Dart 异步 Future 与事件循环 Event Loop。

2 如何创立 Stream?

1.1 应用 Stream 的构造方法

Stream periodicStream = Stream.periodic(Duration(seconds: 2), (num) {return num;}); 

periodic 构造方法次要有两个参数,第一个参数类型为 Duration(工夫距离),第二个参数类型为 Function,Function 每隔一个 Duration(工夫距离)会被调用一次, 参数 num 为事件调用的次数,从 0 开始顺次递增。

翻阅源码 Stream.periodic 是应用 Timer.periodic 加_SyncStreamController 实现的

1.2 将办法的返回值申明为 Stream

Stream<String> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
   // 提早 interval(工夫距离)执行一次
    await Future.delayed(interval);
    // 返回 i  i++
    yield "stream 返回 ${i++}";
    if (i == maxCount) break;
  }
} 

看到这里你可能会有一些疑难什么是 async* 和 yield?

yield 为一个用 async *_润饰返回值为 Stream 的函数返回一个值,它就像 return, 不过他不会完结函数_

Stream asynchronousNaturalsTo(n) async* {
  int k = 0;
  while (k < n) yield k++;
} 

这里波及到了 Dart 的生成器函数概念,在这里你只须要简略了解 yield 的作用就能够了

1.3 应用 StreamController

 var _controller = StreamController<int>();

  var _count = 1;

  createStream() {
  // 函数每隔一秒调用一次
    Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
      _count++;
    });
  } 

咱们次要应用 _controller 的两个属性,应用 _controller.Stream 获取流,应用 _controller.sink.add 向流中增加数据,下面的例子应用定时器,每隔一秒向流中增加数据_count

3 Stream 的罕用办法

接下来介绍一下 Stream 的罕用办法

PS: 以下 Stream 罕用办法的展现都是用上面代码创立的流

Stream periodicStream = Stream.periodic(Duration(seconds: 1), (num) {return num;}); 

3.1 listen

listen 作为应用 Stream 最重要的办法,次要用于监听流的数据变动,每当流的数据变动时,listen 中的办法都会被调用。

 periodicStream.listen((event) {print(event);
    }); 

listen 办法默认参数为 Function,参数中的 event 为下面示例中返回的 num,每当流返回新的数据时,listen 办法都会被调用。

控制台输入如下

0
1
2
3
4 

打印流返回的数据

listen 的 onError 参数当流呈现谬误时调用。

listen 的 onDone 参数当流敞开时调用。

还有一个 cancelOnError 属性,默认状况下为 true,能够将其设置为 false 以使订阅在产生谬误后也能持续进行。

3.2 map

Stream.periodic(Duration(seconds: 1), (num) {return num;}).map((num) => num * 2) 

应用 map 将流返回的数据进行转换

控制台输入如下

0
2
4
6 

3.3 asBroadcastStream()&broadcast

通过 Stream 的 asBroadcastStream()或 StreamController 的 broadcast 将单订阅的流转换为多订阅流

什么是单订阅流和多订阅流?

3.3.1 单订阅流

单订阅流顾名思义,此流只能有一个订阅者,也就是单订阅流的 listen 办法只能被调用一次,当第二次调用单订阅流的 listen 时会报错,值得一提的是,当咱们创立流时,默认创立的就是单订阅流。

3.3.2 多订阅流

顾名思义,此流能够有多个订阅者,也就是多订阅流的 listen 办法能够被屡次调用,通过 Stream 的 asBroadcastStream()或 StreamController 的 broadcast 将单订阅流转换为多订阅流。

创立多订阅流
Stream broadcastStream = Stream.periodic(Duration(seconds: 5), (num) {return num;}).asBroadcastStream(); 
var _controller = StreamController<int>.broadcast() 

3.3.3 单订阅流与多订阅流的区别

第一个区别

第一个区别就是下面提到的订阅者数量的区别

第二个区别

咱们重点要议论一下两种流的第二个区别

第二个区别就是单订阅流会持有本人的数据,当订阅者呈现时将本身持有的数据全副返回给订阅者,而多订阅流不会持有任何数据,如果多订阅流没有订阅者,多订阅流会把数据抛弃。

上面咱们用两端代码来展现两种流解决数据上的差异

单订阅流代码展现

创立流

 var _controller = StreamController<int>.broadcast();
  var _count = 1;

  createStream() {Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
      _count++;
    });
  } 

订阅流

createStream();
Future.delayed(Duration(seconds: 5), () {_controller.stream.listen((event) {print("单订阅流 $event");
    });
}); 

控制台输入如下

能够看到,单订阅流即便前五秒咱们没有订阅,但单订阅流还是在持有数据,当订阅者呈现时将持有的所有数据发送给订阅者。

多订阅流代码展现

创立流

 var _controller = StreamController<int>.broadcast();
  var _count = 1;

  createStream() {Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
      _count++;
    });
  } 

订阅流

 createStream();
    Future.delayed(Duration(seconds: 5), () {_controller.stream.listen((event) {print("多订阅流 $event");
      });
    });
    Future.delayed(Duration(seconds: 10), () {_controller.stream.listen((event) {print("多订阅流二 $event");
      });
    }); 

控制台输入

能够看到多订阅流产生的前五条数据都被抛弃了,只有订阅者呈现后生成的数据被发送给了订阅者。

代码看完想必你曾经了解了单订阅流与多订阅流的第二种区别,我制作了两种流程图帮忙你了解

3.4 其余办法

解决 Stream 的办法

上面这些 Stream 类中的办法能够对 Stream 进行解决并返回后果:

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet(); 

4 治理流订阅

咱们能够应用 StreamSubscription 对象来对流的订阅进行治理,listen 办法的返回值就是 StreamSubscription 对象

 StreamSubscription subscription =
    Stream.periodic(Duration(seconds: 1), (num) {return num;}).listen((num) {print(num);
  }); 

4.1 暂停订阅

subscription.pause(); 

4.2 复原订阅

subscription.resume(); 

4.3 勾销订阅

subscription2.cancel(); 

当不须要监听流时记得调用这个办法,否则会造成内存透露

4.4 操作流订阅的例子

以下示例用来展现如何操作流订阅

创立流

 static var _controller = StreamController<int>();
  var _count = 1;
  createStream() {Timer.periodic(Duration(seconds: 1), (t) {_controller.sink.add(_count);
      _count++;
    });
  } 

创立监听及监听治理对象

 StreamSubscription subscription2 = _controller.stream.listen((event) {print("单订阅流 $event");
  }); 

操作流订阅的办法

createStream();
Future.delayed(Duration(seconds: 3), () {print("暂停");
  subscription2.pause();});
Future.delayed(Duration(seconds: 5), () {print("持续");
  subscription2.resume();});
Future.delayed(Duration(seconds: 7), () {print("勾销");
  subscription2.cancel();}); 

输入如下

5 在 Flutter 中应用 StreamBuilder 组件

5.1 StreamBuilder 组件介绍

StreamBuilder 组件次要有两个参数

第一个参数 stream,要订阅的流

第二个参数 builder,widget 构建函数

能够应用 builder 函数的 snapshot.connectionState 属性依据流的不同状态返回不同的组件

每当 StreamBuilder 监听的 stream 有数据变动时,builder 函数就会被调用,组件从新构建。

5.2 示例代码

import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:flutter_demo/util/util.dart';

/// Copyright (C), 2020-2020, flutter_demo
/// FileName: streamBuilder_demo
/// Author: Jack
/// Date: 2020/12/27
/// Description:
class StreamBuilderDemo extends StatelessWidget {
  // 创立流
  Stream<int> _stream() {Duration interval = Duration(seconds: 1);
    Stream<int> stream = Stream<int>.periodic(interval, (num) {return num;});
    stream = stream.take(59);
    return stream;
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream Demo'),
      ),
      body: Center(
        child: StreamBuilder(stream: _stream(),
          builder: (BuildContext context, AsyncSnapshot<int> snapshot) {if (snapshot.connectionState == ConnectionState.done) {
              return Text(
                '1 Minute Completed',
                style: TextStyle(fontSize: 30.0,),
              );
            } else if (snapshot.connectionState == ConnectionState.waiting) {
              return Text(
                'Waiting For Stream',
                style: TextStyle(fontSize: 30.0,),
              );
            }
            return Text('00:${snapshot.data.toString().padLeft(2,'0')}',
              style: TextStyle(fontSize: 30.0,),
            );
          },
        ),
      ),
    );
  }
} 
退出移动版