일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- Spark
- Ai
- 블록체인
- 웹3.0
- vision-language navigation
- discord
- video understanding
- ChatGPT
- airflow
- vln
- MLFlow
- databricks
- Hexagonal Architecture
- GPT
- 챗봇
- backend
- bricksassistant
- 디스코드
- 디스코드봇
- embodied ai
- datahub
- frontend
- data discovery
- embodied
- lambda
- Golang
- web3.0
- Rust
- s3
- 디스코드챗봇
- Today
- Total
BRICKSTUDY
[Spark] 데이터 엔지니어링에 테스트 코드가 필요한가? 본문
📌 Intro
Spark 데이터 처리 과정에서 Test code는 필요할까?
🗂️ Table of Contents
- intro
- Background
- 데이터 엔지니어링에 테스트 코드가 필요한가?
- Spark 테스트 코드
- Conclusion
- References
💡 Background
일반적인 서비스 개발에서 테스트 코드는 유지/보수/관리 측면에서 매우 중요합니다. 특히 소프트웨어 결함을 찾아내고 수정하는 과정을 통해 지속가능한 코드를 작성하는 데 매우 중요한 역할을 수행합니다.
이러한 상황에서 Spark로 데이터 처리하는 과정에서도 테스트 코드를 고민해야합니다.
데이터 엔지니어링에 테스트 코드가 필요한가?
Test Code?
소프트웨어 기능과 동작을 테스트하는 데 사용되는 코드
단위 테스트, 통합 테스트, 시스템 테스트, 사용자 인수 테스트 등 여러가지 테스트가 존재
대부분의 테스트는 기대한 입력값과 출력값을 반환하는지 확인하는데 사용
백엔드 개발에서 테스트 코드는 어떻게 작성해나?
*아래 예시는 설명을 위해 가져온 코드로 일반적인 테스트 코드의 흐름을 보기 위한 용도입니다.
- 테스트 시나리오 작성
- 백엔드의 경우 시스템 상에 문제가 있을 때, 문제로 인해 피해를 사용자가 직접적인 영향을 줄 수 있어 실패한 케이스를 포함한 최대한 마주할 가능성이 있는 모든 경우를 테스트 시나리오 작성합니다.
import pytest
@pytest.mark.asyncio
async def test_account_repository_cannot_insert_user_account():
# given : DB에 테이블이 생성되지 않을 때(DB 연결 오류)
# when : DB 데이터 입력 요청
# then : DB 연결 오류 메시지
pass
@pytest.mark.asyncio
async def test_account_repository_cannot_get_all_user_account():
# given : 테이블에 아무 데이터도 없을 때
# when : DB 데이터 조회 요청
# then : 빈 리스트 출력
pass
@pytest.mark.asyncio
async def test_account_repository_can_insert_user_account():
# given : 유효한 유저 정보
# when : DB에 데이터 입력 요청
# then : 데이터 입력 확인(id)
pass
- 해당 시나리오를 기준으로 코드 작성
여기서 핵심은 완벽한 코드를 작성하기 보단 동작하는 코드 작성
별도의 파일을 만들지 말고 본인이 작성한 시나리오 파일에다가 작성
import os
import pytest
import json
from sqlalchemy.orm import Session, DeclarativeBase
from sqlalchemy.exc import NoResultFound
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String
from sqlalchemy import select
# Mock data
EMAIL = "test@naver.com"
PASSWORD = "test1234"
GENDER = "male"
AGE = "20대"
# db_manager.py
class PostgreManager:
def __init__(self) -> None:
user: str = POSTGRE_CONNECTION["user"]
password: str = POSTGRE_CONNECTION["password"]
host: str = POSTGRE_CONNECTION["host"]
port: str = POSTGRE_CONNECTION["port"]
db: str = POSTGRE_CONNECTION["db"]
DATABASE_URL = f'postgresql://{user}:{password}@{host}:{port}/{db}'
self.engine = create_engine(
DATABASE_URL, pool_size=5, pool_recycle=100, max_overflow=10
)
def get_session(self):
return Session(self.engine)
# model.py
class Base(DeclarativeBase):
pass
class Account(Base):
__tablename__ = 'user_account'
seq = Column(Integer, primary_key=True)
email = Column(String(500), nullable=False)
password = Column(String(500), nullable=False)
gender = Column(String(20), nullable=False)
age = Column(String(200), nullable=False)
generate_count = Column(Integer, nullable=False)
@pytest.fixture
def mockup():
yield {
"email": EMAIL,
"password": PASSWORD,
"gender": GENDER,
"age": AGE
}
@pytest.fixture
def session():
yield PostgreManager().get_session()
@pytest.mark.order(1)
@pytest.mark.asyncio
async def test_account_repository_can_insert_user_account(mockup, session):
# given : 유효한 유저 정보
obj = Account(
email=mockup["email"],
password=mockup["password"],
gender=mockup["gender"],
age=mockup["age"],
generate_count=0
)
# when : DB에 데이터 입력 요청
with session:
session.add(obj)
session.commit()
# then : 입력완료되면 email 반환
# then : 데이터 정상적으로 입력 되었는지 확인
with session:
sql = select(Account).filter(Account.email == EMAIL)
obj = session.execute(sql).scalar_one()
result = {
"email": obj.email,
"password": obj.password,
"gender": obj.gender,
"age": obj.age,
"generate_count": obj.generate_count
}
assert result["email"] == EMAIL
assert result["password"] == PASSWORD
assert result["gender"] == GENDER
assert result["age"] == AGE
assert result["generate_count"] == 0
@pytest.mark.order(1)
@pytest.mark.asyncio
async def test_account_repository_cannot_get_user_account(session):
# given : DB에 없는 조회할 유저 ID
WRONG_EMAIL = "wrong_email"
# then : NoRsultFound error
with pytest.raises(NoResultFound):
with session:
# when : DB에 데이터 조회 요청
sql = select(Account).filter(Account.email == WRONG_EMAIL)
session.execute(sql).scalar_one()
- 모든 테스트 시나리오를 다 작성한 후 실제 프로덕트 코드를 작성
경로 : github
결론. 데이터 엔지니어링에서 테스트 코드 필요한가?
유지보수 관점에서 보았을 때, 필요하다.
일반적으로 배치로 돌리는 작업이 많은 데이터 엔지니어링 특성 상 기존의 파이프라인을 관리하는 것이 중요하다
특히, 한 번 만든 파이프라인을 문제가 있기 전까지는 다시 보지 않기 때문에, 유지 보수를 위해서 테스트 코드를 작성하는 작업은 필요.
다만, 백엔드 개발과는 다른 관점에서 데이터 엔지니어링에서 테스트 코드의 고민이 필요하다!!
백엔드에서 개발은 기능 동작과 예외적 사용에 대해 주로 시나리오를 작성한다면, 데이터 엔지니어링은 예외적인 경우보다 데이터 정합성이 중요한 이슈
데이터 엔지니어링에 맞는 테스트 방법에 고민이 필요하다!!!
Spark 테스트 코드 예시
테스트 코드 원칙
입력 데이터에 대한 유연성
- 비즈니스 요구사항이 변하면 데이터도 변함
- 따라서 애플리케이션과 파이프라인은 입력 데이터 중 일부가 변하더라도 유연하게 대처할 수 있어야함
비즈니스 로직 변경에 대한 유연성
- 입력 데이터뿐만 아니라 파이프라인 내부의 비즈니스 로직이 바뀔수도 있음
- 비즈니스 로직을 테스트해 복잡한 비즈니스 파이프라인이 의도한 대로 동작하는지 확인해야함
- 예상했던 원형 데이터의 형태가 실제 원형 데이터와 같은지 확인하는 것이 중요!!
결과의 유연성과 원자성
- 결과를 의도한 대로 반환하는지 확인
- 데이터가 스키마에 맞는 적절한 형태로 반환될 수 있도록 제어해야함
책 기준 예시 코드
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# SparkSession을 설정하는 함수
@pytest.fixture(scope="session")
def spark():
return SparkSession.builder \
.master("local[2]") \
.appName("pytest-pyspark-local-testing") \
.getOrCreate()
# 데이터 처리 함수를 정의
def process_data(df):
return df.withColumn("new_column", col("value") * 2)
# 테스트 함수
def test_process_data(spark):
# 테스트 데이터 생성
input_data = [("Alice", 1), ("Bob", 2), ("Carol", 3)]
input_df = spark.createDataFrame(input_data, ["name", "value"])
# 데이터 처리 함수 호출
result_df = process_data(input_df)
# 예상 결과 생성
expected_data = [("Alice", 1, 2), ("Bob", 2, 4), ("Carol", 3, 6)]
expected_df = spark.createDataFrame(expected_data, ["name", "value", "new_column"])
# 결과 비교
assert result_df.collect() == expected_df.collect()
Conclusion
데이터 플랫폼을 운영하며, 여러 데이터 파이프라인을 직접 만드는 과정에서 테스트 코드 작성에 대해 한 번쯤은 고민해보는 과정이 필요합니다.
특히, 데이터 처리 관점에서 어떤 테스트 코드가 필요하며, 테스트 절차를 팀 차원에서 고민해보고 적용하면 좋을 것이라는 생각에 데이터 엔지니어링에서 테스트 코드의 필요성에 대한 주제와 예시 코드를 docs에 작성합니다.
실제 프로젝트를 진행하면서 Brickstudy의 테스트 코드에 대한 논의를 하나씩 진행해봅시다!!
References
'Data' 카테고리의 다른 글
[viral 탐지 프로젝트] 데이터 파이프라인 구축기 #1 (1) | 2024.09.22 |
---|---|
AWS lambda를 활용한 S3 적재된 데이터 카탈로그 확인 도구 개발기 (0) | 2024.08.08 |
brickstudy 공용 데이터 환경 구축기 - 1 (0) | 2024.07.28 |
spark local 환경구축하기 (0) | 2024.07.21 |
Spark, Airflow, mlflow, databricks Let's go! (1) | 2024.07.21 |