Post

Concurrent모듈을 활용하여 병렬/비동기 처리

Concurrent모듈을 활용하여 병렬/비동기 처리

비동기

concurrent.futuers모듈

Python에서 병렬작업을 쉽게 처리할 수 있도록 도와주는 고수준의 Interface를 제공. 이 모듈은 Thread Pool또는 Process Pool에 제출하여 비동기 실행을 관리

주요 구성요소

1. Executoer 클래스
  • concurrent.futures.Executor는 추상클래스이며, 병렬 실행환경을 제공
  • 두가지 주요 서브클래스틀 제공
    • ThreadPoolExecutor : 스레드를 사용하여 병렬 실행을 처리
    • ProcessPoolExecutor : 프로세스 사용하여 병렬 실행 처리
2. Future객채
  • 작업이 완료될때 결과를 나타나는 객체
  • Future객체를 통해
3. 비동기 작업 제출 및관리
  • submit() 또는 map() 메서드를 사용하여 작업을 제출
  • as_completedwait 함수로 작업의 완료상태를 관리

주요 메서드 및 함수

  1. submit() : 작업을 비동기로 제출하여 Future객체를 반환
1
executor.submit(func, *args, **kwargs)
  1. map() : 1. submit : 작업을 비동기로 제출하여 Future객체를 반환
1
executor.map(func, iterable, timeout=None)
  1. as_completed() : 1. submit : 작업 완료 순서대로 Future객체를 반환
1
as_completed(futures, timeout=None)
  1. wait() : 1. submit : 작업이 완료되거나 취소될 떄까지 대기
1
wait(futures, return_when)

예제

ThredPoolExecutor사용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(n):
    time.sleep(n)
    return f"Task {n} completed"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in [3, 1, 2]]

    for future in as_completed(futures):  # 완료된 순서대로 처리
        print(future.result())


ProcessPoolExecutor사용

1
2
3
4
5
6
7
8
9
from concurrent.futures import ProcessPoolExecutor
import os

def square(n):
    return n * n

with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(square, range(10))
    print(list(results))  # [0, 1, 4, 9, ..., 81]

스크립트

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from google.cloud import asset_v1
from google.cloud import resourcemanager_v3
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import time

# Google Cloud 클라이언트 초기화
FOLDER_CLIENT = resourcemanager_v3.FoldersClient()
ASSET_CLIENT = asset_v1.AssetServiceClient()

folder_cache = {}


def fetch_folder_name(folder_id):
    if folder_id in folder_cache:  # 캐시확인
        return folder_cache[folder_id]

    try:
        time.sleep(0.1)  # 속도 제한
        folder = FOLDER_CLIENT.get_folder(name=f"folders/{folder_id}")
        folder_name = folder.display_name
        folder_cache[folder_id] = folder_name
        return folder_name
    except Exception as e:
        print(f"Error retrieving folder {folder_id}: {e}")
        return None


def get_folder_names(folder_id_list):
    folder_names = {}

    # 멀티스레딩: 최대 5개의 스레드 사용
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 태스크 제출: fetch_folder_name 호출
        # future_to_id : dictionary
        # Future객체와 (idx,folder_id) 튜플을 매핑한 dict
        # Future객체를 key로 사용하여 해당 작업과 관련된 idx,folder_id를 값으로 저장

        future_to_id = {
            executor.submit(fetch_folder_name, folder_id): (idx, folder_id)
            for idx, folder_id in enumerate(folder_id_list)
        }

        # 완료된 태스크에서 결과 수집
        # 완료된 작업의 결과를 순차적으로 가져옴
        for future in as_completed(future_to_id):
            idx, folder_id = future_to_id[future]  # 인덱스와 ID 가져오기 (위에서 저장한 tuple)
            try:
                name = future.result()  # fetch_folder_name의 반환값
                if name:  # 이름이 유효한 경우만 추가
                    folder_names[f"folder_{idx}_name"] = name
            except Exception as e:
                print(f"Error in future for folder {folder_id}: {e}")

    return folder_names

함수 1: fetch_folder_name(folder_id)

  • 역할: 주어진 folder_id에 대해 Google Cloud API를 호출하여 폴더의 display name을 가져옵니다. 가져온 이름은 캐시에 저장하여, 동일한 folder_id에 대해 반복적으로 API를 호출하지 않도록 합니다.

  • 주요 동작:
  • 캐시 확인: folder_id가 이미 folder_cache에 저장되어 있으면, 저장된 이름을 반환합니다. 캐시를 통해 불필요한 API 호출을 줄이고 응답 속도를 높입니다.
  • API 호출: FOLDER_CLIENT.get_folder를 사용해 Google Cloud API로 폴더 정보를 가져옵니다. folder.display_name 속성을 추출하여 폴더 이름을 반환합니다.

  • 에러 처리: API 호출 실패 시 예외를 출력하고 None을 반환합니다.

함수 2: get_folder_names(folder_id_list)

  • 역할: 폴더 ID 목록을 입력받아 각 폴더의 이름을 가져오고, 결과를 딕셔너리로 반환합니다. 반환 형식은 {“folder_{idx}_name”: folder_name}입니다.

concurrent.futures vs 멀티스레딩/멀티프로세싱 모듈

Featureconcurrent.futuresthreading / multiprocessing
추상화 수준고수준 (더 쉽고 간결)저수준 (세부 제어 가능)
스레드/프로세스 관리자동 (풀 관리)수동 (직접 생성 및 관리)
적합한 작업 유형병렬 작업 (I/O 또는 CPU)더 복잡한 동작 필요 시
코드 복잡성낮음 (간단한 API로 작업 가능)높음 (스레드/프로세스 수동 제어 필요)
성능 튜닝제한적 (풀 크기 등 기본 제공 수준)유연함 (정교한 설정 및 최적화 가능)
에러 처리내장된 에러 추적 및 재시도 기능 제공수동으로 처리 코드 구현 필요
유지보수성좋음 (표준화된 인터페이스)복잡한 로직일 경우 유지보수 어려움
사용 사례단순 병렬 실행, 작업 큐 처리고급 병렬 처리, 사용자 정의 워크플로우

참고

This post is licensed under CC BY 4.0 by the author.