关于ios:Flutter-详解-六深入了解Stream

41次阅读

共计 8944 个字符,预计需要花费 23 分钟才能阅读完成。

Future

Future有三种状态 未实现 实现带有值 实现带有异样 ,应用Future 能够简化事件工作。如果你有一个按钮,点击之后开始下载图片,首先事件循环机制会解决你的点击事件,而后开始下载图片,当下载实现,你能够应用 then 来注册回调,而后获取到图片并显示进去。

通常咱们不会间接创立,网络下载图片会返回一个 Future, 文件I/O 会返回一个 Future, 那咱们怎么创立一个呢?只须要关键字async 就示意该函数异步执行,返回类型是Future<T>

Future<String> getStr()async{var str = HttpRequest.getString('www.fgyong.cn');
  return str;
}

应用 http 申请地址 www.fgyong.cn 获取数据,而后返回。

如何接管文本呢?

其实很简略,只须要应用 await 关键字即可,用来注册 then 回调。

main(List<String> args) async {String string = await getStr();
  print(string);
}

等同于:

main(List<String> args) async {getStr().then((value) {print(value);
  });
}

官网比拟举荐前者,因为前者看起来很像同步函数,少了层层嵌套,不便开发者了解代码。
网络下载想提早动画暗藏工夫,能够应用Future.delayed()

await Future.delayed(Duration(seconds: 2), () {hideAnimation();
});

如果曾经带有值的想异步去执行,那么能够应用Future.value()

Stream

dart:async库蕴含对许多 Dart API 很重要的两种类型:StreamFuture。如果Future 示意单个计算的后果,则流是一系列后果。您侦听流以获取无关后果(数据和谬误)以及流敞开的告诉。您还能够在收听流时暂停播放或在流实现之前进行收听。

如何应用 Stream

流能够通过多种形式创立,后续在认真解说,然而它们都能够以雷同的形式应用:异步 for 循环(通常仅称为 await for)遍历流的事件,如for 循环迭代遍历。例如:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {sum += value;}
  return sum;
}

此代码仅接管整数事件流中的每个事件,将它们相加,而后返回(和)其和。当循环主体完结时,函数将暂停,直到下一个事件达到或流实现为止。该函数标记有 async 关键字,在应用 await for 循环时须要此关键字。以下示例通过应用 async * 函数生成简略的整数流来测试后面的代码:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {sum += value;}
  return sum;
}

Stream<int> countStream(int to) async* {for (int i = 1; i <= to; i++) {yield i;}
}

main() async {var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

当流中没有更多事件时,就实现流,并告诉接管事件的代码,就像告诉新事件到来一样。应用 await for 循环读取事件时,流实现后循环进行。在某些状况下,流实现之前会产生谬误;可能是网络从近程服务器上获取文件时产生故障,或者创立事件的代码存在谬误,然而有人须要理解它。流还能够传递谬误事件,就像传递数据事件一样。大多数流将在呈现第一个谬误后进行,但有可能传递多个谬误的流以及在产生谬误事件后传递更多数据的流。在本文档中,咱们仅探讨最多产生一个谬误的流。当应用 await for 读取流时,循环语句会引发谬误。这也完结了循环。您能够应用 try-catch 捕捉谬误。以下示例在循环迭代器等于时引发谬误 4:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {await for (var value in stream) {sum += value;}
  } catch (e) {return -1;}
  return sum;
}

Stream<int> countStream(int to) async* {for (int i = 1; i <= to; i++) {if (i == 4) {throw new Exception('Intentional exception');
    } else {yield i;}
  }
}

