Skip to content

Latest commit

Β 

History

History
115 lines (89 loc) Β· 3.65 KB

File metadata and controls

115 lines (89 loc) Β· 3.65 KB

59. λ™μ‹œμ„±μ„ μœ„ν•΄ μŠ€λ ˆλ“œκ°€ ν•„μš”ν•œ κ²½μš°μ—λŠ” ThreadpoolExecutor λ₯Ό μ‚¬μš©ν•˜λΌ

1. concurrent.futures 의 ThreadpoolExecutor λ‚΄μž₯ λͺ¨λ“ˆ

  • Thread 와 Queue λ₯Ό μ‚¬μš©ν•œ μ ‘κ·Ό λ°©λ²•λ“€μ˜ μž₯점을 μ‘°ν•©ν•˜μ—¬ 병렬 I/O 문제 ν•΄κ²°
ALIVE = '*'
EMPTY = '-'

class Grid:
    ...
    
class LockingGrid(Grid):
    ...
    
def count_neighbors(y, x, get):
    ...
    
def game_logic(state, neighbors):
    ...
    # μ—¬κΈ°μ„œ λΈ”λ‘œν‚Ή# I/Oλ₯Ό μˆ˜ν–‰ν•œλ‹€
    data = my_socket.recv(100)
    ...
    
def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)
  • Grid 의 각 셀에 λŒ€ν•΄ μƒˆ Thread μΈμŠ€ν„΄μŠ€λ₯Ό μ‹œμž‘ν•˜λŠ” λŒ€μ‹  ν•¨μˆ˜λ₯Ό μ‹€ν–‰κΈ°(executor)에 μ „λ‹¬ν•˜μ—¬ νŒ¬μ•„μ›ƒ
  • μ‹€ν–‰κΈ°λŠ” 전달받은 ν•¨μˆ˜λ₯Ό λ³„λ„μ˜ μŠ€λ ˆλ“œμ—μ„œ μˆ˜ν–‰
  • λ‚˜μ€‘μ— νŒ¬μΈν•˜κΈ° μœ„ν•΄ λͺ¨λ“  μž‘μ—…μ˜ κ²°κ³Όλ₯Ό κΈ°λ‹€λ¦Ό
from concurrent.futures import ThreadPoolExecutor

def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)
    futures = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            future = pool.submit(step_cell, *args) # νŒ¬μ•„μ›ƒ
            futures.append(future)
            
    for future in futures:
        future.result()                            # 팬인
        
    return next_grid
class ColumnPrinter:
    ...
                        
grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
    for i in range(5):
        columns.append(str(grid))
        grid = simulate_pool(pool, grid)
        
print(columns)

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------
  • μ‹€ν–‰κΈ°λŠ” μ‚¬μš©ν•  μŠ€λ ˆλ“œλ₯Ό 미리 ν• λ‹Ή
    • simulate_pool 을 μ‹€ν–‰ν•  λ–„ λ§ˆλ‹€ μŠ€λ ˆλ“œ μ‹œμž‘ λΉ„μš© κ°μ†Œ
  • 병렬 I/O 문제 ν•΄κ²° μœ„ν•œ Thread 둜 μΈν•œ λ©”λͺ¨λ¦¬ λΆ€μ‘± 문제λ₯Ό μŠ€λ ˆλ“œ 풀에 μ‚¬μš©ν•  μŠ€λ ˆλ“œ μ΅œλŒ€ 개수λ₯Ό μ§€μ •ν•˜μ—¬ ν•΄κ²°
    • max_workers νŒŒλΌλ―Έν„° μ‚¬μš©
  • **ThreadPoolExecutor μž₯점**
    • submit λ©”μ†Œλ“œκ°€ λ°˜ν™˜ν•˜λŠ” Future μΈμŠ€ν„΄μŠ€μ— λŒ€ν•΄ result λ©”μ†Œλ“œλ₯Ό ν˜ΈμΆœν•˜λ©΄ μŠ€λ ˆλ“œλ₯Ό μ‹€ν–‰ν•˜λŠ” 쀑에 λ°œμƒν•œ μ˜ˆμ™Έ μžλ™ propagation
def game_logic(state, neighbors):
    ...
    raise OSError('I/O 문제 λ°œμƒ')
    ...
    
with ThreadPoolExecutor(max_workers=10) as pool:
    task = pool.submit(game_logic, ALIVE, 3)
    task.result()

>>>
Traceback ...
OSError: I/O 문제 λ°œμƒ
  • count_neighbors ν•¨μˆ˜μ— I/O 병렬성을 μ œκ³΅ν•΄μ•Ό ν•˜λŠ” 경우
    • ThreadPoolExecutor κ°€ step_cell 의 μΌλΆ€λΆ„μœΌλ‘œ 두 ν•¨μˆ˜λ₯Ό 이미 λ™μ‹œμ— μ‹€ν–‰ν•˜κ³  μžˆμœΌλ―€λ‘œ λ³€κ²½ ν•„μš” μ—†μŒ
  • 단점
    • ThreadPoolExecutor κ°€ μ œν•œλœ 수의 I/O λ³‘λ ¬μ„±λ§Œ μ œκ³΅ν•¨
      • max_workers λ₯Ό 100으둜 섀정해도 λ™μ‹œ I/O κ°€ ν•„μš”ν•œ κ·Έλ¦¬λ“œμ— 10000개 μ΄μƒμ˜ 셀을 넣을 경우 ν™•μž₯ λΆˆκ°€

2. 정리

  • ThreadPoolExecutor λŠ” 비동기적인 해법이 μ‘΄μž¬ν•˜μ§€ μ•Šμ€ 상황을 μ²˜λ¦¬ν•˜λŠ” 경우 쒋은 방법
  • λ§Žμ€ 경우 I/O 병렬성을 μ΅œλŒ€ν™”ν•  수 μžˆλŠ” 더 λ‚˜μ€ 방법 쑴재(Corutine)