Swift Concurrency

Concurrency

async

func listPhotos(inGallery name: String) async -> [String] {
    let result = await asyncFunction() // 비동기 코드
    return result
}
  • 함수 정의 뒷부분에 async를 붙이면 해당 함수는 비동기라는 것을 나타냄
  • async 함수는 concurrent context 내부 즉, 다른 async 함수 내부 혹은 Task 내부에서 사용 가능
extension UIImage {
    var thumbnail: UIImage? {
        get async {
            let size = CGSize(width: 40, height: 40)
            return await self.byPreparingThumbnail(ofSize: size)
        }
    }
}
init(userID: Int) async throws {
    let fetchedUser = try await UserManager.fetchUserData(id: userID)
    self.user = fetchedUser
}
  • 함수뿐만이 아니라, 프로퍼티나 생성자에도 async 사용 가능
  • 예를들어, read-only 프로퍼티는 async 사용 가능

await

print("비동기 함수 호출 전") // Thread A에서 실행
await asyncFunction() // Thread A 제어권 시스템에게 줌. 나중에 시스템이 이후 작업에 대한 Thread 제어권 알아서 줌.
  • async 함수를 호출하기 위해 await이 필요함
  • await는 potential suspension point로 지정 된다는 것을 의미
  • suspend 된다는 것은 해당 thread에 대한 control을 포기한다는 것
  • 해당 코드를 돌리고 있던 thread에 대한 control은 system에게 가고, system은 해당 thread를 사용하여 다른 작업 가능
print("비동기 함수 호출 전") // Thread A에서 실행
await asyncFunction() // Thread A 제어권 시스템에게 줬다가 시스템으로부터 Thread B 제어권을 받을때 실행
print("비동기 함수 호출 후") // Thread B에서 이어서 실행
  • await로 인한 중단은 해당 thread에서 다른 코드의 실행을 막지는 않음
  • function은 suspend 되고, 다른 것들이 먼저 실행 될 수 있고 그렇기에 그 동안 앱의 상태가 크게 변할 수 있음
  • thread를 차단하는 대신 control을 포기해 작업을 중지 및 재개할 수 있는 개념을 도입
func fetchData() async -> String {
    print("데이터 가져오기 시작")
    try? await Task.sleep(for: .seconds(2.0)) // 2초 대기
    print("데이터 가져오기 완료")
    return "데이터"
}

func processData() async {
    print("프로세스 시작")

    // await 호출: control을 시스템에 넘김
    let data = await fetchData() 
    
    // fetchData가 완료되면 이 부분이 재개됨
    print("가져온 데이터: \(data)")
    print("프로세스 종료")
}

Task {
    await processData()
    print("모든 작업 완료")
}

  • 정리하자면, await로 async 함수를 호출하는 순간 해당 thread control 포기
  • 따라서 async 작업 및 같은 블럭에 있는 다음 코드들을 바로 실행하지 못함
  • thread control을 system에게 넘기면서, system에게 해당 async 작업도 schedule
  • system은 다른 중요한 작업이 있다면 먼저 실행하고, 특정 thread control을 줘서 async 함수와 나머지 코드를 resume
for await id in staticImageIDsURL.lines {
    let thumbnail = await fetchThumbnail(for: id)
    collage.add(thumbnail)
}

let result = await collage.draw()
  • for loop에서도 await 사용 가능
swift-concurrency

Task

SwiftUI task에서 비동기 작업하기

print("1")
Task {
    print("2") // 비동기로 실행
}
print("3")
  • Task는 비동기 작업 단위: A unit of asynchronous work
  • 격리되어(isolated), 독립적으로(independently) 비동기 작업을 수행
  • 값이 공유될 상황이 있을때는 Sendable 체킹을 통해 Task가 격리된 상태로 남아있는지 체크
Task {
    // 비동기 context. 순서대로 실행.
    let fish = await catchFish()
    let dinner = await cook(fish)
    await eat(dinner)
}
  • Task 안에서의 작업은 처음부터 끝까지 순차적으로 실행
  • await를 만나면 작업은 몇번이고 중단될 수는 있지만, 실행 순서가 변경되지는 않음
if Task.isCancelled { }
try Task.checkCancellation()
  • isCancelled는 단순히 cancel 되었는지 확인하는 플래그
  • checkCancellation()은 cancel 되었으면 CancellationError을 throw 함
  • Cancellation은 어디서나 체크할수 있음
