跳到主要內容

隔離區

本頁討論了一些使用 Isolate API 實現隔離區的示例。

每當您的應用程式處理的計算量大到足以暫時阻塞其他計算時,您都應該使用隔離區。最常見的例子是在 Flutter 應用程式中,當您需要執行大量計算時,否則可能會導致 UI 無響應。

沒有關於您*必須*何時使用隔離區的規定,但以下是一些隔離區可能有所幫助的更多情況:

  • 解析和解碼特大的 JSON 資料塊。
  • 處理和壓縮照片、音訊和影片。
  • 轉換音訊和影片檔案。
  • 在大型列表或檔案系統中執行復雜的搜尋和過濾。
  • 執行 I/O 操作,例如與資料庫通訊。
  • 處理大量的網路請求。

實現一個簡單的輔助隔離區

#

這些示例實現了一個主隔離區,它會生成一個簡單的輔助隔離區。Isolate.run() 簡化了設定和管理輔助隔離區的步驟:

  1. 生成(啟動並建立)一個隔離區。
  2. 在生成的隔離區上執行一個函式。
  3. 捕獲結果。
  4. 將結果返回給主隔離區。
  5. 工作完成後終止該隔離區。
  6. 檢查、捕獲並將異常和錯誤拋回給主隔離區。

在新隔離區中執行現有方法

#
  1. 直接在主隔離區中呼叫 run() 來生成一個新的隔離區(一個後臺工作者),同時 main() 等待結果:
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 將您希望輔助隔離區執行的函式作為其第一個引數傳遞。在本例中,它是現有函式 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 獲取 _readAndParseJson() 返回的結果,並將該值傳送回主隔離區,然後關閉輔助隔離區。

  2. 輔助隔離區將儲存結果的記憶體*轉移*到主隔離區。它*不會複製*資料。輔助隔離區會執行一次驗證,以確保物件可以被轉移。

_readAndParseJson() 是一個現有的非同步函式,它同樣可以很方便地直接在主隔離區中執行。但是,使用 Isolate.run() 來執行它會啟用併發。輔助隔離區完全抽象了 _readAndParseJson() 的計算。它可以在不阻塞主隔離區的情況下完成。

Isolate.run() 的結果始終是一個 Future,因為主隔離區中的程式碼會繼續執行。無論輔助隔離區執行的計算是同步還是非同步的,都不會影響主隔離區,因為它無論如何都在併發執行。

有關完整的程式,請檢視 send_and_receive.dart 示例。

透過隔離區傳送閉包

#

您也可以直接在主隔離區中使用函式字面量或閉包,透過 run() 建立一個簡單的輔助隔離區。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此示例與上一個示例實現相同的功能。一個新的隔離區生成、計算並返回結果。

然而,現在隔離區傳送的是一個閉包。閉包在功能和程式碼編寫方式上都比典型的命名函式限制更少。在此示例中,Isolate.run() 併發執行看起來像原生代碼的部分。從這個意義上講,您可以將 run() 想象成一個用於“並行執行”的控制流運算子。

使用埠在隔離區之間傳送多條訊息

#

短生命週期的隔離區使用起來很方便,但生成新的隔離區並將物件從一個隔離區複製到另一個隔離區需要效能開銷。如果您的程式碼依賴於使用 Isolate.run 重複執行相同的計算,您可以透過建立不立即退出的長生命週期隔離區來提高效能。

為此,您可以使用 Isolate.run 抽象的一些低階隔離區 API:

本節將介紹在新生成的隔離區和主隔離區之間建立雙向通訊所需的步驟。第一個示例基本埠在高層次上介紹了該過程。第二個示例健壯埠在第一個示例的基礎上逐步添加了更多實用的、真實世界的功能。

ReceivePortSendPort

#

在隔離區之間建立長生命週期通訊需要兩個類(除了 Isolate):ReceivePortSendPort。這些埠是隔離區之間相互通訊的唯一方式。

ReceivePort 是一個處理從其他隔離區傳送的訊息的物件。這些訊息透過 SendPort 傳送。

