Stream이란?
스트림(Stream)이란 데이터의 흐름을 의미합니다.
이러한 데이터 흐름은 단방향이며, 한번에 일부 데이터를 처리합니다.
즉, 데이터를 한번에 모두 처리하지 않고, 데이터가 하나씩 또는 여러 개씩 일정한 간격으로 생성되어 이를 연속적으로 처리하는 방식입니다.
스트림은 많은 데이터를 처리하는 경우 유용하며, 여러 개의 데이터를 처리할 때 코드의 가독성과 유지보수성이 좋아집니다.
스트림은 실시간으로 데이터를 처리하고, 데이터가 발생하는 즉시 처리할 수 있어 대규모 데이터 처리에 유용합니다.
AsyncStream이란?
순서가 있고, 비동기적으로 생성된 요소들의 Sequence입니다. 클로저 내부에서 하고 싶은 일을 정의할 수 있습니다.
기존의 Sequence를 생성하려면 IteratorProtocol(반복자)를 직접 구현해서 만들어 줘야 했지만,
AsyncStream은 AsyncSequence를 준수하여 Iterator
를 직접 구현할 필요 없이 간편하게 사용할 수 있습니다.
특히, AsyncStream는 클로저 형태 사용되므로,
async-await를 사용 시, 콜백 또는 대리자 형식의 API를 적용하는 데 적합하다고 하네요.
AsyncStream를 제대로 이해하기 위해선 이전글인 Sequence 와 AsyncSequence 를 참고해 주세요.
선언
AsyncStream
struct AsyncStream<Element>
순서
1. AsyncStream로 인스턴스를 생성
let stream = AsyncStream(build: (AsyncStream<Element>.Continuation) -> Void)
2. 원하는 타입을 지정해 주기
타입추론이 안되므로 타입을 제너릭 또는 타입자체(Type.self)로 지정해줘야 합니다.
let stream = AsyncStream<Int> { continuation in
...
}
3. 클로저 안에서 하고 싶은 작업을 정의
sleep()
을 통하여 1초마다 순차적으로 숫자가 생성되게 만들어 줬습니다.
let stream = AsyncStream<Int> { continuation in
Task.detached {
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
}
4. 실행
Task {
for await num in stream {
print(num)
}
}
// 1초 마다
// 1
// 2
// 3
// 4
// 5
이와 같이 따로 Iterator
을 만들어 줄 필요 없이 간단히 AsyncStream를 생성할 수 있습니다.
Sequence 이므로 for-in loop에서 사용할 수 있게 되는 거죠.
초기화(init) 간단 정리
대략적인 순서는 알아봤으니 AsyncStream의 생성자(초기화)에 대하여 조금 더 알아봅시다.
AsyncStream.init
init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
_ build: (AsyncStream<Element>.Continuation) -> Void
)
AsyncStream.init의 파라미터
- elementType
- AsyncStream에 사용될 타입을 정의한다
- bufferingPolicy
- buffer란? 입출력 데이터 등의 정보를 전송할 때 일시적인 데이터 저장 장소로 사용되는 기억 장소.
buffering은 buffer에 데이터를 담는 과정을 말합니다. - AsyncStream의 버퍼링 동작을 설정합니다. 기본값으로는 무제한으로 버퍼링 합니다.
- buffer란? 입출력 데이터 등의 정보를 전송할 때 일시적인 데이터 저장 장소로 사용되는 기억 장소.
- build
- 클로저에서
continuation
을 통하여 생산(yield
) 및 종료(finish
), 종료 처리(onTermination
) 등을 정의
- 클로저에서
넘어가기 전에 bufferingPolicy에 대하여 간단히 알아보겠습니다.
case unbounded // 무제한으로 요소를 추가 시킵니다.
case bufferingOldest(Int) // 버퍼가 가득차면 오래된 요소를 제외하고 나머지를 버립니다.
case bufferingNewest(Int) // 버퍼가 가득차면 최신 요소를 제외하고 나머지를 버립니다.
아래 코드는 bufferingOldest
를 사용하여 buffer의 크기를 2로 정해줬습니다.
이렇게 정의를 하면 오래된 요소 2개만 남기게 되겠네요.
let stream = AsyncStream<Int>(Int.self,
bufferingPolicy: .bufferingOldest(2)) { continuation in 🎯
for num in 1 ... 5 {
continuation.yield(num)
}
continuation.finish()
}
Task {
for await num in stream {
print(num)
}
}
// 1
// 2
// 만약 bufferingPolicy를 bufferingNewest로 처리를 하면
// 4
// 5
이렇게 bufferingPolicy
가 대략적으로 어떻게 동작하는지 알아보았습니다.
AsyncThrowingStream 란
AsyncStream과 구조는 거의 유사합니다.
다만, AsyncThrowingStream은 에러를 던질(throw
) 수 있어, Stream의 완료 값으로 성공 또는 실패를 갖게 되죠.
참고로 에러가 발생하게 되면 Sequence가 종료됩니다.
선언
AsyncThrowingStream
struct AsyncThrowingStream<Element, Failure> where Failure : Error
순서
위의 AsyncStream의 예제와 동일하게 만들었습니다.
1초마다 숫자를 생성시켜 주고 num이 3보다 커지면 에러를 발생되도록 만들어 줬습니다.
에러 처리를 위해 finish(throwing:)
메서드를 사용하여 에러처리를 해줍시다.
enum NumError: Error {
case overThree
}
let stream = AsyncThrowingStream<Int, Error> { continuation in
Task.detached {
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
if num > 3 {
continuation.finish(throwing: NumError.overThree) 🎯
} else {
continuation.yield(num)
}
}
continuation.finish()
}
}
Task {
do {
for try await num in stream {
print(num)
}
} catch {
print(error)
}
}
//1초 마다
// 1
// 2
// 3
// overThree
AsyncStream / AsyncThrowingStream의 중요한 파라미터 (Continuation) ⭐️
위에서 AsyncStream이나 AsyncThrowingStream을 만들어 줄 때보면
yield
, finish
와 같은 생소한 메서드들이 나왔습니다.
이것들은 바로 AsyncStream.Continuation 에 포함돼 있는 메서드들입니다.
AsyncStream.Continuation
AsyncStream.Continuation
는 동기 코드와 비동기 스트림 간의 인터페이스를 위한 메커니즘입니다.
특징으로는 escaping을 지원합니다.
Continuation의 메서드
yield
func yield(_ value: Element) -> AsyncStream<Element>.Continuation.YieldResult
AsyncStream에 Element를 제공하여
yield
을 만나게 되면 다음 반복작업을 기다립니다. (다음 for-in loop를 기다린다)
finish
func finish()
Sequence의 반복(Iteration)은 nil
을 만나면 종료가 됩니다.
여기서 finish()
메서드는 nil
을 반환시켜 줍니다.
즉, 간단히 말해서 반복 작업을 종료한 뒤 다음 작업을 진행시키는 거죠.
onTermination
var onTermination: ((AsyncStream<Element>.Continuation.Termination) -> Void)? { get nonmutating set }
enum Termination {
case finished // Stream이 종료 될 때
case cancelled // Stream이 취소 될 때
}
AsyncStream의 종료 및 취소 시 콜백이 호출되며 스트림의 활성 상태 여부를 체크할 수 있습니다.
아래와 같이 사용할 수 있습니다.
let stream = AsyncStream<Int> { continuation in
continuation.onTermination = { status in 🎯
switch status {
case .finished:
print("Stream terminated with status : \(status)")
case .cancelled:
print("Stream terminated with status : \(status)")
}
}
Task.detached {
for num in 1...5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
print("Progress finish before")
continuation.finish()
print("Progress finish after")
}
}
let task = Task {
for await num in stream {
print(num)
}
}
// 1
// 2
// 3
// 4
// 5
// Progress finish before
// Stream terminated with status : finished
// Progress finish after
AsyncStream의 작업을 취소하기 위해서는
비동기 작업에 cancel()
을 걸어주면 작업이 중지되면서 onTermination
의 콜백 작업을 실행(Trigger)을 하게 되죠.
let task = Task {
for await num in stream {
print(num)
}
}
sleep(3)
task.cancel()
// 1
// 2
// Stream terminated with status : cancelled
onTermination
를 직접적으로 사용이 가능이 하긴한데...
continuation.onTermination?(.finished)
continuation.onTermination?(.cancelled)
호출을 해도 숫자 카운팅이 멈추지 않는 걸로 봐서는 onTermination
의 콜백을 정의해 줄 때
이런 식으로 따로 취소 작업을 해줘야 하는 것 같네요.
- Option 1 : 비동기 작업 취소
task.cancel()
- Option 2 :
continuation.finish()
로 작업을 종료 시킴 - Option 3 : break를 걸어준다
var task: Task<(), Never>?
let stream = AsyncStream<Int> { continuation in
continuation.onTermination = { status in
switch status {
case .finished:
print("Stream terminated with status : \(status)")
case .cancelled:
print("Stream terminated with status : \(status)")
task?.cancel() 🎯 // Option 1
continuation.finish() 🎯 // Option 2
break 🎯 // Option 3
}
}
task = Task.detached {
for num in 1...5 {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
if num > 3 {
continuation.onTermination?(.cancelled) 🎯
} else {
continuation.yield(num)
}
}
continuation.finish()
}
}
Task {
for await num in stream {
print(num)
}
}
// 1
// 2
// 3
// Stream terminated with status : cancelled
// Stream terminated with status : cancelled
// Stream terminated with status : finished
그런데 continuation.finish()
를 제외한 나머지는 남은 작업만큼 취소를 호출하게 되더라고요.
만약에 사용하게 된다면 continuation.finish()
로 사용합시다!
Stream은 확실히 편하게 Squence를 만들어 주는데
생소한 단어들이 많이 나와 어려웠던 문법이었네요...
부족한 설명이지만, 조금은 이해 가셨나요?
틀린 내용이 있다면 언제든지 지적해 주시면 감사히 받겠습니다. 🫠
읽어주셔서 감사합니다 😃
참고
- AsyncStream
- AsyncThrowingStream
- https://www.avanderlee.com/swift/asyncthrowingstream-asyncstream/
- https://zeddios.tistory.com/1341
'Xcode > Swift 문법' 카테고리의 다른 글
[iOS/Swift] 동시성(Concurrency) 프로그래밍 (0) | 2023.02.04 |
---|---|
[iOS/Swift] 형 변환 (Type Casting) (0) | 2023.01.31 |
[iOS/Swift] 프로토콜 AsyncSequence (비동기 시퀀스) (0) | 2023.01.20 |
[iOS/Swift] 프로토콜 Sequence (0) | 2023.01.19 |
[iOS/Swift] AssociatedTypes (프로토콜을 위한 Generic문법) (0) | 2023.01.13 |