일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- backend
- video understanding
- frontend
- 챗봇
- 디스코드챗봇
- lambda
- Spark
- 디스코드
- Rust
- embodied
- airflow
- Hexagonal Architecture
- data discovery
- Ai
- datahub
- MLFlow
- web3.0
- GPT
- embodied ai
- vln
- discord
- 웹3.0
- databricks
- bricksassistant
- s3
- vision-language navigation
- 디스코드봇
- Golang
- 블록체인
- ChatGPT
- Today
- Total
BRICKSTUDY
[viral 탐지 프로젝트] 데이터 파이프라인 구축기 #2 본문
본 프로젝트는 학업의 목적으로 진행되었으며 어떠한 금전적 이익을 취하지 않습니다.
바이럴 역시 널리 사용되는 마케팅 방법 중 하나로, 해당 프로젝트에서는 이에 어떠한 가치평가도 내리지 않습니다.
Introduction
안녕하세요, 김서영입니다.
2편에 이어 데이터 적재 과정에서 고민한 과정들과 결과를 이어 적어보겠습니다.
2편. 브랜드 수집 모듈 (related #PR14)
- Web Scrapping module
- Multiprocessing in Airflow with PythonVirtualOperator
3편. 데이터 수집 파이프라인 (related #PR17)
- Kafka
- Kafka Connect
4편. SNS 데이터 크롤러 (related #PR20)
- Twitter Crawler) Airflow DockerOperator
- Instagram Crawler
- Future Job
- Twitter) Dynamic Task Mapping
- Instagram) Selenium remote webdriver with DockerOperator and Docker network
Main
airflow 장비에 적재할 데이터를 쌓아놓는게 이렇게 잠깐 프로젝트 할 때나 가능하지 적절한 방법은 아닙니다.
어떤 점에서 문제가 될 수 있을까요?
1. 데이터 일관성, 동시성 보장 곤란, 동기화 필요
airflow는 병렬 처리가 가능하며, 여러 task가 동시에 실행될 수 있습니다. 따라서 여러 airflow 워커나 task instance가 동시에 같은 파일에 write 할 경우, 데이터 손상이나 파일 접근 충돌이 발생하여 일관성을 보장할 수 없습니다. 보장하기 위해 task간 write 작업을 동기화해야하는 작업이 추가되어야합니다.
2. 성능
지금이야 작고 소중하게 데이터를 수집한다고 하지만 조금만 확장해서 생각해보면 I/O 가 계속 발생하는 것이기 때문에 airflow 실행 로컬파일시스템 I/O 성능이 airflow 전체 작업 처리 속도에 병목 지점으로 작용할 수 있습니다.
이 밖에도 airflow 인스턴스가 실제 여러 서버에 분산되어 있는 경우를 생각해보면 환경에 따라 로컬 파일시스템 데이터에 대한 접근이 제한될 수도 있고. 보안 측면에서도 관리가 어렵고 데이터 손실 위험으로부터 자유롭지 못합니다. 물론 지금이야 한 장비에서 모든 데몬들이 컨테이너로 띄워지니 관리가 얼마나 어렵겠냐만, 항상 이런 것들을 염두에 두고 개발하는 것이 좋다 생각합니다.
그렇기 때문에 파일시스템에 파일로 쓰는 것보단 데이터베이스를 사용하는게 좋겠는데요. 인프라 담당 민준님에 의해 Kafka가 되었습니다. Kafka가 DBMS라기보단(일정 맞는 말이긴 하다만) 현재 상황이 최종 파일 적재 장소는 s3이고 버퍼처럼 임시로 데이터를 쏠 곳이 필요한 상황이기 때문에, Kafka의 영속성없는(기본값 상) 특징, pub-sub 구조라는 점, 향후 sns 데이터 수집 시 consumer로 붙여서 브랜드 저장된 토픽에서 꺼내와서 데이터 수집을 보내면 되겠구나 싶어 동의했습니다.
airflow DAG의 json 파일을 합치고 S3에 write하는 task들을 걷어내고 브랜드 제품 수집 즉시 kafka topic으로 produce되도록 합니다.
def entrypoint():
...
from src.common.kafka.utils import Kafka
CONCURRENCY_LEVEL = multiprocess.cpu_count()
def get_item(data: tuple):
brand = Brand({data[0]: data[1]})
brand.crawl_items()
json_data = {b_name: asdict(details) for b_name, details in brand.brand_metadata.items()}
producer.send_data_to_kafka(
kafka_topic='oliveyoung',
data=json_data
)
try:
brand = Brand()
brand.crawl_brand_metadata()
producer = Kafka()
with multiprocess.Pool(CONCURRENCY_LEVEL) as p:
p.map(get_item, list(brand.brand_metadata.items()))
...
그리고 고민했습니다. Kafka에서 S3로 어떻게 보내지?
Kafka의 Consumer가 될 수 있는 후보는 1. python Kafka 모듈 consumer 인스턴스 2. Kafka S3SinkConnecter 이렇게 두 가지가 있겠습니다. 1의 개발 공수와 구현 속도가 압도적이겠으나 2를 선택했는데, 그 과정을 같이 보겠습니다.
what is kafka connect?
먼저 S3SinkConnecter 를 왜 썼고 어떻게 썼는지 공유하기 이전에 이게 뭔지부터 간단히 알아보겠습니다.
결론부터 말하면 Kafka 데이터를 S3에 적재하는 Kafka Connect의 plugin 입니다. Kafka connect 를 잠깐 소개하겠습니다.
Kafka Connect는 Apache Kafka의 일부로, 저희 케이스인 S3부터 MySQL, ElasticSearch 등 여러가지 다른 시스템과 데이터 스트림을 쉽게 통합할 수 있도록 오픈소스로 개발된 컴포넌트입니다. Kafka와 외부 시스템을 연결하는걸 직접 구현하는 대신 선택할 수 있는 플랫폼입니다. 매번 다양한 외부 시스템에 메시지 송수신을 위한 producer, consumer를 구현하는 개발 비용을 줄일 수 있는거죠.
구축 방법
Kafka broker cluster와 별도로 구성됩니다. standalone mode, distributed mode로 구성할 수 있는데, distributed mode는 작업이 자동 분산 동작하고 노드에 장애 발생 시 자동으로 다른 노드에 작업을 이관합니다. distributed mode는 같은 Kafka cluster(bootstrap.servers)를 바라보고, 같은 그룹 아이디(group.id)로 지정하면 같은 작업 내역을 공유하는 Kafka Connect cluster로 구성할 수 있게 됩니다.
Connector
Kafka Connect에서 데이터소스->토픽은 Source Connector, 토픽->외부 시스템은 Sink Connector 가 구현합니다.
따라서 Connector는 Kafka Connect 내부의 실제 메시지 파이프라인 개념입니다. connector를 구성하기 위해선 관련 property(설정) 명세를 전달하면 됩니다. (standalone mode는 파일 형태, distributed mode는 REST API를 통해서 전달 가능)
Kafka connect는 API 요청을 받으면(분산모드 기준, 앞으로도 얘가 계속 기준입니다.) 설정 값에 맞게 connector를 구성하는데요. 이 connector가 주기적으로 메시지를 확인하고 topics 설정에 명시한 토픽에 새로운 메시지가 있으면 파이프라인을 통해 흘려보냅니다. 템플릿처럼 플러그인이 구현되어있고 그 템플릿이랑 알려준 설정 값 기준으로 인스턴스를 생성해주는 것입니다.
아래는 구체적인 Kafka Connect 아키텍처입니다.
- Connector : 파이프라인에 대한 추상 객체. task들을 관리한다.
- Task : Kafka의 메시지를 복제하는 구현체. 실제 파이프라인 동작 요소
- Worker : Connector와 Task를 실행하는 프로세스
- Convertor : Kafka connect와 external system 간 메시지를 변환하는 객체
- Transform : Kafka connect를 통해 흘러가는 각 메시지에 대해 간단하게 처리
- Dead Letter Queue : Kafka connect가 connector의 에러를 처리하는 방식
why kafka connect?
다시 S3로 적재 어떻게 하지? 의 고민으로 돌아와서,
1. python Kafka 모듈 consumer 인스턴스 2. Kafka S3SinkConnecter 중 2번을 선택한 이유를 설득해보겠습니다.
1. fully decoupled
python의 consumer를 사용하면 kafka topic에 적재하는 airflow task가 모두 완료된 이후에 이를 다시 다 consume해서 python 안에 다 붙들고 있다가 s3에 write하게 됩니다. 즉 decoupled 되지 않게 되는데, Kafka Connect를 사용하면 앞단 토픽으로의 produce 작업이 성공하든 실패하든지에 관계없이 S3SinkConnector은 브랜드 데이터가 수집되는 토픽만 바라보고 있다가 일정 configured 된 사이즈가 차면 S3에 적재하면 됩니다.
또 decoupling이 안될 뿐만 아니라 그럴거면 차라리 kafka를 걷어내고 앞에 브랜드 수집기 마지막에 brand.brandmetadata 객체를 s3에 쓰는게 가장 간단합니다.
2. 확장성
이후 instagram, twitter등 다양한 소스로부터 데이터를 수집해야 하는데요. kafka connect 를 초반에 한번 구축해두면 kafka가 중간 브릿지로 유용하게 역할을 할 수 있습니다. 수집 데이터를 토픽에 쏘기만 하면 airflow task 개발 공수 없이 kafka connect task 만 새로 선언해서 자동으로 s3에 적재되도록 할 수 있습니다.
how?
1. 인프라 구성
현재 Kafka 클러스터는 두 개의 broker로 구축되어있습니다. Kafka Connect 컨테이너를 추가하여 간단하게 환경을 구성할 수 있습니다.
kafka-connect:
image: confluentinc/cp-kafka-connect-base:6.2.0
container_name: kafka-connect
depends_on:
- kafka1
- kafka2
ports:
- 8083:8083
environment:
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
CONNECT_BOOTSTRAP_SERVERS: "kafka1:19091"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "kafka-connect"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_CONFIG_STORAGE_TOPIC: "_connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "_connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "_connect-status"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
# ---------------
# If you want to use the Confluent Hub installer to d/l component, but make them available
# when running this offline, spin up the stack once and then run :
# docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
volumes:
- ./kafka-data/kafka-connect/connectors:/connectors
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.4.1
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
컨테이너 실행 시점에 s3 sink connector를 설치하고, AWS 자격증명 관련 환경변수가 설정됩니다.
한가지 주의할 점은 AWS_SECRET_ACCESS_KEY로 사용하는 문자열에 / 또는 % 가 포함되어있다면 시크릿키를 재생성해서 넣어줘야한다는 점입니다. 해당 문자가 escape 문자로 시크릿키를 변경시키기 때문에, AWS api request header에 추가되는 signiture가 다르게 생성되기 때문입니다.
2. task 정의
이제 Kafka connect에 S3 sink가 동작하는 task를 생성하는 API 를 전달하면 끝입니다. confluent의 rest 인터페이스 명세 공식 docs를 참고하여 connector 클래스, 토픽 이름, aws 설정 및 지역, flush 사이즈를 지정하여 task 구성을 설정합니다.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://kafka-connect:8083/connectors/sink-s3-voluble/config -d '{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": 1,
"topics": "oliveyoung",
"aws.signing_region": "ap-northeast-2",
"s3.part.size": 5242880,
"s3.region": "ap-northeast-2",
"s3.bucket.name": "brickstudy",
"s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"topics.dir": "bronze/viral",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "86400000",
"timestamp.extractor": "Record",
"path.format": "yyyy-MM-dd",
"flush.size": 100,
"rotate.interval.ms": 60000,
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"locale": "ko_KR",
"timezone": "Asia/Seoul"
}'
이로써 Kafka connect는 oliveyoung 토픽을 보고 있다가 해당 토픽으로 들어오는 레코드를 S3에 json 파일로 저장합니다.
다음 편 예고
X : 수 억을 주면 API 열어줄게
Instagram : IP 막을게 계정 차단할게
me : 죽을게
'Data' 카테고리의 다른 글
DDP(Data Discovery Platform) - 1편 (4) | 2024.10.06 |
---|---|
[viral 탐지 프로젝트] 데이터 파이프라인 구축기 #1 (1) | 2024.09.22 |
AWS lambda를 활용한 S3 적재된 데이터 카탈로그 확인 도구 개발기 (0) | 2024.08.08 |
brickstudy 공용 데이터 환경 구축기 - 1 (0) | 2024.07.28 |
spark local 환경구축하기 (0) | 2024.07.21 |