Danny의 iOS 컨닝페이퍼
article thumbnail
반응형

Stream이란?

스트림(Stream)이란 데이터의 흐름을 의미합니다.

 

이러한 데이터 흐름은 단방향이며, 한번에 일부 데이터를 처리합니다.

 

즉, 데이터를 한번에 모두 처리하지 않고, 데이터가 하나씩 또는 여러 개씩 일정한 간격으로 생성되어 이를 연속적으로 처리하는 방식입니다.

 

스트림은 많은 데이터를 처리하는 경우 유용하며, 여러 개의 데이터를 처리할 때 코드의 가독성과 유지보수성이 좋아집니다.

 

스트림은 실시간으로 데이터를 처리하고, 데이터가 발생하는 즉시 처리할 수 있어 대규모 데이터 처리에 유용합니다.

 

 

AsyncStream이란?

순서가 있고, 비동기적으로 생성된 요소들의 Sequence입니다. 클로저 내부에서 하고 싶은 일을 정의할 수 있습니다.

 

기존의 Sequence를 생성하려면 IteratorProtocol(반복자)직접 구현해서 만들어 줘야 했지만,

 

AsyncStreamAsyncSequence를 준수하여 Iterator를 직접 구현할 필요 없이 간편하게 사용할 수 있습니다.

 

특히, AsyncStream는 클로저 형태 사용되므로,

async-await를 사용 시, 콜백 또는 대리자 형식의 API를 적용하는 데 적합하다고 하네요.

 

AsyncStream를 제대로 이해하기 위해선 이전글인 SequenceAsyncSequence 를 참고해 주세요.

 

선언

AsyncStream

struct AsyncStream<Element>

 

순서

1. AsyncStream로 인스턴스를 생성

let stream = AsyncStream(build: (AsyncStream<Element>.Continuation) -> Void)

 

2. 원하는 타입을 지정해 주기

 

타입추론이 안되므로 타입을 제너릭 또는 타입자체(Type.self)로 지정해줘야 합니다.

let stream = AsyncStream<Int> { continuation in
    ...
}

 

3. 클로저 안에서 하고 싶은 작업을 정의

 

sleep()을 통하여 1초마다 순차적으로 숫자가 생성되게 만들어 줬습니다.

let stream = AsyncStream<Int> { continuation in
    Task.detached {
        for num in 1 ... 5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
    }
}

 

4. 실행

Task {
    for await num in stream {
        print(num)
    }
}

// 1초 마다

// 1
// 2
// 3
// 4
// 5

이와 같이 따로 Iterator을 만들어 줄 필요 없이 간단히 AsyncStream를 생성할 수 있습니다.

 

Sequence 이므로 for-in loop에서 사용할 수 있게 되는 거죠.

 

초기화(init) 간단 정리

대략적인 순서는 알아봤으니 AsyncStream생성자(초기화)에 대하여 조금 더 알아봅시다.

 

AsyncStream.init

init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
    _ build: (AsyncStream<Element>.Continuation) -> Void
)

 

AsyncStream.init의 파라미터

  • elementType
    • AsyncStream에 사용될 타입을 정의한다
  • bufferingPolicy
    • buffer란? 입출력 데이터 등의 정보를 전송할 때 일시적인 데이터 저장 장소로 사용되는 기억 장소.
      bufferingbuffer데이터를 담는 과정을 말합니다.
    • AsyncStream버퍼링 동작설정합니다. 기본값으로는 무제한으로 버퍼링 합니다.
  • build
    • 클로저에서 continuation을 통하여 생산(yield) 및 종료(finish), 종료 처리(onTermination) 등을 정의

 

넘어가기 전에 bufferingPolicy에 대하여 간단히 알아보겠습니다.

case unbounded             // 무제한으로 요소를 추가 시킵니다.
case bufferingOldest(Int)  // 버퍼가 가득차면 오래된 요소를 제외하고 나머지를 버립니다.
case bufferingNewest(Int)  // 버퍼가 가득차면 최신 요소를 제외하고 나머지를 버립니다.

 

아래 코드는 bufferingOldest를 사용하여 buffer의 크기를 2로 정해줬습니다.

 

이렇게 정의를 하면 오래된 요소 2개만 남기게 되겠네요.

let stream = AsyncStream<Int>(Int.self,
                              bufferingPolicy: .bufferingOldest(2)) { continuation in  🎯
    for num in 1 ... 5 {
            continuation.yield(num)
        }
        continuation.finish()
}

Task {
    for await num in stream {
        print(num)
    }
}

// 1
// 2

// 만약 bufferingPolicy를 bufferingNewest로 처리를 하면
// 4
// 5

이렇게 bufferingPolicy가 대략적으로 어떻게 동작하는지 알아보았습니다.

 

 

AsyncThrowingStream 란

AsyncStream구조는 거의 유사합니다.

 

다만, AsyncThrowingStream에러를 던질(throw) 수 있어,  Stream의 완료 값으로 성공 또는 실패를 갖게 되죠.

 

참고로 에러가 발생하게 되면 Sequence종료됩니다.

 

선언

AsyncThrowingStream

struct AsyncThrowingStream<Element, Failure> where Failure : Error

 

순서

위의 AsyncStream의 예제와 동일하게 만들었습니다. 

 