埠的行為類似於 Stream 物件(事實上,接收埠實現了 Stream!)。您可以將 SendPortReceivePort 分別看作 Stream 的 StreamController 和監聽器。SendPort 就像一個 StreamController,因為您使用 SendPort.send() 方法向它們“新增”訊息,而這些訊息由監聽器(在本例中是 ReceivePort)處理。然後,ReceivePort 透過將其作為引數傳遞給您提供的回撥函式來處理接收到的訊息。

設定埠

#

新生成的隔離區只擁有透過 Isolate.spawn 呼叫接收到的資訊。如果您需要主隔離區在初始建立之後繼續與生成的隔離區通訊,則必須建立一個通訊通道,以便生成的隔離區可以向主隔離區傳送訊息。隔離區只能透過訊息傳遞進行通訊。它們無法“看到”彼此的記憶體內部,這就是“isolate”這個名稱的由來。

要設定這種雙向通訊,首先在主隔離區中建立一個 ReceivePort,然後在使用 Isolate.spawn 生成新隔離區時,將其 SendPort 作為引數傳遞給新隔離區。新隔離區隨後建立自己的 ReceivePort,並透過主隔離區傳遞給它的 SendPort 將*其自身*的 SendPort 傳送回去。主隔離區收到此 SendPort 後,雙方現在都擁有了一個開放的通道來發送和接收訊息。

A figure showing events being fed, one by one, into the event loop

  1. 在主隔離區中建立一個 ReceivePortSendPort 作為 ReceivePort 上的一個屬性自動建立。
  2. 使用 Isolate.spawn() 生成輔助隔離區
  3. ReceivePort.sendPort 的引用作為第一條訊息傳遞給輔助隔離區。
  4. 在輔助隔離區中建立另一個新的 ReceivePort
  5. 將輔助隔離區的 ReceivePort.sendPort 的引用作為第一條訊息*返回*給主隔離區。

除了建立埠和設定通訊之外,您還需要告知埠在接收到訊息時該做什麼。這可以透過在每個各自的 ReceivePort 上使用 listen 方法來完成。

A figure showing events being fed, one by one, into the event loop

  1. 透過主隔離區對輔助隔離區的 SendPort 的引用傳送訊息。
  2. 透過輔助隔離區的 ReceivePort 上的監聽器接收和處理訊息。這是您希望從主隔離區移出的計算執行的地方。
  3. 透過輔助隔離區對主隔離區的 SendPort 的引用傳送返回訊息。
  4. 透過主隔離區的 ReceivePort 上的監聽器接收訊息。

基本埠示例

#

本示例演示瞭如何設定一個長生命週期的輔助隔離區,並實現其與主隔離區之間的雙向通訊。程式碼使用將 JSON 文字傳送到新隔離區,並在那裡解析和解碼 JSON,然後將其傳送回主隔離區的示例。

步驟 1:定義工作器類

#

首先,為您的後臺輔助隔離區建立一個類。這個類包含您需要的所有功能,用於:

  • 生成一個隔離區。
  • 向該隔離區傳送訊息。
  • 讓隔離區解碼一些 JSON。
  • 將解碼後的 JSON 傳送回主隔離區。

該類暴露了兩個公共方法:一個用於生成輔助隔離區,另一個用於處理向該輔助隔離區傳送訊息。

本示例的其餘部分將逐一向您展示如何填充類方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

步驟 2:生成一個輔助隔離區

#

Worker.spawn 方法是您將用於建立輔助隔離區並確保其能夠接收和傳送訊息的程式碼分組的地方。

  • 首先,建立一個 ReceivePort。這允許主隔離區接收從新生成的輔助隔離區傳送的訊息。
  • 接下來,向接收埠新增一個監聽器,以處理輔助隔離區將傳送回的訊息。傳遞給監聽器的回撥函式 _handleResponsesFromIsolate 將在步驟 4 中介紹。
  • 最後,使用 Isolate.spawn 生成輔助隔離區。它需要兩個引數:一個將在輔助隔離區上執行的函式(在步驟 3 中介紹),以及接收埠的 sendPort 屬性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort 引數在輔助隔離區上呼叫回撥函式(_startRemoteIsolate)時,它將作為引數傳遞給該回調函式。這是確保輔助隔離區能夠將訊息傳送回主隔離區的第一步。

