Future.dart阅读

Future主要用于处理flutter中的异步输入事件, 通过指定不同的初始化构造函数将异步任务转化为同步执行, 提交到当前Isolate的microTask队列或eventTask队列

此外还提供了遍历的方法对future进行管理, 错捕捉和拦截,future遍历, 合并, 同时还可以转换为更为灵活的Stream处理。在实际使用中应特殊注意异常的捕捉。它和Timer,Zone, Isolate密切相关.

Future执行顺序

根据执行效率来分, Future有3种创建方法, 它们的优先级按如下先后顺序

  1. Future.sync: 绝对同步指令, 在当前函数栈中同步执行, 优先级最高
  2. Future.microTask/Future.value: 优先级相同, FIFO
  3. Future(()=> { }): 异步创建, 优先级最低

顺序补充说明, 其实在理解上面 1, 2, 3之后就很容推导了

  1. 同优先级别按照先后顺序执行
  2. 同优先级内层嵌套Future优先级比外层底
  3. Future.sync后接then, 它的优先级降级为 mircoTask 级别, 其它的Future创建then的优先级沿用创建时的优先级。

以下是测试代码:

void main() {
  print('start ....');
  
  Future.microtask(() => print('microtask0'));

  //通过Future.value的方式直接返回一个完成的Future,它直接插入到micoktask中
  Future.value('feature 0').then((value) => print('feature0->1')).then((value) => print('feature0->2'));

  //插入到当前Isolate EventLoop的EventQueue中
  Future(()=> print('feature 1'));
  
  //同步执行插入到当前Isolate EventLoop microTask队列中
  Future.microtask(() => print('microtask2'));

  //Future.sync为同步执行指令,和函数内的同步方法具有相同优先级
  final syncFuture = Future.sync(() => print('sync feature2'));
  syncFuture.then((value) => print('sync then feature3')).then((value) => print('sync then feature4'));
  Future.microtask(() => print('microtask4'));

  //顺序执行,插入到EventQueue中,执行顺序 `feature4` -> `feature5` -> feature6`,小于microTask
  final future4 = Future(() => print('feature4'));
  final microtask = Future.microtask(() => print('microtask5'));
  microtask.then((value) => print('microtask5-0'));

  future4.then((value) => print('feature5')).then((value) => print('feature6'));
  Future.microtask(() => print('microtask6'));
  
  //同步等待 `feature7` 和 `feature8` 执行完毕后再执行后面的 `feature9`
  Future.wait([Future(() => print('feature7')), Future(() => print('feature8'))]).then((value) => print('feature 9'));
  
  //同层级的Future执行按先进先出的规则,嵌套的内层feature11,feature12的future要慢于外层 `feature15`
  Future((){
     print('feature 10');
     Future.microtask(() => 'feature 10 mictask');
     Future(() => print('feature11')).then((value) => print('feature 12')); //内层feature11,feature12的future要慢于外层 `feature15`
  }).then((value) => print('feature 13'));

  Future(() => print('feature 15'));

  Future.microtask(() => print('microtask15')).then((value) => print('microtask15 then'));

  Future.microtask(() => print('microtask16'));

  print('end ....');

//Log输出
/**
start ....
sync feature2
end ....
sync feature3
sync feature4
microtask1
microtask1 then
microtask2 then
microtask2
feature 1
feature4
feature5
feature6
feature7
feature8
feature 9
feature 10
  */
}

相关的类介绍

FutureOr<T> //用于描述一个Future对象,返回Future和Future的值

//T可以是任意的数据类型
Comparable (dart.core)
    BigInt (dart.core)  //大整数,hexacdemical literal
    DateTime (dart.core)
    Duration (dart.core)
    num (dart.core)
        double (dart.core)
        int (dart.core)
    String (dart.core)
````

