BRICKSTUDY

[viral 탐지 프로젝트] 데이터 파이프라인 구축기 #1 본문

Data

[viral 탐지 프로젝트] 데이터 파이프라인 구축기 #1

kimseozero 2024. 9. 22. 09:44
더보기
본 프로젝트는 학업의 목적으로 진행되었으며 어떠한 금전적 이익을 취하지 않습니다.
바이럴 역시 널리 사용되는 마케팅 방법 중 하나로, 해당 프로젝트에서는 이에 어떠한 가치평가도 내리지 않습니다.

Introduction

안녕하세요? 글을 한번 날리고 정신건강도 함께 날아간 김서영입니다. 들어가기 앞서 저장을 생활화하자 구호를 크게 외쳐봅니다.

 

공용 데이터 플랫폼 구축기 1편을 올린 이후 시간이 훌쩍 흘렀는데요. 그동안 바이럴 탐지 프로젝트를 시작하며 즐거운 시간들을(?) 보내고 있었습니다. 에어플로우와(+스택오버플로우) 달달한 시간들을 함께하며 글감들을 많이 만들어왔으니 앞으로 3편의 포스팅으로 차근차근 풀어보겠습니다. 아래는 각 3편의 글들의 오버뷰입니다.

 

2편. 브랜드 수집 모듈 (related #PR14)

- Web Scrapping module

- Multiprocessing in Airflow with PythonVirtualOperator

 

3편. 데이터 수집 파이프라인 (related #PR17)

- Kafka

- Kafka Connect

 

4편. SNS 데이터 크롤러 (related #PR20)

- Twitter Crawler) Airflow DockerOperator

- Instagram Crawler

- Future Job

     - Twitter) Dynamic Task Mapping
     - Instagram) Selenium remote webdriver with DockerOperator and Docker network

 

Background

지난 시간에 승빈님께서 바이럴탐지 프로젝트에 대해 잘 소개해주셨는데요. 저는 이번 포스팅에서 (1) 코스메틱 브랜드 정보를 수집하는 모듈을 개발하고 (2) Airflow 를 통해 해당 모듈을 실행시켜 데이터를 수집 및 적재(EL)하는 DAG를 정의한 weekly 데이터 파이프라인을 소개하겠습니다.

 

프로젝트에 필요한 데이터를 수집하는 파이프라인을 개발하는 업무를 담당하면서 제가 수집해야 할 데이터는 크게 두 가지였습니다.

1. 코스메틱 브랜드 정보

2. SNS에서 수집된 브랜드를 언급한 컨텐츠

 

1번 화장품 브랜드 정보 데이터는 당연하게도 k-beauty의 중심 올리브영으로부터 수집했습니다. 한국의 왠만한 화장품 브랜드가 입점하고 있고, 이 브랜드 정보가 잘 카테고리화되어있어 

 

---

현재 파이프라인의 인프라 구조는 아래와 같습니다.

viral 프로젝트 데이터 파이프라인

 

2편의 개발 범위는 Kafka와 Kafka connect 인프라를 포함하지 않는데요. 데이터를 수집하는 과정에서 인프라가 추가되면 좋겠다는 판단 하에 추가하게 되었습니다. 이와 관련된 자세한 컨텍스트는 3편에서 풀어가겠습니다.

 

---

Oliveyoung scrapping module

어떤 브랜드를 SNS에서 살펴볼지를 정해야 하는데요. 이를 위해 먼저 한국의 코스메틱 브랜드를 수집하는 컴포넌트를 개발했습니다.

 

코스메틱 브랜드 정보를 올리브영에서 웹 스크래핑하는 Brand 클래스입니다.

Brand 클래스는 데이터를 담는 brand_metadata 필드, 수집 동작이 정의된 crawl_brand_metadata(), crawl_items() 메소드 이렇게 세 개의 주요 속성을 갖습니다.

 

1. brand_metadata

사전에 팀원과 논의한대로 브랜드 메타테이블의 속성들을 명확하게 정의해두고 데이터의 일관성과 안정성 측면을 고려했을 때 Dataclass를 정의해두는 것이 좋을 것 같다고 판단했습니다. 브랜드 이름을 key, OliveyoungBrand라는 데이터 객체를 value로 갖는 Hashmap을 필드로 정의하고, 이 객체에 각 브랜드의 메타정보들이 저장되도록 만들었습니다.