main() async {var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

两种 Stream

繁多订阅流 最常见的流蕴含一系列事件,这些事件是较大整体的一部分。事件必须以正确的程序传递,并且不能失落任何事件。这是您在读取文件或接管 Web 申请时取得的流。这样的流只能被收听一次。稍后再次收听可能意味着错过了最后的事件,而后其余部分毫无意义。当您开始收听时,数据将被提取并以块的模式提供。播送流 另一种流是针对能够一次解决的单个音讯的。例如,这种流可用于浏览器中的鼠标事件。您能够随时开始收听这样的流,并且在收听时会触发事件。多个收听者能够同时收听,并且您能够在勾销上一个订阅之后稍后再次收听。

Stream 的办法

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

这么多办法根本都能够应用 await for 来循环

Future<bool> contains(Object needle) async {await for (var event in this) {if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {await for (var event in this) {action(event);
  }
}

Future<List<T>> toList() async {final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

批改 Stream

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

这些根本都是应用闭包过滤流的内容,当然也能够转换内容。

监听 Stream

最初是监听,当流扭转时会触发监听listen(),所有的流都能够被监听。

StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

创立本人的 Stream

创立 Stream 大略有三种,如下所示:

  • 转换Stram
  • 应用 async* 创立Stream
  • 应用 StreamController 创立

转换 Stram

常常有些 Stream 蕴含的值不是咱们想要的,那么就须要咱们转换一下了,例如咱们把数字转成字符串.

  Future<String> _toString() async {var s = await _stream().map((event) => event.toString()).join('|');
    return s;
  }

也能够转成数组

 Future<List> _toList() async {var s = await _stream().toList();
    return s;
  }

应用 async* 创立Stream

创立新流的一种办法是应用异步生成器 (async *) 函数。在调用该函数时创立该流,并且在侦听该流时该函数的主体开始运行。函数返回时,流敞开。在函数返回之前,它能够应用 yieldyield *语句在流上收回事件。

这是一个原始示例,该示例会定期发射数字:

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函数返回一个 Stream。当收听该流时,主体开始运行。它重复提早申请的工夫距离,而后产生下一个数字。如果疏忽count 参数,则循环上没有进行条件,因而流永远输入越来越大的数字 - 或直到侦听器勾销其订阅为止。当侦听器勾销时(通过在 listen() 办法返回的 StreamSubscription 对象上调用 cancel()),则主体下一次达到yield 语句时,yield将充当 return 语句。执行所有关闭的 finally 块,而后函数退出。如果函数尝试在退出前产生一个值,则该操作将失败并充当返回值。当函数最终退出时,由 cancel() 办法返回的 Future 实现。如果函数以谬误退出,则 Future 会以该谬误完结;否则,它以 null 完结。另一个更有用的示例是一个转换序列的函数:

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {for (var future in futures) {
    var result = await future;
    yield result;
  }
}

应用 yield*

应用 yield* 来调用其余的函数取下一个值,当不获取的时候,则不运行。

  Stream<int> _stream() async* {if (_count < 10) {
      yield _count++;
      await Future.delayed(Duration(seconds: 1));
      sleep(Duration(seconds: 1));
      yield* _getDataFromServer();}
  }

yield*yield 区别是后者间接返回一个固定的值,而前者返回是的是一个函数。前者多用于分流,把一个 Stream<T> 分为其余的 Stream<R>Stream<E>

应用 StreamController 创立stream

_streamController = StreamController();
// 监听
_streamController.stream.listen((event) {})
/// 增加数据
_streamController.add('data');

StreamControlelr的类图如下所示,他们从 SinkStreamSink都只是到导入接口,并无间接继承关系,在抽象类方面依照性能解耦,最终由 StreamController 整合,负责增加的是 StreamControlelr, 负责监听的是Stream, 最终在以后的Zone 中执行回调 Zone.runUnaryGuarded()Zone 相似一个沙盒环境,在 APP 启动的时候创立。

原理

StreamController中初始化,间接调用了 _SyncStreamController,具体性能全副在_SyncStreamController(同步) 或_AsyncStreamController(异步)中实现。

factory StreamController({void onListen(),
  void onPause(),
  void onResume(),
  onCancel(),
  bool sync: false}) {
return sync
    ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
    : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}

最终在 _BufferingStreamSubscription 类中,实现了
onDoneonError函数,他们别离在类实例化的时候注册回调函数。

void onData(void handleData(T event)) {
  handleData ??= _nullDataHandler;// 默认值
  _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}
void onDone(void handleDone()) {
  handleDone ??= _nullDoneHandler;
  _onDone = _zone.registerCallback(handleDone);
}

那么 listener 函数是在何时增加的呢?
在获取 stream_ControllerStream(), 最终 listener 函数是 _StreamImpl extends Steam 中实现的,代码如下:

StreamSubscription<T> listen(void onData(T data),
      {Function onError, void onDone(), bool cancelOnError}) {cancelOnError = identical(true, cancelOnError);
    StreamSubscription<T> subscription =
        _createSubscription(onData, onError, onDone, cancelOnError);
    _onListen(subscription);
    return subscription;
  }
  
/// 创立一个订阅者
StreamSubscription<T> _createSubscription(void onData(T data),
    Function onError, void onDone(), bool cancelOnError) {
  return new _BufferingStreamSubscription<T>(onData, onError, onDone, cancelOnError);
}

最终监听是执行的 _BufferingStreamSubscription_onData(void HandleData(T event)),这里是应用了 _zone.registerUnaryCallback 来注册回调函数,否则在调用的时候会报错。

void onData(void handleData(T event)) {
  handleData ??= _nullDataHandler;
  _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}

stream.add() 函数执行了_add(), 源码如下

void _add(T data) {assert(!_isClosed);
  if (_isCanceled) return;
  if (_canFire) {_sendData(data);
  } else {_addPending(new _DelayedData<T>(data));
  }
}

当状态已敞开,间接断言,当状态已勾销,返回操作,当能够发送数据,则执行 _sendData() 函数,该函数才是最终发送数据,执行 listen 操作的要害函数,源码如下:

void _sendData(T data) {assert(!_isCanceled);
  assert(!_isPaused);
  assert(!_inCallback);
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;// 取出来第六位
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;// 舍弃第六位
  _checkState(wasInputPaused);
}

通过 final Zone _zone = Zone.current; 获取以后的 Zone 来执行曾经注册的回调 _zone.runUnaryGuarded(_onData, data), 执行结束, 应用_state&=~_STATE_IN_CALLBACK 来保留以后状态,应用 &=~ 舍弃第 6 位数字,_STATE_IN_CALLBACK值为 32,那么低第六位是 1,~_STATE_IN_CALLBACK, 除了第六位,剩下的都是 1,而后&= 来取出来其余的位数值保留下来。

而后在 _checkState(wasInutPaused) 来确定在执行 callback 两头并无状态扭转。

那么再执行结束 _sendDone() 的时候也是如此,首先判断是否曾经勾销,没勾销的话,执行 _onDone 回调。

 void _sendDone() {assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);

    void sendDone() {
      // If the subscription has been canceled while waiting for the cancel
      // future to finish we must not report the done event.
      if (!_waitsForCancel) return;
      _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
      _zone.runGuarded(_onDone);
      _state &= ~_STATE_IN_CALLBACK;
    }

    _cancel();
    _state |= _STATE_WAIT_FOR_CANCEL;
    if (_cancelFuture != null &&
        !identical(_cancelFuture, Future._nullFuture)) {_cancelFuture.whenComplete(sendDone);
    } else {sendDone();
    }
  }

事件流就是在后期应用 Zone 注册,在 add 的时候应用 Zone 调用曾经注册好的回调,播送是循环调用

要害函数:

/// 注册回调
_zone.registerUnaryCallback<dynamic, T>(handleData)

/// 注册不带参数的回调

_zone.registerCallback(R callback())

/// 执行 附带参数的回调

_zone.runUnaryGuarded(_onData, data)

更多 API 能够查看官网源码。

参考

  • 官网 库
  • 一个好玩的 动画 loading 库
  • demo code 汇合 github

文章汇总

<<Dart 异步与多线程 >>

<<Flutter 详解(一、深刻理解状态治理 –ScopeModel)>>

<<Flutter 详解(二、深刻理解状态治理 –Redux)>>

<<Flutter 详解(三、深刻理解状态治理 –Provider)>>

<<Flutter 详解(四、深刻理解状态治理 –BLoC)>>

<<Flutter 详解 (五、深刻理解 –Key)>>

<<Flutter 详解(六、深刻理解 –Stream>>

正文完
 0