步驟 3:在輔助隔離區上執行程式碼

#

在此步驟中,您將定義 _startRemoteIsolate 方法,該方法被髮送到輔助隔離區,在它生成時執行。此方法類似於輔助隔離區的“main”方法。

  • 首先,建立另一個新的 ReceivePort。此埠將接收來自主隔離區的未來訊息。
  • 接下來,將該埠的 SendPort 傳送回主隔離區。
  • 最後,向新的 ReceivePort 新增一個監聽器。此監聽器處理主隔離區傳送給輔助隔離區的訊息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

輔助隔離區 ReceivePort 上的監聽器解碼從主隔離區傳遞過來的 JSON,然後將解碼後的 JSON 傳送回主隔離區。

此監聽器是主隔離區傳送到輔助隔離區訊息的入口點。**這是您唯一一次有機會告訴輔助隔離區將來要執行的程式碼。**

步驟 4:在主隔離區上處理訊息

#

最後,您需要告訴主隔離區如何處理從輔助隔離區傳送回主隔離區的訊息。為此,您需要填充 _handleResponsesFromIsolate 方法。回想一下,如步驟 2 所述,此方法已傳遞給 receivePort.listen 方法。

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

還請記住,您在步驟 3 中將一個 SendPort 傳送回了主隔離區。此方法處理該 SendPort 的接收,以及處理未來的訊息(這些訊息將是解碼後的 JSON)。

  • 首先,檢查訊息是否為 SendPort。如果是,則將該埠賦值給類的 _sendPort 屬性,以便將來可以用來發送訊息。
  • 接下來,檢查訊息是否為 Map<String, dynamic> 型別,這是解碼 JSON 的預期型別。如果是,則使用您的應用程式特定邏輯處理該訊息。在此示例中,訊息被打印出來。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步驟 5:新增 Completer 以確保您的隔離區已設定完成

#

為了完善該類,定義一個名為 parseJson 的公共方法,它負責向輔助隔離區傳送訊息。它還需要確保在隔離區完全設定好之前,訊息不會被髮送。為了處理這種情況,請使用 Completer

  • 首先,新增一個名為 Completer 的類級別屬性,並將其命名為 _isolateReady
  • 接下來,如果訊息是 SendPort,則在 _handleResponsesFromIsolate 方法(在步驟 4 中建立)中新增對 completer 的 complete() 呼叫。
  • 最後,在 parseJson 方法中,在新增 _sendPort.send 之前新增 await _isolateReady.future。這確保了在輔助隔離區生成*並且*已將其 SendPort 傳送回主隔離區之前,不會向其傳送任何訊息。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

#
展開以檢視完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }

}

健壯埠示例

#

上一個示例解釋了設定具有雙向通訊的長期隔離區所需的基本構建塊。如前所述,該示例缺少一些重要功能,例如錯誤處理、在不再使用時關閉埠的能力,以及在某些情況下訊息排序的不一致性。

本示例在第一個示例的資訊基礎上進行了擴充套件,透過建立一個具有這些附加功能和更多功能的長期輔助隔離區,並遵循更好的設計模式。儘管此程式碼與第一個示例有相似之處,但它不是該示例的擴充套件。

步驟 1:定義工作器類

#

首先,為您的後臺輔助隔離區建立一個類。這個類包含您需要的所有功能,用於:

  • 生成一個隔離區。
  • 向該隔離區傳送訊息。
  • 讓隔離區解碼一些 JSON。
  • 將解碼後的 JSON 傳送回主隔離區。

該類暴露了三個公共方法:一個用於建立輔助隔離區,一個用於處理向該輔助隔離區傳送訊息,另一個用於在埠不再使用時關閉它們。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

步驟 2:在 Worker.spawn 方法中建立 RawReceivePort

#

在生成隔離區之前,您需要建立一個 RawReceivePort,它是 ReceivePort 的一個低階版本。使用 RawReceivePort 是一種首選模式,因為它允許您將隔離區啟動邏輯與處理隔離區上的訊息傳遞邏輯分開。

