본문 바로가기
카테고리 없음

aiohttp, asyncio 법무부 생활법률지식 정보 공공 데이터 크롤링

by 퍼포먼스마케팅코더 2023. 8. 14.
반응형

법무부 생활법률지식 정보 공공 데이터 크롤링이다. 아래와 같은 방식으로 작성을 했는데, 아무래도 requests 보다는 aiohttp, asyncio 등을 통해 수정 변경해서 작성해보았다. 종합적으론 특정 URL에서 XML 데이터를 추출하고 해당 데이터를 DataFrame으로 변환하여 CSV 파일로 저장하는 작업을 수행합니다. 비동기 프로그래밍을 사용하여 여러 페이지에서 데이터를 동시에 추출하고 병합한 코드이다. 실행 결과는 약 50초 정도에 약 21,000개 의 데이터를 바로 csv 에 가져올 수 있다. 물론 테스트를 통해 더 수정 변경해야될 게 많지만.. 여기까지만 테스트 해보는걸로..

 

https://www.data.go.kr/tcs/dss/selectApiDataDetailView.do?publicDataPk=15028606

 

from collections import OrderedDict
from typing import Any, Dict, List, Union
import xml.etree.ElementTree as ET
import aiohttp
import asyncio
import pandas as pd
import time

# 주어진 OrderedDict 객체를 중첩 딕셔너리로 변환하는 함수
def convert_to_dict(ordered_dict: OrderedDict) -> Dict[str, Any]:
    return {key: convert_to_dict(value) if isinstance(value, OrderedDict) else value for key, value in ordered_dict.items()}

# 주어진 URL에서 비동기 요청을 실행하고 응답을 DataFrame으로 변환하는 함수
async def fetch(session: aiohttp.ClientSession, url: str, params: Dict[str, str], retries: int = 5) -> pd.DataFrame:
    for retry in range(retries):  # 재시도 횟수만큼 반복
        try:
            async with session.get(url, params=params) as response:  # 비동기 요청
                xml_response = await response.text()  # 응답을 텍스트로 변환

                # XML 응답을 해석하고 항목을 DataFrame으로 변환
                root = ET.fromstring(xml_response)
                items: List[OrderedDict[str, str]] = []
                for item in root.findall('./body/items/item'):
                    items.append(OrderedDict((child.tag, child.text) for child in item))
                return pd.DataFrame(items)
        except KeyError as e:  # 키 오류 발생 시 재시도
            print(f"KeyError encountered. Retrying... ({retry + 1}/{retries})")
            continue
    raise KeyError(f"Failed after {retries} retries.")  # 모든 재시도 실패 시 오류 발생

# 비동기 작업을 실행하고 결과를 DataFrame으로 병합하는 메인 함수
async def main() -> pd.DataFrame:  
    url = 'https://apis.data.go.kr/1270000/lawedu/lawqna'
    tasks: List[asyncio.Task] = []
    connector = aiohttp.TCPConnector(limit_per_host=10)  # 호스트당 연결 제한
    async with aiohttp.ClientSession(connector=connector) as session:  # 세션 설정
        for page_no in range(1, 211):  # 페이지 숫자 반복
            params = {
                'serviceKey': 'serviceKey',
                'pageNo': str(page_no),
                'numOfRows': '100'
            }
            tasks.append(fetch(session, url, params))
        
        dataframes = await asyncio.gather(*tasks)  # 모든 비동기 작업 수집
        return pd.concat(dataframes, ignore_index=True)  # 결과 DataFrame 병합

if __name__ == '__main__':
    start_time = time.time()  # 시작 시간
    result_df = asyncio.run(main())  # 메인 함수 실행
    end_time = time.time()  # 종료 시간

    # 결과를 CSV 파일로 저장
    result_df.to_csv('C:/Users/user/Desktop/test/test/result.csv', encoding='utf-8-sig', index=False)

    print(f"Execution Time: {end_time - start_time} seconds")  # 실행 시간 출력

 

 

 

 

 

 

 

반응형

댓글