Flutter中stream学习

发布于:2025-03-12 ⋅ 阅读:(14) ⋅ 点赞:(0)

概述

Stream 主要应用于 Flutter 的异步操作,在其他编程语言中也存在;Stream 提供了一种接受事件队列的方法,可通过 listen 进行数据监听,通过 error 接收失败状态,通过 done 来接收结束状态;

Stream的基础概念

  • Stream:表示一个可以接收异步事件的数据源。可以生成一个或多个值。
  • StreamController:控制Stream,可以向其添加事件、错误以及关闭它。
  • StreamSubscription:表示对Stream的监听,可以用来取消订阅。
  • Sink:用来向Stream添加数据、错误、以及关闭。

stream的常用方法

Stream.fromFuture(Future future)

Stream通过Future对象创建新的单订阅流, 当Future对象完成时会触发 data / error, 然后已done事件结束

Future<String> getDate() async {
    await Future.delayed(const Duration(seconds: 3));

    return "当前时间为${DateTime.now()}";
  }

  void testStreamFromFuture() {
    Stream.fromFuture(getDate()).listen((event) {
      print("testStreamFromFuture============$event");
    }).onDone(() {
      print("testStreamFromFuture==========done 结束");
    });
  }

输出结果:
在这里插入图片描述

Stream.fromFutures(Iterable<Future> futures)

Stream 通过一系列的 Future 创建新的单订阅流,每个 Future 都会有自身的 data / error 事件, 当这一系列的 Future 均完成时,Stream 以 done 事件结束;若 Futures 为空,则 Stream 会立刻关闭;其分析源码,很直接的看到是将每一个 Future 事件监听完之后才会执行的微事件结束;

源码代码:

factory Stream.fromFutures(Iterable<Future<T>> futures) {
    _StreamController<T> controller =
        new _SyncStreamController<T>(null, null, null, null);
    int count = 0;
    // Declare these as variables holding closures instead of as
    // function declarations.
    // This avoids creating a new closure from the functions for each future.
    void onValue(T value) {
      if (!controller.isClosed) {
        controller._add(value);
        if (--count == 0) controller._closeUnchecked();
      }
    }

    void onError(Object error, StackTrace stack) {
      if (!controller.isClosed) {
        controller._addError(error, stack);
        if (--count == 0) controller._closeUnchecked();
      }
    }

    // The futures are already running, so start listening to them immediately
    // (instead of waiting for the stream to be listened on).
    // If we wait, we might not catch errors in the futures in time.
    for (var future in futures) {
      count++;
      future.then(onValue, onError: onError);
    }
    // Use schedule microtask since controller is sync.
    if (count == 0) scheduleMicrotask(controller.close);
    return controller.stream;
  }

示例代码:

var datas = [getDate(), getDate(), getDate()];
    Stream.fromFutures(datas).listen((event) {
      print("testStreamFromFutures============$event");
    }).onDone(() {
      print("testStreamFromFutures==========done 结束");
    });

输出结果:
在这里插入图片描述

Stream.fromIterable(Iterable elements)

Stream 通过数据集合中获取并创建单订阅流,通过 listen 监听迭代器中每一个子 element,当 Stream 监听到取消订阅或 Iterator.moveNext 返回 false / throw 异常 时停止迭代;

void testStreamFromIterable() {
    var datas = [1, 2, "5.toStroing", false, 9];
    Stream.fromIterable(datas).listen((event) {
      print("testStreamFromIterable============$event");
    }).onDone(() {
      print("testStreamFromIterable==========done 结束");
    });
  }

输出结果:
在这里插入图片描述

Stream.periodic(Duration period, [T computation(int computationCount)?])

Stream 通过 Duration 对象作为参数创建一个周期性事件流,其中若不设置 computation 时 onData 获取数据为 null;若没有事件结束则会一直周期性执行; 因为 computation 函数是返回流的结果

void testStreamPeriodic() {
    Stream.periodic(const Duration(seconds: 1)).listen((event) {
      print("testStreamPeriodic===没有computation==================$event");
    });
    Stream.periodic(const Duration(seconds: 1), (x) => x).listen((event) {
      print("testStreamPeriodic---- listen========$event");
    }).onDone(() {
      print("testStreamPeriodic==========done 结束");
    });
  }

输出结果:
在这里插入图片描述

Stream take(int count)

take() 对于单订阅方式,可以提供 take 设置之前的 Stream 订阅数据,例如设置中断 Stream.periodic 周期展示次数;小菜粗略理解为 take 可以作为中断订阅, 如果 take 设置次数大于 onDone 之前的订阅数据次数,Stream 依旧获取所有 onDone 之前的订阅数据

void testStreamTake() {
    Stream.periodic(const Duration(seconds: 1), (x) => x)
        .take(5) // 如果不设置这个, 这个流将一直会执行, 但是设置之后只会执行设置的数的次数
        .listen((event) {
      print("testStreamTake===========$event");
    }).onDone(() {
      print("testStreamTake==============done 结束");
    });
  }

输出结果:
在这里插入图片描述

Stream takeWhile(bool test(T element))