Worker.spawn 方法中:

  • 首先,建立 RawReceivePort。這個 ReceivePort 只負責接收來自輔助隔離區的初始訊息,該訊息將是一個 SendPort
  • 接下來,建立一個 Completer,它將指示隔離區何時準備好接收訊息。當它完成時,它將返回一個包含 ReceivePortSendPort 的記錄。
  • 接下來,定義 RawReceivePort.handler 屬性。此屬性是一個 Function?,其行為類似於 ReceivePort.listener。當此埠接收到訊息時,將呼叫該函式。
  • 在處理函式中,呼叫 connection.complete()。此方法需要一個包含 ReceivePortSendPort記錄作為引數。SendPort 是從輔助隔離區傳送的初始訊息,它將在下一步中賦值給類級別的 SendPort,名為 _commands
  • 然後,使用 ReceivePort.fromRawReceivePort 建構函式建立一個新的 ReceivePort,並傳入 initPort
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
  }
}

透過首先建立 RawReceivePort,然後再建立 ReceivePort,您稍後就可以向 ReceivePort.listen 新增新的回撥。相反,如果您直接建立 ReceivePort,則只能新增一個 listener,因為 ReceivePort 實現了 Stream,而不是 BroadcastStream

實際上,這允許您將隔離區啟動邏輯與在通訊設定完成後處理接收訊息的邏輯分開。隨著其他方法中邏輯的增長,這種好處將變得更加明顯。

步驟 3:使用 Isolate.spawn 生成輔助隔離區

#

此步驟繼續填充 Worker.spawn 方法。您將新增生成隔離區所需的程式碼,並從該類返回一個 Worker 例項。在此示例中,對 Isolate.spawn 的呼叫被包裝在 try/catch中,這確保瞭如果隔離區啟動失敗,initPort 將被關閉,並且不會建立 Worker 物件。

  • 首先,嘗試在 try/catch 塊中生成一個輔助隔離區。如果生成輔助隔離區失敗,則關閉上一步中建立的接收埠。傳遞給 Isolate.spawn 的方法將在稍後的步驟中介紹。
  • 接下來,await connection.future,並從它返回的記錄中解構傳送埠和接收埠。
  • 最後,透過呼叫其私有建構函式,並傳入該 completer 中的埠,返回一個 Worker 例項。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }
}

請注意,在此示例中(與上一個示例相比),Worker.spawn 充當此類的非同步靜態建構函式,並且是建立 Worker 例項的唯一方式。這簡化了 API,使建立 Worker 例項的程式碼更簡潔。

步驟 4:完成隔離區設定過程

#

在此步驟中,您將完成基本的隔離區設定過程。這與上一個示例幾乎完全相關,並且沒有新的概念。有一個細微的變化是程式碼被拆分成了更多的方法,這是一種設計實踐,可以為您在示例的其餘部分新增更多功能做好準備。有關設定隔離區基本過程的深入講解,請參見基本埠示例

首先,建立從 Worker.spawn 方法返回的私有建構函式。在建構函式體中,向主隔離區使用的接收埠新增一個監聽器,並向該監聽器傳遞一個尚未定義的方法,名為 _handleResponsesFromIsolate

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }
}

接下來,向 _startRemoteIsolate 新增負責初始化輔助隔離區上埠的程式碼。回想一下,此方法已在 Worker.spawn 方法中傳遞給 Isolate.spawn,它將把主隔離區的 SendPort 作為引數傳遞給它。

  • 建立一個新的 ReceivePort
  • 將該埠的 SendPort 傳送回主隔離區。
  • 呼叫一個名為 _handleCommandsToIsolate 的新方法,並將新的 ReceivePort 和來自主隔離區的 SendPort 都作為引數傳遞。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下來,新增 _handleCommandsToIsolate 方法,它負責接收來自主隔離區的訊息,在輔助隔離區上解碼 JSON,並將解碼後的 JSON 作為響應傳送回去。

  • 首先,在輔助隔離區的 ReceivePort 上宣告一個監聽器。
  • 在新增到監聽器的回撥函式中,嘗試在 try/catch中解碼從主隔離區傳遞的 JSON。如果解碼成功,則將解碼後的 JSON 傳送回主隔離區。
  • 如果發生錯誤,則返回一個 RemoteError
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下來,新增 _handleResponsesFromIsolate 方法的程式碼。

  • 首先,檢查訊息是否為 RemoteError,如果是,則應 throw 該錯誤。
  • 否則,列印訊息。在未來的步驟中,您將更新此程式碼以返回訊息而不是列印它們。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最後,新增 parseJson 方法,這是一個公共方法,允許外部程式碼將 JSON 傳送到輔助隔離區進行解碼。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

