반응형
아래의 코드는 aiohttp 라이브러리를 사용하여 비동기 HTTP 요청을 처리하는 예제입니다. 주요 구성요소는 다음과 같습니다:
timeit 데코레이터: 비동기 함수의 실행 시간을 측정합니다. 함수를 감싸고, 실행 전후의 시간을 측정하여 실행 시간을 출력합니다.
nest_asyncio.apply(): 일부 환경(예: Jupyter 노트북)에서 발생할 수 있는 asyncio 루프 문제를 해결합니다.
fetch_url 함수: 지정된 URL('https://www.naver.com')을 비동기적으로 요청합니다. 요청이 실패하거나 200 상태 코드가 아닌 경우 지정된 횟수만큼 재시도합니다.
main 함수: fetch_url 함수를 병렬로 실행하여 여러 HTTP 요청을 동시에 처리합니다. 총 3000번의 요청
을 100개씩 나누어 처리하고, 각 청크가 완료될 때마다 상태를 출력합니다.
- 이벤트 루프: asyncio의 이벤트 루프를 사용하여 main 함수를 실행합니다. 이벤트 루프는 비동기 작업을 관리하고 실행 순서를 조정합니다.
이 코드는 비동기 프로그래밍의 기본적인 패턴을 보여줍니다. 특히, HTTP 요청을 비동기적으로 처리하고, 실행 시간을 측정하는 방법을 배울 수 있습니다.
import aiohttp
import asyncio
import nest_asyncio
import time
from functools import wraps
def timeit(func):
# 데코레이터 함수 timeit 정의. 함수 실행 시간을 측정합니다.
@wraps(func)
async def wrapper(*args, **kwargs):
# 비동기 함수를 감싸는 래퍼 함수.
start_time = time.perf_counter() # 시작 시간 기록
result = await func(*args, **kwargs) # 비동기 함수 실행
end_time = time.perf_counter() # 종료 시간 기록
print(f"{func.__name__} took {end_time - start_time:.2f} seconds to execute.") # 실행 시간 출력
return result
return wrapper
nest_asyncio.apply() # Jupyter 노트북과 같은 환경에서 asyncio 루프 문제를 해결
status_200_count = 0 # HTTP 200 상태 코드 수를 세는 전역 변수
async def fetch_url(retries=5, delay_seconds=2):
# 주어진 URL을 재시도 횟수에 따라 비동기적으로 요청하는 함수.
global status_200_count # 전역 변수 사용 선언
async with aiohttp.ClientSession() as session: # aiohttp 세션 시작
for attempt in range(retries): # 지정된 재시도 횟수만큼 반복
try:
async with session.get('https://www.naver.com') as response: # URL 요청
if response.status == 200:
status_200_count += 1 # 200 상태 코드인 경우 카운트 증가
return # 함수 종료
else:
pass
# 응답 상태가 200이 아닌 경우 주석 처리된 부분 활성화 가능
#print(f"Status code {response.status}, retrying... ({attempt + 1}/{retries})")
except Exception as e:
print(f"An error occurred: {e}, retrying... ({attempt + 1}/{retries})") # 예외 발생시 로깅
if attempt < retries - 1: # 마지막 시도가 아닌 경우에만 지연
await asyncio.sleep(delay_seconds) # 지정된 시간만큼 대기
print("Max retries reached, exiting.") # 최대 재시도 횟수 도달시 메시지 출력
@timeit
async def main():
# 메인 비동기 함수. fetch_url 함수를 병렬로 실행합니다.
chunk_size = 100 # 한 번에 실행할 작업의 크기
for i in range(0, 3000, chunk_size): # 3000회를 chunk_size만큼의 작업으로 나누어 실행
tasks = [fetch_url() for _ in range(chunk_size)] # 작업 리스트 생성
await asyncio.gather(*tasks) # 모든 작업을 동시에 실행
print(f"Chunk {i // chunk_size + 1} completed") # 각 청크 완료시 메시지 출력
time.sleep(1) # 1초 대기
print(f"Total 200 status responses: {status_200_count}") # 총 200 상태 응답 횟수 출력
loop = asyncio.get_event_loop() # 이벤트 루프 가져오기
loop.run_until_complete(main()) # 메인 함수 실행까지 이벤트 루프 실행
반응형
댓글