在 Dart 中建立串流
作者:Lasse Nielsen
2013 年 4 月(2021 年 5 月更新)
dart:async 函式庫包含兩種對許多 Dart API 來說很重要的類型:串流和未來值。未來值表示單一運算的結果,而串流則是結果的序列。您會在串流中監聽以接收結果(資料和錯誤)以及串流關閉的通知。您也可以在串流完成之前暫停監聽或停止監聽串流。
但本文並非在探討使用串流。而是探討如何建立您自己的串流。您可以透過幾種方式建立串流
- 轉換現有的串流。
- 使用
async*函式從頭開始建立串流。 - 使用
StreamController建立串流。
本文會顯示每種方法的程式碼,並提供提示以協助您正確實作串流。
如需有關如何使用串流的說明,請參閱非同步程式設計:串流。
轉換現有串流
#建立串流的常見情況是您已經有一個串流,而您想要根據原始串流的事件建立一個新的串流。例如,您可能有一個位元組串流,而您想要透過 UTF-8 解碼輸入將其轉換成字串串流。最通用的方法是建立一個新的串流,等待原始串流上的事件,然後輸出新的事件。範例
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
// Stores any partial line from the previous chunk.
var partial = '';
// Wait until a new chunk is available, then process it.
await for (final chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // Prepend partial line.
partial = lines.removeLast(); // Remove new partial line.
for (final line in lines) {
yield line; // Add lines to output stream.
}
}
// Add final partial line to output stream, if any.
if (partial.isNotEmpty) yield partial;
}對於許多常見的轉換,您可以使用 Stream 提供的轉換方法,例如 map()、where()、expand() 和 take()。
例如,假設您有一個串流 counterStream,每秒發射一個遞增的計數器。以下是它的實作方式
var counterStream =
Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);若要快速查看事件,您可以使用類似這樣的程式碼
counterStream.forEach(print); // Print an integer every second, 15 times.若要轉換串流事件,您可以在監聽串流之前呼叫串流上的轉換方法,例如 map()。此方法會傳回一個新的串流。
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);除了 map() 之外,您還可以呼叫任何其他轉換方法,例如以下方法
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.通常,轉換方法就足以滿足您的需求。不過,如果您需要對轉換有更進一步的控制,您可以使用 Stream 的 transform() 方法指定一個串流轉換器。平台函式庫提供許多常見任務的串流轉換器。例如,以下程式碼使用 dart:convert 函式庫提供的 utf8.decoder 和 LineSplitter 轉換器。
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(const LineSplitter())
.toList();從頭開始建立串流
#建立新串流的一種方式是使用非同步產生器 (async*) 函式。當函式被呼叫時會建立串流,而當串流被監聽時,函式的本體會開始執行。當函式傳回時,串流會關閉。在函式傳回之前,它可以使用 yield 或 yield* 陳述式在串流上發出事件。
以下是定期發出數字的原始範例
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}此函式傳回一個 Stream。當該串流被監聽時,本體會開始執行。它會重複延遲所要求的間隔,然後產生下一個數字。如果省略 maxCount 參數,則迴圈沒有停止條件,因此串流會永遠輸出越來越大的數字,或直到監聽器取消其訂閱。
當監聽器取消 (在 listen() 方法傳回的 StreamSubscription 物件上呼叫 cancel()) 時,下次本體到達 yield 陳述式時,yield 會作為 return 陳述式。任何封閉的 finally 區塊會被執行,函式會結束。如果函式在結束前嘗試產生一個值,則會失敗並作為傳回。
當函式最終結束時,cancel() 方法傳回的未來會完成。如果函式以錯誤結束,未來會以該錯誤完成;否則,它會以 null 完成。
另一個更有用的範例是將未來序列轉換為串流的函式
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for (final future in futures) {
var result = await future;
yield result;
}
}此函式向 futures 可迭代物件要求一個新的未來,等待該未來,發出結果值,然後迴圈。如果未來以錯誤完成,則串流會以該錯誤完成。
很少有 async* 函式從無中建立串流。它需要從某個地方取得資料,而大多數情況下,那個地方是另一個串流。在某些情況下,例如上述的未來序列,資料來自其他非同步事件來源。然而,在許多情況下,async* 函式過於簡化,無法輕鬆處理多個資料來源。這就是 StreamController 類別發揮作用的地方。
使用 StreamController
#如果串流的事件來自程式不同部分,而不仅仅來自 async 函式可以遍歷的串流或未來,請使用 StreamController 建立並填入串流。
StreamController 提供一個新串流和一種方式,可以在任何時間點和從任何地方向串流新增事件。串流具有處理監聽器和暫停所需的所有邏輯。您傳回串流並保留控制器。
以下範例(取自 stream_controller_bad.dart)展示了使用 StreamController 來實作前一個範例中的 timedCounter() 函式的基本用法,儘管有缺陷。此程式碼會建立一個串流來傳回,然後根據計時器事件(既不是 future 也不是串流事件)將資料傳入串流中。
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
return controller.stream;
}如同先前,您可以像這樣使用 timedCounter() 傳回的串流
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.此 timedCounter() 實作有幾個問題
- 它在有訂閱者之前就開始產生事件。
- 即使訂閱者要求暫停,它也會繼續產生事件。
如下一節所述,您可以在建立 StreamController 時指定 onListen 和 onPause 等回呼,來修正這兩個問題。
等待訂閱
#原則上,串流應在開始運作前等待訂閱者。async* 函式會自動執行此動作,但當使用 StreamController 時,您擁有完全的控制權,可以在不適當的時候新增事件。當串流沒有訂閱者時,其 StreamController 會緩衝事件,如果串流從未取得訂閱者,則可能會導致記憶體外洩。
嘗試將使用串流的程式碼變更為以下內容
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// After 5 seconds, add a listener.
await for (final n in counterStream) {
print(n); // Print an integer every second, 15 times.
}
}當此程式碼執行時,儘管串流正在運作,但在前 5 秒內不會印出任何內容。然後會新增監聽器,並一次印出前 5 個左右的事件,因為它們已由 StreamController 緩衝。
若要接收訂閱通知,請在建立 StreamController 時指定 onListen 參數。當串流取得第一個訂閱者時,會呼叫 onListen 回呼。如果您指定 onCancel 回呼,則會在控制器失去最後一個訂閱者時呼叫它。在前一個範例中,Timer.periodic() 應移至 onListen 處理常式,如下一節所示。
尊重暫停狀態
#在監聽器要求暫停時,請避免產生事件。async* 函式會在串流訂閱暫停時自動在 yield 陳述式暫停。另一方面,StreamController 會在暫停期間緩衝事件。如果提供事件的程式碼不尊重暫停,則緩衝區的大小可能會無限增加。此外,如果監聽器在暫停後不久停止監聽,則建立緩衝區所花的功夫就白費了。
若要查看在沒有暫停支援的情況下會發生什麼情況,請嘗試將使用串流的程式碼變更為以下內容
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
late StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // Print an integer every second.
if (counter == 5) {
// After 5 ticks, pause for five seconds, then resume.
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}當五秒的暫停結束時,在這段時間內觸發的事件會一次全部收到。這是因為串流的來源不尊重暫停,並持續將事件新增到串流中。因此,串流會緩衝事件,然後在串流取消暫停時清空緩衝區。
timedCounter() 的下列版本(來自 stream_controller.dart)使用 StreamController 上的 onListen、onPause、onResume 和 onCancel 回呼來實作暫停。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer?.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}使用上述的 listenWithPause() 函式執行這段程式碼。您會看到它在暫停期間停止計數,然後在之後順利恢復。
您必須使用所有監聽器(onListen、onCancel、onPause 和 onResume)才能收到暫停狀態變更的通知。原因是如果訂閱和暫停狀態同時變更,則只會呼叫 onListen 或 onCancel 回呼。
最後提示
#在不使用非同步*函式建立串流時,請記住以下提示
使用同步控制器時請小心,例如使用
StreamController(sync: true)建立的控制器。當您在取消暫停的同步控制器上傳送事件(例如使用 EventSink 定義的add()、addError()或close()方法)時,事件會立即傳送給串流上的所有監聽器。Stream監聽器絕不能在新增監聽器的程式碼完全傳回之前呼叫,而在錯誤的時間使用同步控制器可能會破壞此承諾,並導致良好的程式碼失敗。避免使用同步控制器。如果您使用
StreamController,則會在listen呼叫傳回StreamSubscription之前呼叫onListen回呼。不要讓onListen回呼依賴於已經存在的訂閱。例如,在以下程式碼中,onListen事件會在subscription變數具有有效值之前觸發(並呼叫handler)。dartsubscription = stream.listen(handler);當串流的監聽器狀態變更時,
StreamController定義的onListen、onPause、onResume和onCancel回呼會由串流呼叫,但絕不會在觸發事件期間或呼叫另一個狀態變更處理常式期間呼叫。在這些情況下,狀態變更回呼會延遲到前一個回呼完成為止。不要嘗試自己實作
Stream介面。事件、回呼、以及新增和移除監聽器之間的互動很容易出錯。請務必使用現有的串流,可能來自StreamController,來實作新串流的listen呼叫。雖然可以透過延伸
Stream類別並實作listen方法和額外的功能,來建立延伸Stream並具有更多功能的類別,但通常不建議這樣做,因為這會引入使用者必須考量的全新類型。與其建立一個 是Stream(以及更多)的類別,您通常可以建立一個 具有Stream(以及更多)的類別。