您將在下一步中更新此方法。

步驟 5:同時處理多條訊息

#

目前,如果您快速向輔助隔離區傳送訊息,隔離區將以*它們完成的順序*傳送解碼後的 JSON 響應,而不是它們被髮送的順序。您無法確定哪個響應對應哪條訊息。

在此步驟中,您將透過為每條訊息分配一個 ID,並使用 Completer 物件來解決此問題,以確保當外部程式碼呼叫 parseJson 時,返回給該呼叫者的響應是正確的響應。

首先,向 Worker 新增兩個類級別屬性:

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  // ···
}

_activeRequests 對映將傳送到輔助隔離區的訊息與一個 Completer 相關聯。_activeRequests 中使用的鍵取自 _idCounter,它將隨著更多訊息的傳送而增加。

接下來,更新 parseJson 方法,使其在向輔助隔離區傳送訊息之前建立 completer。

  • 首先建立一個 Completer
  • 接下來,遞增 _idCounter,以便每個 Completer 都與一個唯一的數字相關聯。
  • _activeRequests 對映中新增一個條目,其中鍵是當前的 _idCounter 值,值是 completer。
  • 將訊息連同 ID 一起傳送給輔助隔離區。因為您只能透過 SendPort 傳送一個值,所以將 ID 和訊息包裝在記錄中。
  • 最後,返回 completer 的 future,它最終將包含來自輔助隔離區的響應。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

您還需要更新 _handleResponsesFromIsolate_handleCommandsToIsolate 以處理此係統。

_handleCommandsToIsolate 中,您需要考慮 message 是一個包含兩個值的記錄,而不僅僅是 JSON 文字。透過從 message 解構值來實現這一點。

然後,在解碼 JSON 後,更新對 sendPort.send 的呼叫,以再次使用記錄將 ID 和解碼後的 JSON 都傳回主隔離區。

dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最後,更新 _handleResponsesFromIsolate

  • 首先,再次從訊息引數中解構出 ID 和響應。
  • 然後,從 _activeRequests 對映中移除與此請求對應的 completer。
  • 最後,不要丟擲錯誤或列印解碼後的 JSON,而是完成 completer,並傳入響應。當它完成時,響應將返回給在主隔離區上呼叫 parseJson 的程式碼。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步驟 6:新增關閉埠的功能

#

當您的程式碼不再使用隔離區時,您應該關閉主隔離區和輔助隔離區上的埠。

  • 首先,新增一個類級別的布林值,用於跟蹤埠是否已關閉。
  • 然後,新增 Worker.close 方法。在該方法中:
    • _closed 更新為 true。
    • 向輔助隔離區傳送最後一條訊息。此訊息是一個讀取“shutdown”的 String,但它可以是您喜歡的任何物件。您將在下一個程式碼片段中使用它。
  • 最後,檢查 _activeRequests 是否為空。如果為空,則關閉主隔離區中名為 _responsesReceivePort
dart
class Worker {
  bool _closed = false;
  // ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}
  • 接下來,您需要在輔助隔離區中處理“shutdown”訊息。將以下程式碼新增到 _handleCommandsToIsolate 方法中。此程式碼將檢查訊息是否為讀取“shutdown”的 String。如果是,它將關閉輔助隔離區的 ReceivePort 並返回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最後,您應該新增程式碼,在嘗試傳送訊息之前檢查埠是否已關閉。在 Worker.parseJson 方法中新增一行。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例

#
點選此處展開檢視完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
    await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]),
  );
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}