``` dart
Future (dart.async)
    _Future (dart.async)  //一个延迟计算的对象,它是一个抽象类
    SynchronousFuture (synchronous_future.dart) //提供一个同步执行的futrue
    TickerFuture (ticker.dart) //提供一个间隔执行的Future对象,动画vsyn信号回调专用
    DelegatingFuture (future.dart) //对 `_Future` 类的包装,简化用户调用接口
        ResultFuture (future.dart) //用于提供Future value,当future完成时可以同步获取到它对应的value

Future源码解读

abstract class Future<T> { 
  //在当前函数栈中同步创建一个带有数据的Future,访问时会被提交到microTaskQueue中
  _Future.value(T value) : this.zoneValue(value, Zone.current);
 
  //计算Future执行的命令,如果有嵌套则会异步全部执行完毕
  factory Future(FutureOr<T> computation()) {
    Timer.run(() { //通过Timer插入到EventLoop的EventQueue中,在CPU分配的时间片内执行本次任务
        result._complete(computation());//串联所有的Future,之后计算出最后的结果 

  //直接将计算任务添加到当前Isolate的microTaskQueue中
  factory Future.microtask(FutureOr<T> computation()) {
    scheduleMicrotask(() {
        //在本次调度的时间片内,完成计算操作
        result._complete(computation());
   
  //同步执行,直接采用当前函数分配的时间片执行
  factory Future.sync(FutureOr<T> computation()) { ...
      var result = computation();  ...
  
  //提交任务到microTask完成和value绑定
  @pragma("vm:entry-point")
  factory Future.value([FutureOr<T> value]) {
    return new _Future<T>.immediate(value); 
  //提交一个错误的值到 microTaskQueue中
  factory Future.error(Object error, [StackTrace stackTrace]) { ...
    return new _Future<T>.immediateError(error, stackTrace); 
  
  //提交一延迟执行的timer source到 EventLoop的EventQueue,
  factory Future.delayed(Duration duration, [FutureOr<T> computation()]) { ...
    new Timer(duration, () { 

  //合并多个Future事件,任何一个错误会导致整个list future抛出异常,通过指定cleanUp可以挽回部分成功的数据
  static Future<List<T>> wait<T>(Iterable<Future<T>> futures,
      {bool eagerError: false, void cleanUp(T successValue)}) { ...
    handleError(Object theError, StackTrace theStackTrace) {  
                cleanUp(value); ...
      //遍历合并数据
      for (var future in futures) {
        int pos = remaining;
        future.then((T value) { ...
              result._completeWithValue(values);
  

  /**
   * Returns the result of the first future in [futures] to complete.
   *
   * The returned future is completed with the result of the first
   * future in [futures] to report that it is complete,
   * whether it's with a value or an error.
   * The results of all the other futures are discarded.
   *
   * If [futures] is empty, or if none of its futures complete,
   * the returned future never completes.
   */
   //返回第一个future的执行结果
  static Future<T> any<T>(Iterable<Future<T>> futures) {
    var completer = new Completer<T>.sync();
    var onValue = (T value) {
      if (!completer.isCompleted) completer.complete(value);
    };
    var onError = (error, StackTrace stack) {
      if (!completer.isCompleted) completer.completeError(error, stack);
    };
    for (var future in futures) {
      future.then(onValue, onError: onError);
    }
    return completer.future;
  }
 
  //遍历执行 `elements` ,可以支持同步和异步
  static Future forEach<T>(Iterable<T> elements, FutureOr action(T element)) { ...
  
  //配合forEach对elements进行遍历,并执行
  static Future doWhile(FutureOr<bool> action()) {
    _Future doneSignal = new _Future();
    void Function(bool) nextIteration;
    nextIteration = Zone.current.bindUnaryCallbackGuarded((bool keepGoing) {
      while (keepGoing) {
        FutureOr<bool> result;
        try {
          result = action(); //具体实现暴露给开发者,对数组类的元素进行转换 

  //callbak注册 订阅成功/失败/完成的几件
  Future<R> then<R>(FutureOr<R> onValue(T value), {Function onError});
  Future<T> catchError(Function onError, {bool test(Object error)});
  Future<T> whenComplete(FutureOr action());

  //转化为流事件
  Stream<T> asStream();

  //注册timeout事件,通常用于对突发异常进行retry操作
  Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()});
}

_Future

它是对 Future 的具体实现, 对future输出的控制

class _Future<T> implements Future<T> { ...
  //状态标志位,代表了当前 `_future` 执行的状态
  static const int _stateIncomplete = 0;
  static const int _statePendingComplete = 1;
  static const int _stateChained = 2;
  static const int _stateValue = 4;
  static const int _stateError = 8;

  //初始化当前future的状态,当有订阅时开始变更
  int _state = _stateIncomplete;
  //当前Future执行的zone,用于提交then的microTask事件,绑定success和error事件
  final Zone _zone; 

  //存储当前的result或listeners,中间存储值
  @pragma("vm:entry-point")
  var _resultOrListeners;

  
  static List<Function> _continuationFunctions(_Future<Object> future) {
    List<Function> result = null;
    while (true) {
      //当pendding的时候才能添加future的listener
      if (future._mayAddListener) return result; ...
      //遍历执行listenr的绑定操作,Future每次listener操作完成之后返回一个新的Future用于串联future
        (result ??= <Function>[]).add(listener.handleValue);
        future = listener.result; 
  

  Future<R> then<R>(FutureOr<R> f(T value), {Function onError}) { ...
     //绑定成功的订阅事件到CurrentZone
      f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f); 
      //选择性绑定onError事件到CurrentZone
      onError = _registerErrorHandler(onError, currentZone); 
    }
    //添加listener,每个listen持有了 then注册的回掉函数,并且在执行完成之后会返回一个新的Future,这也是为什么then能够连续串联好多个,由于 `f` 绑定的是同一个zone,所以他们执行在同一个zone下
    _Future<R> result = new _Future<R>();
    _addListener(new _FutureListener<T, R>.then(result, f, onError)); ...

  /// Registers a system created result and error continuation.
  ///
  /// Used by the implementation of `await` to listen to a future.
  /// The system created liseners are not registered in the zone,
  /// and the listener is marked as being from an `await` .
  /// This marker is used in [_continuationFunctions].
  //注册处一个系统回调事件,返回result由 `await` 关键字决定, `await future` 就相当于执行这个方法
  Future<E> _thenAwait<E>(FutureOr<E> f(T value), Function onError) {
    _Future<E> result = new _Future<E>();
    _addListener(new _FutureListener<T, E>.thenAwait(result, f, onError));
    return result;
  }

  Future<T> catchError(Function onError, {bool test(error)}) {  
  Future<T> whenComplete(dynamic action()) { 
  
  //转换成一个future对象
  Stream<T> asStream() => new Stream<T>.fromFuture(this);
  
  //添加listener对象
  void _addListener(_FutureListener listener) {...
      _zone.scheduleMicrotask(() {  ... 

   //反转链表,确保先添加的先执行FIFO
  _FutureListener _reverseListeners(_FutureListener listeners) { ...

  //传递future用
  static void _chainForeignFuture(Future source, _Future target) { ...
  static void _chainCoreFuture(_Future source, _Future target) { ...
  
  void _complete(FutureOr<T> value) { 
  /**
   * Propagates the value/error of [source] to its [listeners], executing the
   * listeners' callbacks.
   */
   //在future完成时遍历listener发送事件给订阅者
  static void _propagateToListeners(_Future source, _FutureListener listeners) {
    while (true) { ...
      assert(source._isComplete);  ...
      while (listeners._nextListener != null) { ...
        void handleValueCallback() { ...
            listenerValueOrError = listener.handleValue(sourceResult);
        void handleError() {
              listenerValueOrError = listener.handleError(asyncError);
  
  //注册timeout订阅事件
  Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) { ...
      timer = new Timer(timeLimit, () { ...
          result._complete(zone.run(onTimeout)); ..
          result._completeError(e, s); 
    //自动实现了它的链式订阅
    this.then((T v) { ...
        result._completeWithValue(v);
        result._completeError(e, s); 

SynchronousFuture

从名字上看, 它是一个同步执行的future

class SynchronousFuture<T> implements Future<T> { ...
  @override
  Future<T> catchError(Function onError, { bool test(Object error) }) => Completer<T>().future;
  
  //f函数执行执行,没有绑定zone的复杂流程,Future.sync
  @override
  Future<E> then<E>(FutureOr<E> f(T value), { Function onError }) {...
    final dynamic result = f(_value);   
  @override
  Future<T> whenComplete(FutureOr<dynamic> action()) { ...
}

DelegatingFuture

代理Future, ResultFuture继承于它, 提供了 Future直接访问reuslt的能力, 具体实现在 Result 类中

class DelegatingFuture<T> implements Future<T> { ...
  DelegatingFuture(this._future);
  @override
  Stream<T> asStream() => _future.asStream(); ...
  @override
  Future<T> catchError(Function onError, {bool Function(Object error) test}) =>  ...
  @override
  Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) => ...
  @override
  Future<T> whenComplete(FutureOr Function() action) => ...
  @override
  Future<T> timeout(Duration timeLimit, {FutureOr<T> Function() onTimeout})  ...

class ResultFuture<T> extends DelegatingFuture<T> { 
  bool get isComplete => result != null; 
  Result<T> get result => _result;
  Result<T> _result;

  ResultFuture(Future<T> future) : super(future) {
    Result.capture(future).then((result) {
      _result = result;
    });
  }
}

abstract class Result<T> { ....
  final T value; ...
  static Future<Result<T>> capture<T>(Future<T> future) {
    return future.then((value) => ValueResult(value),
        onError: (error, StackTrace stackTrace) =>
            ErrorResult(error, stackTrace));
  }

Completer

用来包装future, 避免then嵌套, 优化代码结构

Completer (dart.async)
    _Completer (dart.async)
        _AsyncCompleter (dart.async)
        _SyncCompleter (dart.async)

与FutureBuilder的结合使用

  • FutureBuilder用于实现一个异步构建的widget, 根据当前异步事件执行的状态显示不同的widget, 在实际工作中会经常用到, 它的本质就是将 需要传递的数据分成不同的的阶段获取, 然后再拿到每个阶段的数据后调用setState去触发Widget的build, 实际的widget封装在它的构造然后中,

  • 下面是一个简易的FutureBuider, 只是为了理解其中的运作流程, 具体细节暂时忽略

//提供构建的四个阶段的数据,根据实际需要还可以定义得更多。
enum FutureStatus {
  initial,
  done,
  error,
  pending,
}
//包装状态和数据,用于构建每个阶段的widget
class FutureSnapshot {
  dynamic data;
  FutureStatus status;
  FutureSnapshot({this.data, this.status});
}

typedef WidgetBuilder = Widget Function(BuildContext, FutureSnapshot);

class CustomFutureBuilder extends StatefulWidget {
  final Future future;
  final WidgetBuilder builder;
  CustomFutureBuilder({this.future, this.builder});
  @override
  CustomFutureBuilderState createState() => CustomFutureBuilderState();
}

class CustomFutureBuilderState extends State<CustomFutureBuilder> {
  FutureSnapshot snapshot;

  @override
  Widget build(BuildContext context) => widget.builder(context, snapshot);
  @override
  void initState() {
    super.initState();
    snapshot = FutureSnapshot(status: FutureStatus.initial, data: null); //初始化阶段,
    subscribe();
  }

  void subscribe() {
    widget.future.then((value) {
      setState(() {
        snapshot = FutureSnapshot(status: FutureStatus.initial, data: value); //数据加载成功阶段
      });
    }, onError: (error, stackTrace) {
      setState(() {
        snapshot = FutureSnapshot(
            status: FutureStatus.initial,
            data: FlutterErrorDetails(exception: error, stack: stackTrace)); //数据加载失败阶段
      });
    });
    snapshot = FutureSnapshot(
        status: FutureStatus.initial, data: FlutterErrorDetails()); //等待阶段
  }
}

class CustomFutureBuilderDemo extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return CustomFutureBuilder(     //具体使用.
        future: Future.value('xxxxx'),
        builder: (contxt, snapShot) {
          switch (snapShot.status) {
            case FutureStatus.error:
              return Text('error');
            case FutureStatus.initial:
              return Text('initial');
            case FutureStatus.done:
              return Text('done');
            case FutureStatus.pending:
              return CircularProgressIndicator();
          }
          return Container();
        });
  }
}

上面的例子主要介绍了FutureBuilder的一些基本功能, 在实际的使用中, 还需要考虑到widget的刷新, future的变化, 如何去解除订阅, 重新初始化构建树, 下面是标准的futurebuilder实现。

class _FutureBuilderState<T> extends State<FutureBuilder<T>> {
 
  Object _activeCallbackIdentity; //增加了Callback标志符,来控制future的同步
  AsyncSnapshot<T> _snapshot; //同上面的FutureSnapShot一样,包装状态和值

  @override
  void initState() {
    super.initState();
    _snapshot = AsyncSnapshot<T>.withData(ConnectionState.none, widget.initialData); //初始化阶段状态和值
    _subscribe();  //订阅Future
  }

  @override
  void didUpdateWidget(FutureBuilder<T> oldWidget) {
    super.didUpdateWidget(oldWidget);
    if (oldWidget.future != widget.future) {  //视图更新后对future解绑操作,重新订阅新的future
      if (_activeCallbackIdentity != null) {
        _unsubscribe();
        _snapshot = _snapshot.inState(ConnectionState.none); //重制阶段状态和值
      }
      _subscribe();
    }
  }

  @override
  Widget build(BuildContext context) => widget.builder(context, _snapshot);

  @override
  void dispose() {
    _unsubscribe();
    super.dispose();
  }

  void _subscribe() {
    if (widget.future != null) {
      final Object callbackIdentity = Object();
      _activeCallbackIdentity = callbackIdentity;
      widget.future.then<void>((T data) {
        if (_activeCallbackIdentity == callbackIdentity) {
          setState(() {
            _snapshot = AsyncSnapshot<T>.withData(ConnectionState.done, data); //完成阶段状态值
          });
        }
      }, onError: (Object error) {
        if (_activeCallbackIdentity == callbackIdentity) {
          setState(() {
            _snapshot = AsyncSnapshot<T>.withError(ConnectionState.done, error); //错误阶段状态值
          });
        }
      });
      _snapshot = _snapshot.inState(ConnectionState.waiting); //等待阶段状态值
    }
  }

  void _unsubscribe() {
    _activeCallbackIdentity = null;  //控制future和build同步的标志位,类似信号量(不过flutter ui线程为单线程)避免future执行时widget已经被移除了.
  }
}

StreamBuilder与Future应用

和Future的设计方式基本一致, 只不过他的状态多一点点, 数据源是持续的,不想future的数据是单次的,StreamBuilder继承了 _StreamBuilderBase , _StreamBuilderBase 用来管理stream流的订阅和更新, StreamBuilder 则更多的是用来实现暴露给用户的接口,状态值的变化, Widget的builder实现.

相比较FutureBuilder主要差异主要在这里,状态值多一些.

class _StreamBuilderBaseState<T, S> extends State<StreamBuilderBase<T, S>> ...
void _subscribe() {
    if (widget.stream != null) {
      _subscription = widget.stream.listen((T data) {
        setState(() {
          _summary = widget.afterData(_summary, data);
        });
      }, onError: (Object error) {
        setState(() {
          _summary = widget.afterError(_summary, error);
        });
      }, onDone: () {
        setState(() {
          _summary = widget.afterDone(_summary);
        });
      });
      _summary = widget.afterConnected(_summary);
    }
  }

通常而言, 处理简单的网络回调请求一般用 FutureBuider 就能满组需求了,如果是涉及到持续的数据订阅如蓝牙数据, 定位信息等, 可以采用StreamBuilder. 在使用过程中需要特别注意 异步Future/Stream对象变更后, 他所构建的wiget的作用域问题. 因为widget切换会导致他们所依赖的 InheritedWidget 被移除, 如果是项目中使用了Bloc应特别注意。

小结

Future是为适配Isolate中的 microTask和eventTask2而涉及, 包含了常规的数据订阅,输入输出, 异常捕捉, 多个异步数据合并, 通过异步数据顺序执行, 并且提供了便利的Completer对future的使用进行解耦操作, 基于Future的基本功能, 同时也为Stream流和RxData的封装奠定了基石。