takeWhile 也可以实现上述take方法相同效果, 返回一个 boolean 类型,如果为 false 则中断订阅

void testStreamTakeWhile() {
    Duration interval = const Duration(seconds: 1);
    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element < 5;
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
  }

输出结果:
在这里插入图片描述

Stream where(bool test(T event))

where 用于在当前 Stream 中创建一个新的 Stream 用来丢弃不符合 test 的数据;简单理解为类似数据库查询一样,仅过滤符合需求的数据流;且 where 可以设置多次

void testStreamWhere() {
    Stream.periodic(const Duration(seconds: 1), (data) => data)
        .takeWhile((element) => element <= 5)
        .where((event) {
      print('Stream.periodic.where -> $event');
      return event > 3;
    }).listen((event) {
      print("testStreamWhere==================$event");
    }).onDone(() {
      print("testStreamWhere===================== done 结束");
    });
  }

输出结果:
在这里插入图片描述

Stream distinct([bool equals(T previous, T next)])

作用:相邻的两个数据去重哈

void testStreamDistinct() {
    var datas = [1, 2, '3.toString()', true, true, false, true, 6];
    Stream.fromIterable(datas).distinct().listen((event) {
      print("testStreamDistinct===========================$event");
    }).onDone(() {
      print('testStreamDistinct============================ done 结束');
    });
  }

输出结果:
在这里插入图片描述

Stream skip(int count)

作用: skip 用于跳过符合条件的订阅数据次数 count: 跳过的次数;

void testStreamSkip() {
    Stream<int> streamData =
        Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);
    streamData
        .takeWhile((element) {
          print('Stream.periodic.takeWhile -> $element');
          return element <= 6;
        })
        .where((event) {
          print('Stream.periodic.where -> $event');
          return event > 2;
        })
        .skip(2)
        .listen((event) {
          print('Stream.periodic -> $event');
        })
        .onDone(() {
          print('Stream.periodic -> done 结束');
        });
  }

输出结果 :
在这里插入图片描述

Stream skipWhile(bool test(T element))

skipWhile 用于跳过在 where 符合条件下满足设置 条件的订阅数据;即当 返回 为 true 时跳过当前订阅数据监听;

void testSkipWhile() {
    Stream.periodic(const Duration(seconds: 1), (data) => data)
        .takeWhile((element) => element < 5)
        .skipWhile((element) => element < 3)
        .listen((event) {
      print("testSkipWhile=========$event");
    }).onDone(() {
      print("testSkipWhile========done 结束");
    });
  }

输出 结果:
在这里插入图片描述

Stream map(S convert(T event))

在当前 Stream 基础上创建一个新的 Stream 并对当前 Stream 进行数据操作,onData 监听到的是 map 变更后的新的数据流;

void testStreamMap() {
    // 创还能一个stream刘
    Stream<int> streamData =
        Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element < 5;
    }).map((event) {
      print('Stream.periodic.map -> $event -> ${event * 100}');
      return event * 100;
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
  }

输出结果:
在这里插入图片描述

Stream expand(Iterable convert(T element))

在当前 Stream 基础上创建新的 Stream 并将当前订阅数据转为新的订阅数据组,onData 监听 数据组 中每个新的订阅数据元素;

void testStreamExpand() {
    Stream<int> streamData =
        Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element <= 6;
    }).expand((element) {
      print(
          'Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');
      return [element, element * 10, element * 100];
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
  }

输出结果:
在这里插入图片描述

Stream的分类

单订阅

默认情况下Streams会被设置成单订阅,点订阅会保持当前的值,直到有其它的订阅。

单订阅Stream(Single-Subscription Stream)一次只能有一个监听器(listener),当我们对单订阅进行监听的时候,程序会被错。通常用于一次性事件

void testStreamController() {
    // 使用streamController创建一个stream
    final streamController = StreamController<int>();

    // 获取stream流
    final stream = streamController.stream;

    // 监听stream
    stream.listen((event) {
      print("testStreamController========================$event");
    });

    // stream.listen((event) {
    //   print("testStreamController=11111=======================$event");
    // });


    // 添加测试数据到stream
    streamController.sink.add(1);
    streamController.sink.add(2);
    streamController.sink.add(3);


    // 关闭stream流
    streamController.close();
  }

如果我打开上述注释掉的监听, 对一个单订阅的stream进行多次监听会报如下错误:
在这里插入图片描述

广播订阅

广播(Broadcast Stream)允许多个监听器,可以同时向多个订阅者推送数据。 这种类型适合用于事件广播,比如用户操作、全局数据推送等。

void testStreamBoardcast() {
    final streamController = StreamController<int>.broadcast();
    final stream = streamController.stream;

    stream.listen((event) {
      print("testStreamBoardcast==============$event");
    });

    stream.listen((event) {
      print("testStreamBoardcast111111111===================$event");
    });

    streamController.sink.add(1);
    streamController.sink.add(2);
    streamController.sink.add(3);



    streamController.close();
  }

输出结果:
在这里插入图片描述

除此之外Flutter官方还提供了StreamBuilder这种专门用于监听Stream并根据数据变化更新UI的Widget。具体用法可以参考官方文档。