跳到主要內容

在 Dart 中建立流

作者:Lasse Nielsen
2013 年 4 月 (2021 年 5 月更新)

dart:async 庫包含兩種對許多 Dart API 很重要的型別:StreamFuture。 Future 表示單個計算的結果,而 stream 則是結果的序列。你可以監聽流來獲取結果(資料和錯誤)以及流關閉的通知。在流完成之前,你也可以在監聽時暫停或停止監聽。

但本文不是關於使用流的,而是關於建立自己的流。你可以通過幾種方式建立流:

  • 轉換現有流。
  • 使用 async* 函式從頭建立流。
  • 使用 StreamController 建立流。

本文展示了每種方法的程式碼,並提供了提示,幫助你正確實現流。

關於使用流的幫助,請參見非同步程式設計:流

轉換現有流

#

建立流的常見情況是你已經有一個流,並希望基於原始流的事件建立一個新流。例如,你可能有一個位元組流,希望透過 UTF-8 解碼輸入將其轉換為字串流。最通用的方法是建立一個新流,該新流等待原始流上的事件,然後輸出新事件。示例:

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

對於許多常見的轉換,你可以使用 Stream 提供的轉換方法,例如 map()where()expand()take()

例如,假設你有一個流 counterStream,它每秒發出一個遞增的計數器。它的實現可能如下:

dart
var counterStream = Stream<int>.periodic(
  const Duration(seconds: 1),
  (x) => x,
).take(15);

要快速檢視事件,可以使用如下程式碼:

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

要轉換流事件,可以在監聽流之前在其上呼叫轉換方法,例如 map()。該方法返回一個新流。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

除了 map(),你還可以使用任何其他轉換方法,例如以下方法:

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常,一個轉換方法就足夠了。但是,如果你需要對轉換進行更多控制,可以使用 Streamtransform() 方法指定一個 StreamTransformer。平臺庫為許多常見任務提供了流轉換器。例如,以下程式碼使用 dart:convert 庫提供的 utf8.decoderLineSplitter 轉換器。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

從頭建立流

#

建立新流的一種方法是使用非同步生成器(async*)函式。流在函式呼叫時建立,函式體在流被監聽時開始執行。函式返回時,流關閉。在函式返回之前,它可以使用 yieldyield* 語句在流上發出事件。

這是一個以固定間隔發出數字的簡單示例:

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函式返回一個 Stream。當該流被監聽時,函式體開始執行。它會重複延遲指定的間隔,然後 yield 下一個數字。如果省略 maxCount 引數,迴圈就沒有停止條件,流將永遠輸出遞增的數字 - 或者直到監聽器取消其訂閱。

當監聽器取消(透過在 listen() 方法返回的 StreamSubscription 物件上呼叫 cancel())時,下一次函式體到達 yield 語句時,yield 將充當 return 語句。任何包含的 finally 塊都將執行,函式退出。如果函式在退出前嘗試 yield 一個值,那將失敗並充當 return。

當函式最終退出時,cancel() 方法返回的 Future 完成。如果函式因錯誤退出,則 Future 以該錯誤完成;否則,它以 null 完成。

另一個更有用的示例是將 Future 序列轉換為流的函式:

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

此函式向 futures 可迭代物件請求一個新的 Future,等待該 Future 完成,發出結果值,然後迴圈。如果某個 Future 完成時出現錯誤,則流也會以該錯誤完成。

async* 函式從零開始構建流的情況很少見。它需要從某個地方獲取資料,而這個地方通常是另一個流。在某些情況下,例如上面 Future 的序列,資料來自其他非同步事件源。然而,在許多情況下,async* 函式過於簡單,難以輕鬆處理多個數據源。這就是 StreamController 類發揮作用的地方。

使用 StreamController

#

如果你的流事件來自程式的各個不同部分,而不僅僅是來自可以透過 async 函式遍歷的流或 Future,那麼可以使用 StreamController 來建立和填充流。

StreamController 為你提供了一個新的流,以及一種在任何時間、從任何地方向流新增事件的方法。該流具備處理監聽器和暫停所需的所有邏輯。你返回流,並將控制器保留給自己。

以下示例(來自 stream_controller_bad.dart)展示了 StreamController 的基本(但有缺陷)用法,用於實現先前示例中的 timedCounter() 函式。此程式碼建立要返回的流,然後根據計時器事件向其提供資料,這些事件既不是 Future 也不是流事件。

有缺陷的示例dart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