func makeSoup(order: Order) async throws -> Soup {
    async let pot = stove.boilBroth()
    
    // 1

    guard !Task.isCancelled else {
        throw SoupCancellationError()
    }
    
    // 2

    async let choppedIngredients = chopIngredients(order.ingredients)
    async let meat = marinate(meat: .chicken)
    let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
    return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
  • 부모 Task를 cancel 하면, 자식 Task는 isCancelled 플래그가 true가 될뿐임
  • 그래서 Task는 cancel 되고 즉시 멈추지는 않을 수 있음
  • 위 코드에서 1번 지점 전에 cancel이 호출 되었다면, throw 할 것임
  • 하지만 2번 지점 이후 cancel이 호출 되었다면, 그대로 진행될것임
@MainActor
class MyDelegate: UICollectionViewDelegate {
    var thumbnailTasks: [IndexPath: Task<Void, Never>] = [:]
    
    func collectionView(_ view: UICollectionView, willDisplay cell: UICollectionViewCell, forItemAt item: IndexPath) {
        let ids = getThumbnailIDs(for: item)
        thumbnailTasks[item] = Task {
            defer { thumbnailTasks[item] = nil }
            let thumbnails = await fetchThumbnails(for: ids)
            display(thumbnails, in: cell)
        }
    }
    
    func collectionView(_ view: UICollectionView, didEndDisplay cell: UICollectionViewCell, forItemAt item: IndexPath) {
        thumbnailTasks[item]?.cancel()
    }
}
  • 위 코드는 unstructured Task를 딕셔너리에 담고 있다가, 취소하는 코드
@MainActor
class MyDelegate: UICollectionViewDelegate {
    var thumbnailTasks: [IndexPath: Task<Void, Never>] = [:]
    
    func collectionView(_ view: UICollectionView, willDisplay cell: UICollectionViewCell, forItemAt item: IndexPath) {
        let ids = getThumbnailIDs(for: item)
        thumbnailTasks[item] = Task {
            defer { thumbnailTasks[item] = nil }
            let thumbnails = await fetchThumbnails(for: ids)
            Task.detached(priority: .background) {
                writeToLocalCache(thumbnails)
            }
            display(thumbnails, in: cell)
        }
    }
}
  • 위 코드는 Detach Task를 통해 별도로 백그라운드에서 Task를 실행하는 코드
  • 캐싱은 Main Thread에서 할 필요가 없고, 다른 썸네일이 fetch에 실패해도 캐싱 안할 이유가 없음
@MainActor
func example() {
    // MainActor context를 상속 받음
    Task {
        for i in 1...10000 {
            print("In Task 1: \(i)")
        }
    }
    
    // context를 상속 받지 않음
    Task.detached {
        for i in 1...10000 {
            print("In Task 2: \(i)")
        }
    }
}
  • Task vs Task.detached
  • Task는 MainActor, 부모 Task 등의 context 상태를 상속받아 실행되는 비동기 작업
  • Task.detached는 상속받는 context 없이 완전히 독립적으로 동작하는 태스크를 생성

주의할점

  • Concurrency를 사용하는 것은 추가적인 리소스 비용이 듬
  • Concurrency의 이점이 리소스 비용을 넘어설때만 사용하는 것을 추천
  • await 이전의 코드를 실행한 쓰레드가 계속해서 continuation 코드를 실행할 것이라는 보장은 없음
  • 작업을 자발적으로 스케줄링에서 제외함으로써 atomic을 깨뜨림
  • await를 사용하는 동안 Lock을 유지할 수 없음
  • await를 지나면 쓰레드 전용 데이터가 유지되지 않음

Concurrency +

async let

  • 아래와 같은 코드는 한 번에 하나의 비동기 코드만 실행
  • 비동기 코드가 실행되는 동안, 호출자는 다음 코드를 실행하기 전에 해당 코드가 완료될 때까지 기다림
func fetchOneThumbnail(withID id: String) async throws -> UIImage {
    let imageReq = imageRequest(for: id), metadataReq = metadataRequest(for: id)
    
    let (data, _) = try await URLSession.shared.data(for: imageReq)
    let (metadata, _) = try await URLSession.shared.data(for: metadataReq)
    
    guard let size = parseSize(from: metadata),
          let image = UIImage(data: data)?.byPreparingThumbnail(ofSize: size) else {
        throw ThumbnailFailedError()
    }
    
    return image
}
  • 비동기 함수를 호출하고 주변 코드와 병렬로 실행 하는것도 가능
  • 정의할 때 let 앞에 async를 작성한 다음 상수를 사용할때 await를 사용
func fetchOneThumbnail(withID id: String) async throws -> UIImage {
    let imageReq = imageRequest(for: id), metadataReq = metadataRequest(for: id)
    
    async let (data, _) = URLSession.shared.data(for: imageReq)
    async let (metadata, _) = URLSession.shared.data(for: metadataReq)
    
    guard let size = parseSize(from: try await metadata),
          let image = try await UIImage(data: data)?.byPreparingThumbnail(ofSize: size) else {
        throw ThumbnailFailedError()
    }
    
    return image
}
  • async let을 사용하면 실제 함수는 바로 실행 되는것을 알 수 있음
  • await을 사용하면 실행을 기다리지는 않지만, 값이 올때까지 기다림
func execute() async {
    // 실제 실행은 되지만 기다리지는 않음
    async let one = sleepOne()
    async let two = sleepTwo()
    
    print("await: ", await one) // one 대기
    print("await: ", await two) // two 대기
}

func sleepOne() async -> Int {
    try? await Task.sleep(for: .seconds(1))
    print("sleepOne completed")
    return 1
}

func sleepTwo() async -> Int {
    try? await Task.sleep(for: .seconds(2))
    print("sleepTwo completed")
    return 2
}

withTaskGroup

  • TaskGroup으로 동적으로 병렬 작업을 합쳐 모든 작업이 완료되면 결과를 반환 받을 수 있음
  • withTaskGroup, withThrowingTaskGroup
let images = await withTaskGroup(of: UIImage.self, returning: [UIImage].self) { taskGroup in
    let photoURLs = await loadPhotoUrls()
    
    for photoURL in photoURLs {
        taskGroup.addTask { await downloadPhoto(url: photoURL) }
    }

    var images = [UIImage]()
    
    for await result in taskGroup {
        images.append(result)
    }
    
    return images
}

withCheckedContinuation

  • 클로저 기반의 completion handler 비동기 처리를 async/await 형태로 바꿀때 자주 사용
  • withCheckedContinuation, withCheckedThrowingContinuation
func fetch(completion: @escaping ([String]) -> Void) {
    let url = URL(string: "https://www.hohyeonmoon.com")!

    URLSession.shared.dataTask(with: url) { data, _, _ in
        if let data {
            if let something = try? JSONDecoder().decode([String].self, from: data) {
                completion(something)
                return
            }
        }
        completion([])
    }.resume()
}

func fetch() async -> [String] {
    await withCheckedContinuation { continuation in
        fetch { something in
            continuation.resume(returning: something)
        }
    }
}

let something = await fetch()
  • 위와 같이 클로저 비동기 처리를 async로 변경 가능
  • continue에서 두 번 이상 resume을 호출하면 안됨
  • resume을 호출하지 않아도 Task가 무기한 일시 중단된 상태로 유지
  • 즉, resume은 정확히 한번 호출되어야 함
class ViewController: UIViewController {
    private var activeContinuation: CheckedContinuation<[Post], Error>?
    func sharedPostsFromPeer() async throws -> [Post] {
        try await withCheckedThrowingContinuation { continuation in
            self.activeContinuation = continuation
            self.peerManager.syncSharedPosts()
        }
    }
}

extension ViewController: PeerSyncDelegate {
    func peerManager(_ manager: PeerManager, received posts: [Post]) {
        self.activeContinuation?.resume(returning: posts)
        self.activeContinuation = nil // 중복 resume 호출 방지
    }

    func peerManager(_ manager: PeerManager, hadError error: Error) {
        self.activeContinuation?.resume(throwing: error)
        self.activeContinuation = nil // 중복 resume 호출 방지
    }
}
  • Delegate 패턴에서 async를 사용할때도 withCheckedContinuation을 사용할수 있음

AsyncStream

  • Swift Concurrency를 데이터 스트림 형태에서 사용할수 있게해줌
  • yield로 스트림에 element를 제공하고, finish로 정상적으로 스트림을 종료
  • AsyncStream, AsyncThrowingStream
class QuakeMonitor {
    var quakeHandler: (Quake) -> Void
    func startMonitoring()
    func stopMonitoring()
}

let quakes = AsyncStream(Quake.self) { continuation in
    let monitor = QuakeMonitor()
    monitor.quakeHandler = { quake in
        continuation.yield(quake)
    }
    continuation.onTermination = { termination in
        switch termination {
        case .finished:
            print("finished")
        case .cancelled:
            print("cancelled")
        }
        
        monitor.stopMonitoring()
    }
    monitor.startMonitoring()
}

let significantQuakes = quakes.filter { quake in
    quake.magnitude > 3
}

for await quake in significantQuakes {
    ...
}

Actor

Data Race

  • 여러 Task가 동시에 일을 하고 있지만 동일한 객체를 참조하기 때문에 발생
  • 동일한 객체에 동시에 접근해서 crash가 나거나, 예상밖의 순서로 작업이 진행됨

actor

  • 공유되는 데이터에 접근해야 하는 여러 Task를 조정하기 위해 존재
  • 데이터를 isolate 하고, 한 번에 하나의 Task만 내부 상태를 변경하도록 허용
  • 동시 변경으로 인한 Data Race를 피함
  • Actor는 mutable state를 shared 한다는 목적이기 때문에 class와 같은 reference 타입
actor SharedWallet {
    let name = "공유 지갑"
    var amount = 0
    
    init(amount: Int) {
        self.amount = amount
    }
    
    func spendMoney(ammount: Int) {
        self.amount -= ammount // 1
    }
}

Task {
    let wallet = SharedWallet(amount: 10000)
    let name = wallet.name // 2
    let amount = await wallet.amount // 3
    await wallet.spendMoney(ammount: 100) // 4
    await wallet.amount += 100 // 5
}
  • 1: actor 내부에서 변수 접근시 await 불필요
  • 2: 상수는 변경 불가능하기 때문에 actor 외부에서도 바로 접근 가능
  • 3: actor 외부에서 변수 접근시 await 필요
  • 4: actor 외부에서 메서드 호출시 await 필요
  • 5: 컴파일 에러, actor 외부에서 actor 내부의 변수를 변경할 수 없음

Sendable

  • Sendable 프로토콜을 준수하는 타입은 actor 간에 값을 공유할 수 있음
  • Value 타입은 Sendable, Actor 타입도 Sendable
  • Immutable한 데이터만 있는 Class는 Sendable
  • 내부적으로 synchronized 되어있는 Class는 Sendable
  • @Sendable로 표시된 함수도 actor 간에 공유 가능
  • 클로저도 마찬가지로 @Sendable 할 수 있음
// Sendable Class
final class User: Sendable {
    let id: Int
    let name: String

    init(id: Int, name: String) {
        self.id = id
        self.name = name
    }
}

// Sendable closure
actor Calculator {
    func calculate(operation: @Sendable () -> Int) -> Int {
        return operation()
    }
}

let calc = Calculator()

Task {
    let result = await calc.calculate {
        return 2 + 3
    }
    
    print(result) // 5
}

MainActor

  • MainActor: main thread를 나타내는 특별한 global actor
  • await이 사용되었다는 것은, 해당 thread에서 다른 코드가 실행될 수 있도록 실행 중인 함수가 중지될 수 있다는 의미
await MainActor.run {
    // UI 관련 코드 1
}
await MainActor.run {
    // UI 관련 코드 2
}
  • 따라서 메인 스레드에서 한꺼번에 작업이 이뤄지길 원하는 경우에는 관련 함수를 run block에 그룹화 해야함
  • 그래야 해당 함수들 사이에는 일시 중단 없이 호출이 실행되도록 할 수 있음
await MainActor.run {
    // UI 관련 코드 1
    // UI 관련 코드 2
}
  • Main Thread에서 실행되어야 하는 코드를 @MainActor 표시로 타입, 함수, 클로저, 프로퍼티 등에 적용 가능
  • class, struct, enum 같은 type에 붙으면, 내부에 있는 모든 property와 method가 isolated 되고 main thread에서 동작
actor SomeActor {
    let id = UUID().uuidString
    @MainActor var myProperty: String
    
    init(_ myProperty: String) {
        self.myProperty = myProperty
    }
    
    @MainActor func changeMyProperty(to newValue: String) {
        self.myProperty = newValue
    }
    
    func changePropertyToName() {
        Task { @MainActor in
            myProperty = "hohyeon"
        }
    }
}

참고