초반에 데이터가 수집되는 자료구조를 데이터클래스로 지정해둔 덕분에 수집 코드를 명확하고 깔끔하게 유지할 수 있었고, 수집된 데이터를 파악하여 오류 발생 지점을 파악하거나 유지보수를 하기에 수월했습니다.

Brand class's instance field `brand_metadata` - 수집된 데이터 저장

2. crawl_brand_metadata()

올리브영 랜딩페이지 상단 배너에는 전체 카테고리 탭이 있고, 각 카테고리 페이지로 넘어가야 카테고리에 해당하는 브랜드명 정보를 얻을 수 있습니다. Brand 클래스의 crawl_brand_metadata() 메소드는 랜딩페이지에서 각 카테고리의 url정보를 뽑아 순차적으로 방문하여 해당 카테고리에 존재하는 브랜드 정보를 파싱합니다.

Brand class's method `crawl_brand_metadata()` - 브랜드 메타정보 수집 후 brand_metadata 에 저장

3. crawl_items()

앞에서 수집한 brand_shop_detail_url, 즉 각 브랜드 페이지 url에서 아이템 정보를 수집합니다. brand_metadata에 있는 브랜드를 기준으로 데이터가 수집됩니다.

Brand class's method `crawl_items()` - 아이템 정보 수집 후 brand_metadata 에 저장

 

 

이제 Brand 클래스를 실행시켜서 데이터를 수집하고 brand_metadata를 공용 스토리지로 사용하고 있는 S3에 적재해야 하는데요. 해당 작업은 일주일에 한 번씩 주기적으로 실행되어야겠고, 단순 웹 페이지의 html을 받아와서 파싱하는 동작이기 때문에 crawl_items메소드가 브랜드 단위로 concurrent하게 실행된다면 빠르게 데이터를 수집해올 수 있겠네요.

super이끌리는 어떤 툴

 

Airflow DAG

Airflow 클러스터는 브릭스터디에서 공용으로 사용하고 있는 데스크탑 로컬에 docker를 사용하여 각 데몬이 컨테이너화하여 운영합니다. Airflow 2.6.2, python 3.7을 사용하고 있고, Scheduler, Worker 1개, Celery Executor, Webserver가 각각 도커 컨테이너 안에 띄워져있습니다. 메타데이터 저장소는 PostgreSQL을, Celery Executor의 작업 큐잉은 Redis를 사용하고, 두 데이터베이스 역시 각자의 컨테이너 안에서 실행됩니다.

 

이제 데이터 수집 DAG를 작성해보겠습니다. 앞 단계에서 데이터 수집 클래스를 추상화해놓았기 때문에, 수집 함수 scrapping() 코드는 간단합니다. scrapping 함수를 callable로 받는 PythonOperator task를 만들고 실행 interval을 weekly로 설정한 DAG를 생성하면 Airflow를 사용해 데이터 수집을 스케줄링, 자동화할 수 있습니다.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

DAG_ID = "bronze_viral_oliveyoung"
TARGET_PLATFORM = "brand"

# Set aiflow setting
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
}

def scrapping():
        brands = Brand()
        brands.crawl_brand_metadata()
        brands.crawl_items()
        
with DAG(
        dag_id=DAG_ID,
        default_args=default_args,
        schedule_interval='@weekly',
        catchup=False
):
        task = PythonOperator(
                task_id='get_brand_metadata',
                python_callable=scrapping
        )
        task

crawl_brand_metadata() 까지 수행된 brand.brand_metadata는 아래와 같은 자료구조를 갖습니다.

# brands.brand_metadata 형태
{
    'brand_a': OliveyoungBrand('query_keyword': [{keywords}], 'brand_shop_detail_url': 'https://..', 'items': [], 'category': [{categories_that_the_brand_belongs_to}], 'released_date': ''),
    'brand_b': OliveyoungBrand('query_keyword': [{keywords}], 'brand_shop_detail_url': 'https://..', 'items': [], 'category': [{categories_that_the_brand_belongs_to}], 'released_date': ''),
    ...
    'brand_n': OliveyoungBrand('query_keyword': [{keywords}], 'brand_shop_detail_url': 'https://..', 'items': [], 'category': [{categories_that_the_brand_belongs_to}], 'released_date': '')
}

따라서 아래와 같이 브랜드 단위로도 Brand 인스턴스를 생성하여 아이템 정보를 수집할 수 있습니다.

def scrapping():
        brands = Brand()
        brands.crawl_brand_metadata()

        for brand_dic in brands.brand_metadata.items():
                brand = Brand(brand_dic)
                brand.crawl_items()

