AsyncThrowingStream and AsyncStream are part of the concurrency framework introduced in Swift 5.5 due to SE-314. Async streams allow you to replace existing code that is based on closures or Combine publishers.
Before diving into details around throwing streams, I recommend reading my article covering async-await if you didn’t do so yet. Most of the code explained in this article will use the APIs explained there.
What is an AsyncThrowingStream?
You can see an AsyncThrowingStream as a stream of elements that could potentially result in a thrown error. Values deliver over time, and the stream can be closed by a finish event. A finish event could either be a success or a failure once an error occurs.
What is an AsyncStream?
An AsyncStream is similar to the throwing variant but will never result in a throwing error. A non-throwing async stream finishes based on an explicit finished call or when the stream cancels.
In this article, we’ll explain how to use an AsyncThrowingStream. The code examples are similar for AsyncStream except for pieces where error handling occurs.
How to use AsyncThrowingStream
An AsyncThrowingStream can be an excellent replacement for existing code based upon closures like progress and completion handlers. To better understand what I mean, I’ll introduce you to a scenario we encountered in the WeTransfer app.
In our app, we had an existing class based on closures called the FileDownloader
:
struct FileDownloader {
enum Status {
case downloading(Float)
case finished(Data)
}
func download(_ url: URL, progressHandler: (Float) -> Void, completion: (Result<Data, Error>) -> Void) throws {
// .. Download implementation
}
}
The file downloader takes a URL, reports progress, and completes with a result containing the downloaded data or an error on failure.
The file downloader reports a stream of values during the file download. In this case, it’s reporting a stream of status values to report the current status of the running download. The FileDownloader
is a perfect example of a piece of code that you can rewrite to use AsyncThrowingStream. Though, rewriting requires you to rewrite your code at the implementation level as well, so let’s define an overload method instead:
extension FileDownloader {
func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
return AsyncThrowingStream { continuation in
do {
try self.download(url, progressHandler: { progress in
continuation.yield(.downloading(progress))
}, completion: { result in
switch result {
case .success(let data):
continuation.yield(.finished(data))
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
})
} catch {
continuation.finish(throwing: error)
}
}
}
}
As you can see, we wrapped the download method inside an AsyncThrowingStream. We describe the stream’s type of value Status
as a generic, allowing us to continue the stream with status updates.
We will finish the stream by throwing an error whenever an error occurs. In the case of the completion handler, we’re either finishing by throwing an error or following up the yield with data with a non-throwing finish callback:
switch result {
case .success(let data):
continuation.yield(.finished(data))
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
It’s essential to not forget about the finish()
callback after you’ve received the final status update. Otherwise, we will keep the stream alive, and code at the implementation level will never continue.
We could rewrite the above code by making use of another yield method, accepting a Result
enum as an argument:
continuation.yield(with: result.map { .finished($0) })
continuation.finish()
The rewrite simplifies our code and takes away the switch case. We do have to map our Result
enum to match the expected Status
value. Our stream will finish after throwing the contained error if we yield a failing result.
Iterating over an AsyncThrowingStream
You can start iterating over the stream of values once you’ve configured your async throwing stream. In the case of our FileDownloader
example, it will look as follows:
do {
for try await status in download(url) {
switch status {
case .downloading(let progress):
print("Downloading progress: \(progress)")
case .finished(let data):
print("Downloading completed with data: \(data)")
}
}
print("Download finished and stream closed")
} catch {
print("Download failed with \(error)")
}
We handle any status update, and we can use the catch closure to handle any occurred errors. You can iterate using a for ... in
loop based on the AsyncSequence
interface, which works the same for an AsyncStream.
In case you’re running into a compile error like:
‘async’ in a function that does not support concurrency
You might want to read my article covering async-await in depth.
The print statements in the above code example help you understand the lifecycle of an AsyncThrowingStream. You can replace the print statements to handle the progress updates and process the data to visualize it for your users.
Debugging an AsyncStream
If a stream fails to report values, we could debug the stream’s yield callbacks by placing breakpoints. Though it could also be that the above “Download finished and stream closed” print statement won’t call, which means your code at the implementation level never continues. The latter could be a result of an unfinished stream.
To validate, we could make use of the onTermination
callback:
func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
return AsyncThrowingStream { continuation in
/// Configure a termination callback to understand the lifetime of your stream.
continuation.onTermination = { @Sendable status in
print("Stream terminated with status \(status)")
}
// ..
}
}
The callback is called on termination of the stream and will tell you whether your stream is still alive or not. I recommend reading Sendable and @Sendable closures explained with code examples to understand the @Sendable
attribute.
In case of a thrown error, the output could look as follows:
Stream terminated with status finished(Optional(FileDownloader.FileDownloadingError.example))
The above output will only be possible when using an AsyncThrowingStream. In the case of a regular AsyncStream, the finished output looks as follows:
Stream terminated with status finished
While the result of cancellation looks like this for both types of streams:
Stream terminated with status cancelled
You can also use this termination callback for any cleanup after the stream finishes. Examples could be removing any observers or cleaning disk space after the file download.
Canceling an AsyncStream
An AsyncStream or AsyncThrowingStream can cancel due to an enclosing task getting canceled. An example could look as follows:
let task = Task.detached {
do {
for try await status in download(url) {
switch status {
case .downloading(let progress):
print("Downloading progress: \(progress)")
case .finished(let data):
print("Downloading completed with data: \(data)")
}
}
} catch {
print("Download failed with \(error)")
}
}
task.cancel()
A stream cancels when going out of scope or when the enclosing task cancels. As mentioned before, the cancellation will trigger the onTermination
callback accordingly.
Continuing your journey into Swift Concurrency
If you like what you’ve read about async streams, you might enjoy other concurrency topics as well:
- MainActor usage in Swift explained to dispatch to the main thread
- How to Use URLSession with Async/Await for Network Requests in Swift
- Async await in Swift explained with code examples
- Swift 6: Incrementally migrate your Xcode projects and packages
- Concurrency-safe global variables to prevent data races
- Unit testing async/await Swift code
- Thread dispatching and Actors: understanding execution
- @preconcurrency: Incremental migration to concurrency checking
- Detached Tasks in Swift explained with code examples
- Task Groups in Swift explained with code examples
- Sendable and @Sendable closures explained with code examples
- AsyncSequence explained with Code Examples
- AsyncThrowingStream and AsyncStream explained with code examples
- Tasks in Swift explained with code examples
- Nonisolated and isolated keywords: Understanding Actor isolation
- Async let explained: call async functions in parallel
- Actors in Swift: how to use and prevent data races
Conclusion
An AsyncThrowingStream or AsyncStream is a great way to rewrite existing code based on closures to async-await supporting alternatives. You can deliver a continuous stream of values and finish a stream on success or failure. You can iterate values on the implementation level using a for loop based on the AsyncSequence
APIs.
If you like to improve your Swift knowledge, even more, check out the Swift category page. Feel free to contact me or tweet me on Twitter if you have any additional tips or feedback.
Thanks!