1초마다 숫자를 생성시켜 주고 num이 3보다 커지면 에러를 발생되도록 만들어 줬습니다.

 

에러 처리를 위해 finish(throwing:) 메서드를 사용하여 에러처리를 해줍시다.

enum NumError: Error {
    case overThree
}

let stream = AsyncThrowingStream<Int, Error> { continuation in
    Task.detached {
        for num in 1 ... 5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            if num > 3 {
                continuation.finish(throwing: NumError.overThree)  🎯
            } else {
                continuation.yield(num)
            }
        }
        continuation.finish()
    }
}

Task {
    do {
        for try await num in stream {
            print(num)
        }
    } catch {
      print(error)
    }
}

//1초 마다

// 1
// 2
// 3
// overThree

 

 

AsyncStream / AsyncThrowingStream의 중요한 파라미터 (Continuation) ⭐️

위에서 AsyncStream이나 AsyncThrowingStream을 만들어 줄 때보면

 

yield, finish와 같은 생소한 메서드들이 나왔습니다.

 

이것들은 바로 AsyncStream.Continuation포함돼 있는 메서드들입니다.

AsyncStream.Continuation

AsyncStream.Continuation는 동기 코드비동기 스트림 간의 인터페이스를 위한 메커니즘입니다.

 

특징으로는 escaping을 지원합니다.

 

Continuation의 메서드

  • yield
func yield(_ value: Element) -> AsyncStream<Element>.Continuation.YieldResult

AsyncStream에 Element를 제공하여

 

yield을 만나게 되면 다음 반복작업기다립니다. (다음 for-in loop를 기다린다)


  • finish
func finish()

Sequence반복(Iteration)nil을 만나면 종료가 됩니다.

 

여기서 finish() 메서드는 nil반환시켜 줍니다.

 

즉, 간단히 말해서 반복 작업종료한 뒤 다음 작업진행시키는 거죠.


  • onTermination
var onTermination: ((AsyncStream<Element>.Continuation.Termination) -> Void)? { get nonmutating set }

enum Termination {
    case finished     // Stream이 종료 될 때
    case cancelled    // Stream이 취소 될 때
}

AsyncStream종료취소콜백이 호출되며 스트림활성 상태 여부를 체크할 수 있습니다.

 

아래와 같이 사용할 수 있습니다.

let stream = AsyncStream<Int> { continuation in
    continuation.onTermination = { status in  🎯 
        switch status {
        case .finished: 
            print("Stream terminated with status : \(status)")  
        case .cancelled:                 
            print("Stream terminated with status : \(status)")  
        }
    }
    
    Task.detached {
        for num in 1...5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        print("Progress finish before")
        continuation.finish()
        print("Progress finish after")
    }
}

let task = Task {
    for await num in stream {
        print(num)
    }
}

// 1
// 2
// 3
// 4
// 5
// Progress finish before
// Stream terminated with status : finished
// Progress finish after

 

AsyncStream작업취소하기 위해서는

 

비동기 작업cancel()을 걸어주면 작업이 중지되면서 onTermination콜백 작업을 실행(Trigger)을 하게 되죠.

let task = Task {
    for await num in stream {
        print(num)
    }
}
sleep(3)
task.cancel()

// 1
// 2
// Stream terminated with status : cancelled

 

onTermination직접적으로 사용이 가능이 하긴한데...

continuation.onTermination?(.finished)
continuation.onTermination?(.cancelled)

호출을 해도 숫자 카운팅이 멈추지 않는 걸로 봐서는 onTermination의 콜백을 정의해 줄 때 

 

이런 식으로 따로 취소 작업을 해줘야 하는 것 같네요.

  • Option 1 : 비동기 작업 취소 task.cancel()
  • Option 2 : continuation.finish()로 작업을 종료 시킴 
  • Option 3 : break를 걸어준다
var task: Task<(), Never>?

let stream = AsyncStream<Int> { continuation in
    
    continuation.onTermination = { status in
        switch status {
        case .finished:
            print("Stream terminated with status : \(status)")
        case .cancelled:
            print("Stream terminated with status : \(status)")
            
            task?.cancel()         🎯  // Option 1 
            continuation.finish()  🎯  // Option 2
            break                  🎯  // Option 3
        }
    }
    
    task = Task.detached {
        for num in 1...5 {
            try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            
            if num > 3 {
                continuation.onTermination?(.cancelled)  🎯
            } else {
                continuation.yield(num)
            }
        }
        continuation.finish()
    }
    
}


Task {
    for await num in stream {
        print(num)
    }
}

// 1
// 2
// 3
// Stream terminated with status : cancelled
// Stream terminated with status : cancelled
// Stream terminated with status : finished

그런데 continuation.finish()를 제외한 나머지는 남은 작업만큼 취소를 호출하게 되더라고요.

 

만약에 사용하게 된다면 continuation.finish()로 사용합시다!

 

 


Stream은 확실히 편하게 Squence를 만들어 주는데
생소한 단어들이 많이 나와 어려웠던 문법이었네요...

부족한 설명이지만, 조금은 이해 가셨나요?

틀린 내용이 있다면 언제든지 지적해 주시면 감사히 받겠습니다. 🫠
읽어주셔서 감사합니다 😃

 

 

참고

 

 

 

 

 

반응형
profile

Danny의 iOS 컨닝페이퍼

@Danny's iOS

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!