airflow task 실행시간 최적화

기껏 브랜드 별 독립 실행이 되도록 만들어둔 당신 병렬 처리 참을건가요?

 

위의 방법들은 전체 iteration이 단일 task 인스턴스 내에서 순차적으로 진행됩니다. 즉 동시성을 전혀 활용하지 못하고 있는데, 함수가 http 호출과 respond로 받아온 html을 파싱하는 작업을 수행한다는 점에서 동시성 제어를 통해 실행 속도를 충분히 개선할 수 있다고 생각했습니다. 어떻게 할 수 있을까요?

 

[아이템 수집 작업을 브랜드 단위로 분리하여 병렬로 처리하면 더 효율적이다.] 라는 가설을 세우고 가장 먼저 고민이 든 것은 함수를 멀티프로세싱 할 것이냐, 멀티스레딩 할 것이냐 였습니다. 

 

결론부터 얘기하면 PythonVirtualOperator를 사용하여 multiprocessing으로 브랜드 페이지에서 아이템 정보를 수집해오는 방식으로 개발했습니다. 결론으로 가는 과정은 아래와 같습니다.

 

1. multi-processing vs multi-threading

처음에는 멀티스레딩이 훨씬 효율적인거 아니야? 라고 생각했는대요, 스레드 간 코드 및 데이터 영역 공유, 빠르고 가벼운 context switching ( -> cpu overhead 적음) 등 단순 생각해봐도 프로세스 만드는 것보단 스레드 만드는게 더 빠를거라 생각했기 때문입니다. 하지만 파이썬의 Global Interpreter Lock(Cpython 인터프리터 특성 - 단일 스레드가 한 번에 하나의 bytecode 명령어만 실행하도록 보장한다. Cpython 메모리 관리 방법이 스레드 안정성을 보장하지 않기 때문에 존재하는 특징) 때문에 여러 스레드가 동시에 실행되더라도 실제로는 한번에 하나의 스레드만 실행된다. 는 사실을 알게 되었고, Airflow 역시 CPython환경에서 동작하기에 멀티프로세싱이 적절하겠다 판단했습니다.

multiprocessing모듈의 Pool 사용해서 브랜드들에 대해 get_items 브랜드 상세 페이지로부터 제품 정보를 수집하는 작업이 병렬로 수행되도록 합니다.

import multiprocessing

CONCURRENCY_LEVEL = multiprocessing.cpu_count()
def get_item(data: dict):
    brand = Brand(data)
    brand.crawl_items()
    
def scrapping():
    brand = Brand()
    brand.crawl_brand_metadata()
    with multiprocessing.Pool(CONCURRENCY_LEVEL) as p:
        p.map(get_item, brand.brand_metadata)

 

2. Daemonic processes are not allowed to have children.

그러면 daemonic processes are not allowed to have children 이라는 오류가 발생합니다.

Python에서 데몬 프로세스는 보안 및 안정성을 위해 자식 프로세스를 생성할 수 없는데, 코드가 이를 위반했나봅니다. 그럼 누가 데몬이고 누가 자식일까요?

결론부터 말하면 전자: celery worker가 실행하는 python process, 후자: multiprocessing 모듈이 생성하려는 process 입니다.

 

multiprocessing.Pool 이 작업을 멀티프로세싱하기 위해 데몬 프로세스를 생성하기 때문에, 해당 오류를 구글링하면 저 부분을 오류 원인으로 지적하는 글들이 많아 헷갈렸는데요. multiprocessing.Pool이 생성하는 프로세스가 데몬인지 여부에 관계없이 Celery worker 가 데몬 상태이기 때문에, Celery Worker가 데몬 프로세스로 실행되고 있을 때 subprocess를 생성하려고 시도하는게 문제인 상황입니다.

* Celery Worker가 데몬 프로세스인지 여부는 Celery 설정과 실행 환경에 따라 달라질 수 있습니다. (각자의 인프라 환경 설정 확인 필요)

 

다양한 해결 방법이 있겠지만, PythonVirtualenvOperator 로 별도 가상 환경에서 파이썬 코드를 실행하도록 해서 celery worker의 데몬 프로세스 상태에 관계없이 프로세스가 생성될 수 있게 수정하여 문제를 해결했습니다. 가상 환경을 만들어주면 독립적인 python 환경에서 코드가 실행되어 프로세스 생성 시 상위 celery worker의 데몬 프로세스 제한을 받지 않을 수 있기 때문입니다.

 