和之前一樣,你可以像這樣使用 timedCounter() 返回的流:

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter() 的這個實現有兩個問題:

  • 在有訂閱者之前就開始產生事件。
  • 即使訂閱者請求暫停,它也繼續產生事件。

正如接下來的部分所示,你可以在建立 StreamController 時指定 onListenonPause 等回撥來解決這兩個問題。

等待訂閱

#

通常,流在開始工作之前應等待訂閱者。async* 函式會自動執行此操作,但使用 StreamController 時,你可以完全控制,即使不應該新增事件時也可以新增。當流沒有訂閱者時,其 StreamController 會緩衝事件,如果流從未獲得訂閱者,這可能導致記憶體洩漏。

嘗試將使用流的程式碼更改為以下內容:

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

當這段程式碼執行時,前 5 秒沒有輸出任何內容,儘管流正在工作。然後添加了監聽器,由於事件被 StreamController 緩衝,前 5 個左右的事件會一次性打印出來。

要在訂閱時收到通知,請在建立 StreamController 時指定 onListen 引數。onListen 回撥在流獲得第一個訂閱者時呼叫。如果指定 onCancel 回撥,則在控制器失去最後一個訂閱者時呼叫。在前面的示例中,Timer.periodic() 應該移至 onListen 處理程式中,如下一節所示。

遵守暫停狀態

#

當監聽器請求暫停時,避免產生事件。當流訂閱暫停時,async* 函式會在 yield 語句處自動暫停。另一方面,StreamController 在暫停期間會緩衝事件。如果提供事件的程式碼不遵守暫停,緩衝區的大小可能會無限增長。此外,如果監聽器在暫停後不久停止監聽,那麼用於建立緩衝區的工作就被浪費了。

要檢視沒有暫停支援時會發生什麼,請嘗試將使用流的程式碼更改為以下內容:

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

當五秒暫停結束後,在此期間觸發的事件會一次性全部接收到。這是因為流的源不遵守暫停,並持續向流新增事件。因此流會緩衝事件,並在流恢復非暫停狀態時清空其緩衝區。

以下版本的 timedCounter()(來自 stream_controller.dart)透過使用 StreamController 上的 onListenonPauseonResumeonCancel 回撥來實現暫停。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: startTimer,
    onPause: stopTimer,
    onResume: startTimer,
    onCancel: stopTimer,
  );

  return controller.stream;
}

使用上面提供的 listenWithPause() 函式執行這段程式碼。你會看到它在暫停時停止計數,並在之後順利恢復。

你必須使用所有監聽器——onListenonCancelonPauseonResume——才能收到暫停狀態變化的通知。原因是如果訂閱和暫停狀態同時改變,只會呼叫 onListenonCancel 回撥。

最後提示

#

在不使用 async* 函式建立流時,請記住以下提示:

  • 使用同步控制器時要小心——例如,使用 StreamController(sync: true) 建立的控制器。當你在未暫停的同步控制器上傳送事件時(例如,使用 EventSink 定義的 add()addError()close() 方法),事件會立即傳送給流上的所有監聽器。Stream 監聽器絕不應在新增監聽器的程式碼完全返回之前被呼叫,在錯誤的時間使用同步控制器可能會違反此約定並導致正常程式碼失敗。避免使用同步控制器。

  • 如果你使用 StreamControlleronListen 回撥會在 listen 呼叫返回 StreamSubscription 之前被呼叫。不要讓 onListen 回撥依賴於訂閱已經存在。例如,在以下程式碼中,onListen 事件會在 subscription 變數具有有效值之前觸發(並呼叫 handler)。

    dart
    subscription = stream.listen(handler);
  • StreamController 定義的 onListenonPauseonResumeonCancel 回撥在流的監聽器狀態改變時由流呼叫,但絕不會在事件觸發期間或呼叫另一個狀態改變處理程式期間呼叫。在這些情況下,狀態改變回調會延遲到先前的回撥完成之後。

  • 不要嘗試自己實現 Stream 介面。很容易在事件、回撥以及新增和移除監聽器之間的互動上出現細微的錯誤。始終使用現有流(可能是來自 StreamController 的流)來實現新流的 listen 呼叫。

  • 儘管可以透過擴充套件 Stream 類並實現 listen 方法及額外的功能來建立具有更多功能的類,但通常不推薦這樣做,因為它引入了一個使用者必須考慮的新型別。與其建立一個 Stream(及更多)的類,你通常可以建立一個擁有 Stream(及更多)的類。