非同步程式設計: 流
Dart 中的非同步程式設計以 Future 和 Stream 類為特徵。
Future 表示不會立即完成的計算。普通函式返回結果,而非同步函式返回一個 Future,它最終會包含結果。Future 會告訴您結果何時準備就緒。
流是一系列非同步事件。它類似於非同步的 Iterable——您不是在請求時獲得下一個事件,而是流在事件準備就緒時通知您。
接收流事件
#流可以透過多種方式建立,這是另一篇文章的主題,但它們都可以透過相同的方式使用:非同步 for 迴圈(通常簡稱為 await for)迭代流中的事件,就像 for 迴圈迭代 Iterable 一樣。例如
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}此程式碼簡單地接收整數事件流中的每個事件,將它們相加,並返回總和(一個 future)。當迴圈體結束時,函式會暫停,直到下一個事件到來或流完成。
該函式使用 async 關鍵字標記,在使用 await for 迴圈時這是必需的。
以下示例透過使用 async* 函式生成一個簡單的整數流來測試前面的程式碼
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // 55
}
錯誤事件
#當流中沒有更多事件時,流就完成了,接收事件的程式碼會收到完成通知,就像收到新事件到來通知一樣。使用 await for 迴圈讀取事件時,當流完成時迴圈會停止。
在某些情況下,錯誤會在流完成之前發生;例如從遠端伺服器獲取檔案時網路失敗,或者建立事件的程式碼有 bug,需要有人知道。
流也可以像傳遞資料事件一樣傳遞錯誤事件。大多數流在第一個錯誤後會停止,但也可能存在傳遞多個錯誤的流,以及在錯誤事件後繼續傳遞更多資料的流。本文件只討論最多傳遞一個錯誤的流。
使用 await for 讀取流時,錯誤由迴圈語句丟擲。這也會結束迴圈。您可以使用 try-catch 捕獲錯誤。以下示例在迴圈迭代器等於 4 時丟擲錯誤
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
try {
await for (final value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i == 4) {
throw Exception('Intentional exception');
} else {
yield i;
}
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // -1
}
使用流
#Stream 類包含許多輔助方法,可以幫助您對流執行常見操作,類似於 Iterable 上的方法。例如,您可以使用 Stream API 中的 lastWhere() 方法查詢流中最後一個正整數。
Future<int> lastPositive(Stream<int> stream) =>
stream.lastWhere((x) => x >= 0);兩種流
#流有兩種型別。
單訂閱流
#最常見的流包含一系列事件,這些事件是更大整體的一部分。事件需要按正確順序傳遞,並且不能丟失任何一個。當您讀取檔案或接收 Web 請求時,就會得到這種流。
這種流只能監聽一次。稍後再次監聽可能意味著錯過了初始事件,然後流的其餘部分就沒有意義了。當您開始監聽時,資料將被獲取並以塊的形式提供。
廣播流
#另一種流適用於可以一次處理一個的獨立訊息。例如,這種流可用於瀏覽器中的滑鼠事件。
您可以隨時開始監聽這種流,並且在監聽期間會收到觸發的事件。多個監聽器可以同時監聽,並且在取消之前的訂閱後,您可以稍後再次監聽。
處理流的方法
#Stream<T> 上的以下方法處理流並返回結果
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();所有這些函式,除了 drain() 和 pipe(),都對應於 Iterable 上的類似函式。每個函式都可以透過使用帶有 await for 迴圈的 async 函式(或僅使用其他方法之一)輕鬆編寫。例如,一些實現可以是
Future<bool> contains(Object? needle) async {
await for (final event in this) {
if (event == needle) return true;
}
return false;
}
Future forEach(void Function(T element) action) async {
await for (final event in this) {
action(event);
}
}
Future<List<T>> toList() async {
final result = <T>[];
await forEach(result.add);
return result;
}
Future<String> join([String separator = '']) async =>
(await toList()).join(separator);(實際實現略微複雜,但主要是出於歷史原因。)
修改流的方法
#Stream 上的以下方法根據原始流返回一個新的流。每個方法都會等待新的流被監聽後,才開始監聽原始流。
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);前面的方法對應於 Iterable 上的類似方法,它們將一個可迭代物件轉換為另一個可迭代物件。所有這些都可以使用帶有 await for 迴圈的 async 函式輕鬆編寫。
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);asyncExpand() 和 asyncMap() 函式類似於 expand() 和 map(),但允許其函式引數是非同步函式。distinct() 函式在 Iterable 上不存在,但可以有。
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(
Duration timeLimit, {
void Function(EventSink<T> sink)? onTimeout,
});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);最後三個函式更具特殊性。它們涉及 await for 迴圈無法直接處理的錯誤處理;遇到的第一個錯誤將終止迴圈及其流訂閱,並且沒有內建的恢復機制。
以下程式碼演示瞭如何使用 handleError() 在 await for 迴圈消費流之前從中過濾掉錯誤。
Stream<S> mapLogErrors<S, T>(
Stream<T> stream,
S Function(T event) convert,
) async* {
var streamWithoutErrors = stream.handleError((e) => log(e));
await for (final event in streamWithoutErrors) {
yield convert(event);
}
}在前面的示例中,如果流沒有發出任何事件,則永遠不會返回到 await for 迴圈。為了避免這種情況,請使用 timeout() 函式建立一個新流。timeout() 使您能夠設定時間限制並在返回的流上繼續發出事件。
以下程式碼修改了前面的示例。它添加了一個兩秒的超時,並且如果在兩秒或更長時間內沒有事件發生,則會產生相關的錯誤。
Stream<S> mapLogErrors<S, T>(
Stream<T> stream,
S Function(T event) convert,
) async* {
var streamWithoutErrors = stream.handleError((e) => log(e));
var streamWithTimeout = streamWithoutErrors.timeout(
const Duration(seconds: 2),
onTimeout: (eventSink) {
eventSink.addError('Timed out after 2 seconds');
eventSink.close();
},
);
await for (final event in streamWithTimeout) {
yield convert(event);
}
}transform() 函式
#transform() 函式不僅用於錯誤處理;它是一種更通用的流的“對映”。普通的 map 對於每個輸入的事件只需要一個值。然而,特別是對於 I/O 流,可能需要多個輸入的事件才能產生一個輸出事件。StreamTransformer 可以處理這種情況。例如,像 Utf8Decoder 這樣的解碼器就是轉換器。轉換器只需要一個函式 bind(),可以使用 async 函式輕鬆實現。
讀取和解碼檔案
#以下程式碼讀取檔案,並在流上執行兩次轉換。它首先將資料從 UTF8 轉換,然後透過 LineSplitter 進行處理。所有行都被打印出來,除了以井號 # 開頭的行。
import 'dart:convert';
import 'dart:io';
void main(List<String> args) async {
var file = File(args[0]);
var lines = utf8.decoder
.bind(file.openRead())
.transform(const LineSplitter());
await for (final line in lines) {
if (!line.startsWith('#')) print(line);
}
}listen() 方法
#Stream 上的最後一個方法是 listen()。這是一個“低階”方法——所有其他流函式都基於 listen() 定義。
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
});要建立一個新的 Stream 型別,您只需擴充套件 Stream 類並實現 listen() 方法——Stream 上的所有其他方法都會呼叫 listen() 才能工作。
listen() 方法允許您開始監聽流。在此之前,流只是一個描述您希望看到哪些事件的惰性物件。當您監聽時,會返回一個 StreamSubscription 物件,它代表正在生成事件的活動流。這類似於 Iterable 只是物件的集合,而迭代器才是實際執行迭代的物件。
流訂閱允許您暫停訂閱、在暫停後恢復訂閱以及完全取消訂閱。您可以設定回撥函式,以便在每個資料事件或錯誤事件發生時以及流關閉時呼叫。
其他資源
#閱讀以下文件,瞭解更多關於在 Dart 中使用流和非同步程式設計的詳細資訊。
- 在 Dart 中建立流,一篇關於建立自己的流的文章
- Future 與錯誤處理,一篇解釋如何使用 Future API 處理錯誤的文章
- 非同步程式設計,深入探討 Dart 對非同步性的語言支援
- Stream API 參考