def entrypoint():
    import logging
    import multiprocess
    from dataclasses import asdict
    from src.scrapper.oliveyoung import Brand

    CONCURRENCY_LEVEL = multiprocess.cpu_count()

    def get_item(data: tuple):
        brand = Brand({data[0]: data[1]})
        brand.crawl_items()

    try:
        brand = Brand()
        brand.crawl_brand_metadata()
        with multiprocess.Pool(CONCURRENCY_LEVEL) as p:
            p.map(get_item, list(brand.brand_metadata.items()))
    except Exception as e:
        logging.error("***entrypoint error***", e)
        raise

with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval='@weekly',
    catchup=False
):

    task = PythonVirtualenvOperator(
        task_id='get_brand_metadata',
        python_version='3.7',
        system_site_packages=False,
        requirements=["beautifulsoup4==4.9.3", "python-dotenv==0.19.0", "multiprocess", "kafka-python"],
        python_callable=entrypoint
    )

    task

 

task 완전히 독립된 환경에서 실행하는 것이기 때문에 PythonVirtualenvOperator 연산자 설정 파라미터로 필요한 파이썬 라이브러리를 명시해주어야 하고, python_callable 함수 내에 작업에 필요한 모든 의존성들이 명시되어있어야 합니다.

 

---

 

Is it? #1

이후 작업들을 진행하면서 Airflow의 다양한 Operator와 기법들을 배우고 다시 돌아와 바라보니 이후 포스팅에서 다룰 Dynamic task mapping을 사용하면 따로 가상환경을 만들 필요 없이 PythonOperator만으로 더 가볍게 병렬 처리를 할 수 있겠다, 라는 아쉬움도 드네요. 

 

Is it? #2

정리를 하는 과정에서 좀 더 공부하고 알게 된 사실은, multithreading을 했어도 GIL에 관계없이 병렬 실행되었을 것이라는 점입니다.

GIL이 I/O 시점에 해제되기 때문에, CPU bound 작업에서 멀티스레딩은 의도한 parallel execution이 안될지언정 I/O bound 작업에는 성능을 향상할 수 있기 때문입니다. 현재 수행하는 수집 작업은 http request, 파일 생성 즉 I/O bound 작업이기 때문에 thread 기반 concurrency가 가능(을 넘어 더 적절)합니다.

 

현재 수집 로직에 따로 병목이 있는게 아니기 때문에 Future job으로 코드 개선을 남겨두고 계속 이어가보도록 하겠습니다. 스레드 단위 병렬성으로 리팩토링하겠습니다.

---

 

여러모로 최적의 경로가 아닌듯 하지만(가상환경 생성 노드로 내려가지 말고 backtracking을 했어야만..) DFS의 정신으로 일단 더 파고들어보겠습니다.

 

brand.brandmetadata  브랜드의 모든 화장품 아이템들을 위에서 정의했던 dataclass 구조로 예쁘게 갖고있으니 이제 어디론가 저장하면 되겠습니다. host 볼륨 설정이 되어있는 airflow 컨테이너 로컬에 json 파일로 저장합니다. 가장 쉽고 간단하죠 !

설정한 경로에 브랜드 별로 json 파일들이 저장되고, 수집 task 끝나면 브랜드 수만큼 생성된 json 파일들을 하나의 파일로 합쳐서 bronze레이어 virial/oliveyoung/{scrapped_date)/ 아래에 적재합니다.

 

	task1 = PythonVirtualenvOperator(..)
    ...
	task2 = BashOperator(
        task_id='merge_json_files_into_single_json_file',
        bash_command="""
        jq -s 'flatten' {{ BRAND_JSON_FILE_PATH }}/*.json > {{ MERGED_JSON_FILE }}
        """,
        env={
            'BRAND_JSON_FILE_PATH': BRAND_JSON_FILE_PATH,
            'MERGED_JSON_FILE': MERGED_JSON_FILE
        }
    )

    task3 = PythonOperator(
        task_id="upload_brand_json_file_to_s3",
        python_callable=upload_to_s3
    )

    task1 >> task2 >> task3

 

 

무지에서 비롯된 의사결정에서 비롯된 삽질과 아쉬움이 남는 과정이네요. 그렇지만 이 삽질이 경험으로 남아야 다음에 더 나은 결정을 할 수 있지 않겠습니까? 셀프 위로와 리팩토링을 TODO풀에 넣고 이번 편을 마치겠습니다.

 

다음 편 예고

?? : airflow 로컬쪽에 파일 저장이요....?..😑