AI

[AI] Apache Spark 개념정리

usingsystem 2025. 12. 15. 21:31
728x90

1. Apache Spark 개념 및 특징 정리

Apache Spark는 대규모 데이터 분석과 분산 처리를 위해 만들어진 오픈소스 프레임워크입니다.
빅데이터 환경에서 빠른 속도, 다양한 데이터 처리 기능, 그리고 확장성을 제공합니다.

 

특징

1. High Performance (고성능)

  • In-Memory 연산을 통해 기존 Hadoop MapReduce보다 훨씬 빠른 속도를 제공합니다.
    • MapReduce는 디스크 기반으로 데이터를 저장하고 네트워크를 통해 데이터를 주고받는 과정이 많아 속도가 느립니다.
    • Spark는 데이터를 메모리에 올려서 연산을 수행하고, 꼭 필요할 때만 디스크나 네트워크를 사용합니다.
  • 덕분에 최대 100배 이상 빠른 성능을 보여주기도 합니다.
  • 머신러닝용 라이브러리(MLlib), 그래프 연산 라이브러리(GraphX) 등을 지원하여 데이터 분석을 더 쉽고 효율적으로 진행할 수 있습니다.

2. Scalability (확장성)

  • Spark는 수평 확장(Scale-Out) 구조로 설계되어 있습니다.
  • 동일하거나 유사한 사양의 서버(노드)를 여러 대 묶어 하나의 클러스터를 구성합니다.
  • 이를 통해 페타바이트(PB)~제타바이트(ZB)급의 대규모 데이터도 처리할 수 있습니다.
  • Spark는 다양한 클러스터 매니저 위에서 동작할 수 있습니다:
    • Apache Hadoop YARN
    • Apache Mesos
    • Kubernetes

3. Structured Data 지원

  • Spark는 정형 데이터뿐만 아니라 비정형(Unstructured) 데이터를 구조화(Structured)하여 분석할 수 있습니다.
  • SQL처럼 다루기 쉽게 만들어주는 Spark SQL을 제공하며, 다양한 소스(CSV, JSON, Parquet, Hive 등)에서 데이터를 가져올 수 있습니다.

4. Real-Time Data Processing (실시간 처리)

  • Spark Streaming 라이브러리를 사용하면 실시간 데이터 처리도 가능합니다.
    • 실시간 로그 모니터링
    • IoT 센서 데이터 분석
    • 클릭 스트림(Clickstream) 분석

2. Spark 아키텍처

  • Spark 클러스터는 크게 마스터 노드(Master Node)워커 노드(Worker Node) 로 구성됩니다.
  1. Master Node
    • 드라이버 프로그램(Driver Program)을 실행합니다. 마스터 노드는 스파크 컨텍스트와 드라이버 프로그램을 실행하며, 클러스터의 자원을 관리하고 작업 실행을 조율하는 역할을 합니다.
    • SparkContext가 존재하며, 클러스터 매니저(YARN, Mesos 등)에게 작업(Job)을 전달합니다.
  2. Worker Node
    • 여러 개의 Executor가 실행됩니다.
    • Executor는 실제 연산을 담당하며, 그 안에서 Task 단위로 나누어 실행합니다.
    • 필요에 따라 캐시(Cache)를 활용하여 성능을 높입니다.
  • 전체 작업 흐름:
    1. 사용자가 Job을 제출 → SparkContext가 클러스터 매니저에게 전달
    2. 클러스터 매니저가 워커 노드에 Job을 배분
    3. 각 워커 노드에서 Executor가 Task 실행
    4. 결과를 모아 사용자에게 반환

3. RDD (Resilient Distributed Dataset)

RDD는 대규모 데이터를 RAM에 올려서 클러스터 여러 노드에 분산 저장하고, 메모리 기반으로 빠르게 처리하며, 장애가 발생해도 연산 기록(lineage)을 이용해 복구할 수 있는 Spark의 기본이되는 핵심 데이터 단위입니다.

 

특징

  1. Immutable(불변성)
    • 한 번 생성된 RDD는 변경할 수 없습니다.
    • 데이터를 수정하려면 새로운 RDD가 생성됩니다.
  2. Fault-Tolerance(내결함성)
    • 데이터가 손실되거나 손상(Corrupt)되면 자동으로 다른 노드에 있는 복제본(Replica)을 통해 복구합니다.
  3. Parallel Processing(병렬 처리)
    • 데이터는 클러스터의 여러 노드에 분산되어 저장·처리되므로 병렬 연산이 가능합니다.
  4. Lazy Evaluation(지연 연산)
    • 지연 연산은 변환 작업을 즉시 실행하지 않고 기록해 두었다가, collect(), save() 같은 액션(Action)이 호출되기 전까지는 실제 연산이 수행되지 않습니다.
    • 이를 통해 Spark는 최적화된 실행 계획을 세워 효율적으로 연산을 수행합니다.
#가상환경일 경우 패스등록
import os
import sys
os.environ["PYSPARK_PYTHON"] = sys.executable
# 예전 방식
import pyspark
sc = pyspark.SparkContext('local[*]') # Spark 실행 엔진을 로컬 CPU 코어로 켠 것, 메모리 생성

rdd = sc.parallelize(range(1000)) # 0~999까지 숫자를 메모리(RAM)에 분산 저장한 RDD 생성
rdd.takeSample(False, 5)
# 최신방식
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() # PySpark의 통합 엔진 DataFrame, SQL, Hive, RDD 작업 모두 여기서 시작
sc = spark.sparkContext # RDD 작업을 할 때 필요

rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

 

4. Key Value Pair 명령

`reduceByKey`는 동일 키의 값들을 합산과 같은 연산으로 하나로 줄이는 반면, `groupByKey`는 단순히 동일 키의 값들을 하나의 목록으로 묶습니다. `reduceByKey`가 보통 더 효율적일 수 있습니다.

 

`filter` 변환은 제공된 조건(predicate) 함수를 각 요소에 적용하여 결과가 참인 요소들만 포함하는 새로운 RDD를 반환합니다. 데이터 정제 등에 유용합니다.

 

`countByValue` 액션은 RDD의 모든 고유 요소와 해당 요소의 발생 횟수를 계산하여 (값, 개수) 쌍의 Dictionarry형태로 편리하게 반환해 줍니다. 워드 카운트 같은 작업에 적합해요.

 

`collect` 액션은 분산된 RDD의 모든 데이터를 드라이버 프로그램으로 수집하여 로컬 메모리에 파이썬 리스트 형태로 반환합니다. RDD 크기가 클 때는 주의해야 합니다.

import pyspark

sc = pyspark.SparkContext.getOrCreate()
# Key / Value RDD

# creating Key / Value RDD
total_by_brand = rdd.map(lambda brand: (brand, 1))

# # reduceByKey(): Merge the values for each key using an associative and commutative reduce function.
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]


# groupByKey(): Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(f=len).collect())
[('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]


# sortByKey(): Sorts this RDD, which is assumed to consist of (key, value) pairs.
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first()
('1', 3)


# keys(), values(): Create a RDD of keys or just values
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.keys()
['a', 'b', 'a']

# join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]


# Efficiency is the key for performance!!!
# if you only need values, use mapValues() or flatMapValues()

`countByValue` 액션은 RDD의 모든 고유 요소와 해당 요소의 발생 횟수를 계산하여 (값, 개수) 쌍의 사전 형태로 편리하게 반환해 줍니다. 워드 카운트 같은 작업에 적합해요.

spark는 yeild와 궁합이 좋다.

  • flatMap은 각 원소가 여러 개의 결과를 만들 수 있도록 설계됨
  • 함수에서 yield를 쓰면 이터레이터(iterator)를 반환
  • flatMap은 이 이터레이터를 받아 결과를 펼쳐(flatten) RDD에 넣음
  • 장점: 메모리를 많이 쓰지 않고 대용량 데이터 처리 가능
def get_data(line, header):
    if line != header:
        col = line.split(',')
        city = col[6].strip("\"")
        avg_temp_fahr = col[4]
        yield (city, avg_temp_fahr)
		return [(city, avg_temp_fahr)]  # 이렇게도 가능하지만 느림
        
parsed_line = lines.flatMap(lambda line: get_data(line, header))

map vs flatMap

`flatMap`은 각 입력 항목에서 0개 이상의 여러 출력을 생성하고 그 결과를 단일 목록으로 평면화합니다. 반면 `map`은 항상 각 입력 항목에 대해 정확히 하나의 출력 항목만 생성한다.

  • map: 각 원소를 단순 변환할 때
    • 예: 모든 숫자에 10 더하기, 문자열 소문자로 변환 등
  • flatMap: 각 원소를 여러 개로 나누거나 토큰화할 때
    • 예: 문장 → 단어, CSV 행 → 여러 값 등
함수 입력 RDD 원소 출력 RDD 원소 특징
map 1개 1개 각 원소를 1:1로 변환
flatMap 1개 0개 이상 각 원소를 0~n개로 변환 후 평탄화(flatten)
rdd = sc.parallelize(["hello world", "hi spark"])

rdd_map = rdd.map(lambda x: x.split(" "))
print(rdd_map.collect())

출력 : [['hello', 'world'], ['hi', 'spark']]

rdd_flat = rdd.flatMap(lambda x: x.split(" "))
print(rdd_flat.collect())

출력 : ['hello', 'world', 'hi', 'spark']

5. 데이타프레임(Dataframe)과 SparkSQL

Spark를 제대로 쓰려면 RDD → DataFrame → SparkSQL 흐름을 이해해야 합니다.

1) 데이터프레임(DataFrame) 이란?

Spark의 DataFrame은 RDD(기본 데이터 구조)위에서 동작하는 더 고수준의 구조화된 데이터 모델로 더 빠르고 구조적 이다. 형태는 Pandas DataFrame과 비슷하지만 엄청 큰 데이터(수십 GB~TB)를 클러스터에서 병렬 처리할 수 있다는 점이 다릅니다.

DataFrame의 특징

(1) 스키마(Schema)를 가진다 ( structure)

name age city
Bob 20 Seoul

RDD는 구조를 모르지만, DataFrame은 구조를 알기 때문에 Spark가 최적화할 수 있음.

 

(2) Catalyst 옵티마이저로 자동 최적화됨(카탈리스트 최적화 도구)

Spark SQL 엔진이 DataFrame 연산을 분석해서 가장 빠른 방식으로 실행 계획을 자동으로 만들어줌.

  • 쓸데없는 연산 제거
  • 병렬 실행 최적화
  • 디스크/메모리 접근 최소화

→ RDD보다 훨씬 빠르다

 

(3) SQL처럼 다룰 수 있다

DataFrame은 SQL 테이블처럼 사용할 수 있어서 df.select("name", "age").where(df.age > 20) 이런 식으로 직관적인 표 형태의 연산이 가능.

2) Spark SQL 이란?

DataFrame을 SQL 문법으로 처리할 수 있게 해주는 인터페이로 Spark SQL 엔진이 돌아가면서 DataFrame과 SQL 사이를 연결해줌.

(1) SQL 문법 그대로 사용 가능

데이터를 분석하는 사람들이 SQL을 잘 알기 때문에 복잡한 분석을 바로 SQL로 작성 가능.

(2) 표현력이 쉬움

코드로 하면 길어지는 작업도 SQL 한 줄이면 끝.

(3) 자동 최적화

SQL도 Catalyst 엔진이 알아서 최적화해줌.

 

DataFrame ↔ SparkSQL 예시( SparkSession)

from pyspark.sql import SparkSession

# SparkContext + SQL + Hive + DataFrame + Catalog 모두 포함한 통합 엔트리포인트
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()  

# DataFrame 생성
lines = spark.sparkContext.textFile("file:///")
#data = spark.read.option("header", "true")\ csv 읽을 때 사용
            .option("inferSchema", "true")\ 데이터를 추론해서 타입맞춰줌
            .csv(csv_file_path)
#data.printSchema()

file_data = lines.map(parse_line)

# SQL로 사용하기 위해 등록
schema_income = spark.createDataFrame(data=file_data).cache()# ram 메모리에 프레임생성고 ㅏ올림
schema_income.createOrReplaceTempView("file_data")# sql을 사용하기 위해 테이블이름 등록

# SQL 사용 가능
result = spark.sql("""
    SELECT city, AVG(temperature)
    FROM table
    GROUP BY city
""")

result.show()

 

RDD vs DataFrame vs SparkSQL 차이 (정리)

방식 특징 장점 단전
RDD 구조 없는 분산 데이터 유연함 느림, 최적화 X
DataFrame 스키마 + 구조화된 데이터 Catalyst 최적화로 빠름 복잡한 로직은 제한적
SparkSQL SQL 문법으로 DataFrame 조작 SQL만 알면 분석 가능 SQL에 익숙해야 함

6. Dataframe StructType

dataframe structType이란 만약 csv파일을 읽었을 경우 헤더가 있다면 inferSchema=True를 해서 사용하면 될 것 이다. 하지만  헤더가 없을 경우엔 StructType을 사용해 직접 헤더를 만들 수 있다.

헤더가 있다면 

  • DataFrame의 컬럼 구조를 직접 정의할 때 사용하는 타입
  • 각 컬럼은 StructField로 구성됨
  • 컬럼의 이름, 데이터 타입, NULL 허용 여부 등을 설정할 수 있음

즉, "이 DataFrame은 어떤 컬럼을 어떤 타입으로 가지고 있다" 를 정의하는 것.

# 헤더가 없는 csv일경우 직접 structType 구현
table_schema = t.StructType([
    t.StructField("xxx1", t.StringType(), True),
    t.StructField("xxx2", t.FloatType(), True),
    t.StructField("xxx3", t.StringType(), True)])

csv_file_path = "file:/data.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)

df.printSchema()

# 헤더 있는 csv는 inferSchema true로 사용
data = spark.read.option("header", "true")\ csv 읽을 때 사용
            .option("inferSchema", "true")\ 데이터를 추론해서 타입맞춰줌
            .csv(csv_file_path)
            
data.printSchema()

7. BrodCast와 BroadCast 조인

BrodCast란 모든 워커(worker)에게 동일한 데이터를 네트워크 오버헤드 없이 빠르게 전달하기 위한 캐시 메커니즘으로 Spark에서 작은 DataFrame을 클러스터 전체의 모든 Executor 메모리에 복사해서 배포하는 기능.

 

Spark는 액션을 실행할 때 워커 노드들이 데이터를 받을 때 네트워크를 통해 계속 전송해야 할 수 있음.

하지만 broadcast 변수를 쓰면 드라이버(driver)가 데이터를 한 번만 보냄 -> 워커들이 로컬 메모리에 저장한 뒤 -> 모든 task에서 재사용 즉, 네트워크 비용 대폭 절감, 속도 엄청 빨라짐

 

BroadCast 조인이란 작은 테이블을 전체 워커 노드에 브로드캐스트하여, 큰 테이블과 조인을 빠르게 수행하는 방식으로 일반 조인은 큰 테이블끼리 shuffle(데이터 섞기로 데이터를 다른 파티션으로 재배치하는 과정 )을 해서 노드 사이에 엄청난 네트워크 비용이 발생함. 하지만 작은 테이블을 broadcast하면

  • 작은 테이블은 모든 노드에 미리 복제됨
  • 큰 테이블은 shuffle 없이 "로컬"에서 바로 매칭
  • 네트워크 비용 거의 없음

즉, 대규모 데이터 조인을 매우 빠르게 처리하는 최적화

 

Spark는 기본적으로 작은 테이블이면 자동으로 broadcast한다. 작은 테이블만 broadcast해야 한다. Python UDF 내부에서 broadcast.value를 사용하면 워커가 Python 프로세스를 두 번 넘나들기 때문에 조금 느릴 수 있음.

meta = {"A": 1, "B": 2}

occupation_dict = spark.sparkContext.broadcast(meta)

def get_name(occupation_id: str) -> str:
    return occupation_dict.value[occupation_id]

occupation_lookup_udf = f.udf(get_occupation_name) # 함수를 넣을 수 있게

occupation_with_name = interviewer_count.withColumn("occupation_name", occupation_lookup_udf(f.col("occupation_id")))

occupation_with_name.show(10)

 

  • broadcast_meta.value → 실제로 담겨 있는 데이터
  • 워커에서 접근해도 네트워크로 다시 전송되지 않음

8. coalesce와 repartition

1) coalesce

coalesce는 빠르게 파티션 수를 줄이는 최적화 함수로 파티션을 줄일 때 사용한다. 즉 기존 파티션 (데이터를 여러 조각으로 나눠서 여러 워커 노드가 병렬로 처리할 수 있게 만든 단위)중 일부를 병합하는 방식이다. 

 

shuffle(데이터를 다른 파티션으로 재배치하는 과정)이 발생하지 않아 매우 빠르다.

df.coalesce(1)

 

2) repartition

repartition는 반대로 파티션을 늘리는 용도로 사용하지만 shuffle 발생한다.

 

repartitionByRange는 특정 키값으로 파티션을 늘리는걸로  repartition, coalesce와 다르게 DataFrame에서만 가능

df.repartition(10)#repartition
df.repartition(200, "city", "job") #repartitionByRange

 

 

1) 작업 병렬성을 늘리고 싶을 때 (파티션 ↑)

처음 로딩된 데이터가 파티션이 너무 적으면 CPU 코어가 놀게 됨
→ repartition(n)을 통해 병렬 처리 향상

2) 조인/집계 성능 향상을 위해 파티션을 균등하게 맞추고 싶을 때

데이터 스큐( 데이터가 균등하게 분배되지 않아 특정 Task가 병목이 되는 문제. )가 있을 때 사용
예) 특정 키만 몰려 있을 때 명시적으로 repartition("key")

3) 대규모 저장(write) 작업 전에 필요한 파티션 개수 맞추기

  • 파일 출력을 1개의 파일로 만들고 싶을 때 → coalesce(1)
  • HDFS에 적당한 파일 개수로 저장하고 싶을 때 → repartition(200)
함수 파티션 셔플 테이터 재분배 방식 사용 목적
repartition(n) 증가 발생 해시 기반 균등 재배치 병렬성 확보, 조인/집계 정확한 분배
repartitionByRange(n, cols) 증가 발생 Range 정렬 기반 파티션 분배 정렬 기반 작업 전 최적화, 스키마 정렬
coalesce(n) 줄임 없음 기존 파티션 일부만 합침 빠른 파티션 감소, 최소 비용

9. 빈 데이터와 시간 포멧 다루는 방법

na.drop 키워드 

 

  • how : any - 데이터가 하나라도 비어있으면 삭제, all - row에 모든 데이터가 비어있으면 삭제
  • threash : 하나의 row에 데이터 빈칸의 숫자를 지정해 지정한 숫자만큼 비어있으면 삭제
  • subset : 특정 컬럼을 지정해 해당 컬럼이 비어있으면 삭제
df.na.drop(how="any").show() 
df.na.drop(thresh=2).show()
df.na.drop(subset=["salary"]).show()

 

na.fill 키워드

null인 데이터를 채우기위해

# # fill string
df.na.fill("engineer").show()

# # fill integer
df.na.fill(0).show()

# # fill the subset
df.na.fill("NA", subset=["occupation"]).show()

# # fill the mean value
mean_value = df.select(f.mean(df['salary'])).collect()
df.na.fill(mean_value[0][0], subset=["salary"]).show()

 

date

# # show year
df.select(f.year('date')).show()

# # show month
df.select(f.month('date')).show()

# # show day
df.select(f.dayofmonth('date').alias('day')).show()
df.select(f.dayofyear('date').alias('day')).show()

10. YARN (Yet Another Resource Negotiator) 이란?

하둡의 클러스터 자원을 관리하고, 애플리케이션 실행을 스케줄링해주는 시스템

  • 누가 CPU를 얼마나 쓰는지
  • 메모리를 얼마만큼 배분할지
  • 어떤 노드에서 작업을 실행할지

모두 YARN이 관리한다.

11. Dataframe의 API 카테고리

1) Transformations

스파크 특성상 데이터를 수정할 수 없다. 이 때 Transformations는 RDD/DataFrame을 입력 → 새로운 RDD/DataFrame을 만드는 작업으로 즉시 실행되지 않고, DAG(작업 계획)에만 기록되며 Action을 만나면 실행된다.

DAG는 만들어두면 Spark가 뒤에서 최적화(scheduler)

Transformation  설명  Dependency  이유
map 각 요소를 1:1로 변환 Narrow 파티션 내에서만 처리됨
flatMap 요소 하나 → 여러 요소 Narrow 파티션 내에서만 처리됨
filter 조건에 맞는 요소만 통과 Narrow 파티션 내부만 사용
mapPartitions 파티션 단위 변환 Narrow 부모 파티션 1개에만 의존
sample 샘플링 Narrow shuffle 없음
coalesce(n, shuffle = false) 파티션 수 줄임 Narrow 데이터 이동 거의 없음
union 두 RDD 합침 Narrow shuffle 없음 (그냥 두 파티션 나열) 
Transformation  설명  Dependency  이유
distinct 중복 제거 Wide 같은 값을 모으기 위해 전체 파티션 재배치(shuffle) 필요
repartition(n) 파티션 수 재조정 Wide 모든 데이터를 섞어서 파티션 재분배
sortBy / orderBy 전체 정렬 Wide global ordering 위해 shuffle
groupByKey key 기준 그룹화 Wide 모든 동일 key를 한 파티션으로 모아야 함
reduceByKey key 기준 reduce Wide shuffle 발생하지만 groupByKey보다 효율적
join RDD/DataFrame join Wide key 기준 서로 섞어야 함
cogroup 여러 RDD key-group Wide shuffle
distinct 중복 제거 Wide 같은 데이터를 모아야 하기 때문에 shuffle

(1) Narrow Dependency(좁은 의존성) 과 Wide Dependency(넓은 의존성)

Dependency란 RDD/DataFrame Transformation이 이전 Transformation의 어떤 파티션에 의존하는지를 나타내는 개념.

 

Narrow Dependency는 자식 RDD의 하나의 파티션이 부모 RDD의 소수(일반적으로 하나)의 파티션만 참조하는 경우로 파티션 이동이 없고 네트워크 통신이 없다. 즉 Executor가 자기 파티션만 보면되고 Shuffle이 없어 빠르다.

 

Wide Dependency는 자식 RDD의 하나의 파티션이 부모 RDD의 여러 파티션에 의존하는 경우로 여러 부모 파티션에서 데이터를 가져와야하고 네트워크 shuffle이 발생해서 비용이 큰 작업이므로 느리다.

textFile → flatMap → map → reduceByKey → sortByKey

flatMap		Narrow	한 파티션이 그대로 변환
map		Narrow	동일
reduceByKey	Wide	같은 key를 하나로 모아야 함 (shuffle)
sortByKey	Wide	전 파티션에서 key 정보를 모아야 함

2) Actions

Transformations로 정의한 DAG를 실제로 실행시키는 명령으로 즉, 결과를 반환하거나 저장할 때 실행된다.

Spark의 모든 동작은 Transformation으로 DAG를 만들고 Action에서 한 번에 실행된다.

Action 설명
collect() 전체 데이터를 드라이버로 가져옴
show() DataFrame 출력
take(n) n개만 가져오기
count() 개수 세기
first() 첫 요소
reduce() 모든 요소를 하나로 합침
saveAsTextFile() 파일로 저장
foreach() 각 요소에 대해 함수 실행
rdd = sc.textFile("file.txt")

# Transformation: DAG에만 기록됨
words = rdd.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# Action: 실제 실행
print(counts.collect())
액션 메모리 사용 위치 설명
collect() 드라이버 메모리 모든 데이터를 드라이버로 가져옴 → 위험
count(), reduce(), show() Executor 메모리 계산은 Executor 메모리에서 수행 후 결과만 드라이버로

12. Logical Plan (논리적 실행 계획)과 Physical Plan (물리적 실행 계획)

Catalyst Optimizer가 SQL/DataFrame 연산을 처리할 때 거치는 핵심 개념으로 왜 Spark가 자동 최적화되고, 어떻게 실행 계획이 만들어지는지 이해할 수 있어 튜닝을 할 때 필요하다.

 1) Logicl Plan

사용자가 작성한 쿼리/코드가 무엇을 해야 하는지를 표현한 “논리적” 계획으로 아직 실행 방식(executor, shuffle)은 고려하지 않는다. 즉, 사용자 쿼리를 더 빠르게 실행할 수 있도록 논리 최적화를 수행한 상태이다. Spark는 아래와 같이  Logical Plan을 3단계로 구분한다.

Unresolved Logical Plan (미해결 계획)

  • 테이블/컬럼이 존재하는지 아직 모름
  • 단지 문자열 기반으로 "filter", "select"만 적혀 있음

Resolved Logical Plan (해결됨)

  • 컬럼이 실제 존재하는지 검증 완료
  • 스키마 분석 끝
  • 함수(DataFrame API)도 실체로 매핑됨

Catalyst Optimized Logical Plan (최적화됨)

  • predicate pushdown (필터 아래로 내리기)
  • projection pruning (필요 없는 컬럼 제거)
  • constant folding (상수는 미리 계산)
  • join reordering (join 순서 최적화)
  • filter merge (필터 병합)

2) Physical Plan

Logical Plan을 실제 실행하는 방법으로 바꾼 것 으로 실제 Spark 클러스터에서 어떻게 연산할지, 어떤 방식으로 수행할지를 포함한다.

Spark는 여러 개의 physical plan을 생성한 뒤그중 가장 비용(cost)이 낮은 것을 선택함.

  • HashAggregate vs SortAggregate
  • ShuffleHashJoin vs BroadcastHashJoin
  • Whole Stage Codegen 적용 여부
  • Exchange(Shuffle) 전략
df.explain()		# Physical Plan 요약
df.explain(True)	# Parsed / Analyzed / Optimized Logical Plan + Physical Plan (전체)
df.explain("formatted")	# 사람이 읽기 편한 트리 형태의 출력
df.explain("cost")	# Cost-based optimizer 비용 정보 포함 (Spark 3+)

 

13. Spark Memory Allocation

Spark Job을 동작시키는데 포퍼먼스에 중요한 부분을 담당하고 있다. DriverMemory와 ExecutorMemory로 구성되어 있다.

1) Executor Memory

(1) Execution Memory(Handle Computations)

Shuffle, Join, Sort, Aggregate 같은 연산의 중간 결과를 저장하는 공간. 즉 계산용 메모리로 실시간 연산을 위한 메모리 이다. 

부족할 경우 성능 저하일어 날 수 있음. Storage memory보다 우선권 있으며 storage가 점유한 메모리를 빌려올 수 있다.

 

  • 캐싱(Storage Memory)
  • 연산(Execution Memory)
  • Shuffle 작업
  • RDD/DataFrame 저장

(2) Storage Memory(Caching)

RDD/DataFrame Cache 메모리 (사용자가 df.cache() 또는 RDD .persist() 했을 때 저장)  즉 캐시된 데이터를 보관하는 공간

(3) User Memory

UDF / 사용자 코드 객체 저장

2) Driver Memory

Spark의 두뇌(플랜 생성/스케줄링/collect 데이터 수신)와 같은 역할을 수행한다.

  • SparkSession / SparkContext 유지
  • Logical Plan / Physical Plan 생성
  • Job / Stage / Task 스케줄링
  • collect(), show() 시 데이터가 Driver로 모임
  • 메타데이터 관리 (catelog정보, broadcast 변수관리, shuffle파일 위치 정보)
+------------------------------------------------------------+     +-----------------------------------------------------------+
|                        Driver Memory                       |     |                      Executor Memory                      |
|                   (spark.driver.memory)                    |     |                   (spark.executor.memory)                  |
|                                                            |     |                                                           |
|  +------------------------------------------------------+  |     |  +--------------------+   +-----------------------------+ |
|  |  SparkSession / SparkContext                         |  |     |  | Execution Memory    |   | Storage Memory            | |
|  |  Logical Plan / Physical Plan 생성                   |  |     |  | (연산 중간 결과)     |   | (캐시/RDD/Persist)        | |
|  |  DAG 스케줄링(Job / Stage / Task)                   |  |     |  +--------------------+   +-----------------------------+ |
|  |  collect/show 결과 저장                              |  |     |                                                           |
|  |  메타데이터 관리                                      |  |     |   +-----------------------------+                        |
|  |                                                      |  |     |   |        User Memory          |                        |
|  +------------------------------------------------------+  |     |   |   (UDF 객체, 기타 JVM 객체) |                        |
|                                                            |     |   +-----------------------------+                        |
+------------------------------------------------------------+     +-----------------------------------------------------------+
                                                                   |    Memory Overhead (Off-Heap)                           |
                                                                   |    (DirectBuffer, PySpark Worker, Tungsten)              |
                                                                   |    spark.executor.memoryOverhead                         |
                                                                   +-----------------------------------------------------------+

 

14. AQE(Adaptive Query Execution)

Spark SQL이 실행 도중에 실제 런타임 통계를 보고 쿼리 플랜을 자동으로 최적화하는 기능으로 즉, 쿼리 실행 전에 만든 플랜을 그대로 따르는 게 아니라, 실행 중에 더 좋은 플랜을 찾아서 바꿔버리는 것이다.

 

Spark은 원래 쿼리 실행 전에 Logical Plan → Physical Plan 을 만든 후, 그 Physical Plan을 그대로 실행했음. 하지만 조인 전략이 잘못 선택이 된다거나 파티션이 너무 작거나 너무 클 수 있다. 이런 경우 초기 플랜이 비효율적이 되어 성능이 팍 떨어지게 된다. 이 때 AQE는 실행 도중에 ”어? 이 파티션 너무 큰데?” “이 조인 작은데 브로드캐스트 가능한데?” 하고 감지함 → 즉시 플랜 수정한다.

 

AQE는 Physical Plan 단계에서 발생하는 실행 중 최적화하기 때문에 Logical Plan이 아닌 Physical Plan이다.
즉,  실행 중에 Physical Plan을 재작성함(Re-Optimize)

 

AQE가 하는 핵심 최적화 3가지

1)동적 조인 전략 변경 (Dynamic Join Rewriting)

실행 중에 조인 대상 데이터 크기를 보고 Broadcast Join이 가능하면 자동으로 변경함

가장 큰 효과. 특히 sort-merge join같은 경우 엄청난 비요이듬. 이런걸 자동으로  Broadcast join으로 변경

 

2) 파티션 자동 병합 / 분할 (Coalescing Shuffle Partitions)

AQE는 실제 셔플된 파티션 크기를 보고 자동으로 너무 작은 파티션들 → 합치고 너무 큰 파티션 → 쪼갬

즉, 파티션 크기를 균일하게 맞춰줌.

 

3) 데이터 스큐 처리 (Skew Join Optimization)

특정 키에 너무 많은 데이터가 몰린 스큐( 데이터가 균등하게 분배되지 않아 특정 Task가 병목이 되는 문제. )상황을 AQE가 감지해서 스큐 키만 작은 청크로 나눠 처리, 나머지는 정상 처리 조인 스캔이 막혀서 느려지는 것을 방지함.

# spark3.x 대는 자동으로 켜져있음
spark.conf.set("spark.sql.adaptive.enabled", "true")

 

15. DPP (Dynamic Partition Pruning)

실행 중에 불필요한 파티션을 자동으로 건너뛰도록(Skip) 하는 Spark SQL 최적화 기능이다. 즉, 필요한 파티션만 읽고, 나머지는 읽지도 않는 런타임 파티션 필터링 기능

 

DPP는 파티션 키 필터가 쿼리 실행 시점에 결정될 때 유용하다. 필요한 파티션만 읽어 I/O와 네트워크 부하를 줄여 성능을 높인다.

16. Spark Cache(재사용)

Spark의 Cache는 자주 사용되는 DataFrame이나 RDD를 메모리(또는 디스크)에 저장해서 나중에 동일한 데이터를 다시 사용할 때 빠르게 읽어오는 기능이다. 즉 비싼 연산(Shuffle, Scan, Join 등)을 매번 다시 계산하지 않고 결과를 저장해두는 메모리 최적화 기능

캐싱은 중간 계산 결과를 메모리에 저장하여 재사용하고 불필요한 재계산이나 I/O를 줄여 성능을 높여요. 데이터 분배는 Repartition, 느린 태스크 복제는 Speculative Execution, 변수 공유는 Accumulator에 해당한다.

 

Cache는 메모리를 사용하므로 사용이 끝나면 꼭 해제해야 함.

 

Cache 꼭 써야 하는 경우

  • 동일한 DataFrame을 여러 번 사용할 때
  • 복잡한 ETL 파이프라인에서 중간 결과를 재활용할 때
  • 반복 계산(ML 알고리즘, iterative jobs)
  • Shuffle 이후 결과를 여러 Action에서 재사용할 때

사용하면 안 되는 경우

  • DataFrame이 너무 커서 메모리에 못 들어갈 때
  • 한 번만 사용하는 데이터
  • IO가 비싸지 않은 작은 데이터
  • 필터로 작은 Subset만 쓸 때 (필요한 부분만 캐시하는 게 더 효율적)
df.cache()
df.count()   # Action → 캐시가 실제로 저장되는 시점

df.unpersist() # 해제
Level  설명
MEMORY_ONLY 기본 Cache. 메모리에만 저장, 부족하면 재계산
MEMORY_AND_DISK 메모리 부족하면 디스크에도 저장
MEMORY_ONLY_SER 메모리를 더 절약하기 위해 Serialized 형태
DISK_ONLY 디스크에만 저장 (I/O 느림)

17. sql hint(강제 주입)

SQL 쿼리 안에 힌트를 넣어서 조인 전략, 파티션 방식, Broadcast 여부, Shuffle 동작 등을 강제로 제어할 수 있음.

리소스등을 더 잘 활용가능함.

SQL Hint는 사용자가 쿼리 옵티마이저에게 물리적 실행 계획(예: 조인 전략, 파티션 수)에 대한 힌트를 제공하여 성능을 개선하는 방법으로 결과 데이터 변경과는 무관하다.

/*+ BROADCAST(dim) */ 이런식으로 사용함.

 

자주사용하는 hint

 

  • BROADCAST(dim) — 작은 테이블 broadcast
  • REPARTITION(200) — write 전 파티션 조정
  • REPARTITION(50, user_id) — 조인/집계 최적화
  • MERGE 또는 SHUFFLE_HASH — 조인 전략 강제

힌트 종류

hint 설명
BROADCAST / MAPJOIN 작은 테이블을 Broadcast Join으로 강제
MERGE / SHUFFLE_HASH / SHUFFLE_REPLICATE_NL 조인 전략 직접 지정
COALESCE / REPARTITION / REPARTITION_BY_RANGE 실행 시 파티션 수 조절
JOIN HINTS build side 선택 등 고급 제어

 

예시

SELECT /*+ BROADCAST(dim) */
       f.user_id, dim.level
FROM fact f
JOIN dim ON f.id = dim.id;
df.hint("broadcast")
df.hint("repartition", 200)
df.hint("repartition_by_range", 50, "age")

18. Accumulator(디버깅)

Spark에서 작업(Task)들이 실행되면서 발생하는 값을 안전하게 누적하기 위해 만든 변수로 디버깅 할때 활용 하면 좋다.

즉 분산 Task들이 생성하는 로그성 통계 모으기에 유용함 ( 예) 에러 로우가 몇 개였는지 count, null 데이터 개수, 필터링된 행 개수

, 어떤 조건에 맞지 않는 값 누적 )

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# accumulator 생성
acc = sc.accumulator(0)

rdd = sc.parallelize([1, 2, 3, 4, 5])

def process(x):
    if x % 2 == 0:
        acc.add(1)
    return x

rdd.map(process).collect()

print("짝수 개수:", acc.value)

19. Speculative execution

Speculative Execution(추측 실행)은 Spark에서 느리게 실행되는(Task straggler) 작업을 빠르게 하기 위해 뒤에서 같은 Task를 하나 더 실행시키는 최적화 기능이다.

 

Spark는 클러스터 전체에서 많은 Executor·노드에 작업을 분배함.
그중 하나가…

  • 머신 성능 저하
  • GC 과다
  • 네트워크 문제가 있는 노드
  • 디스크 I/O 병목
  • 데이터 스큐(skew)
  • 하드웨어 이상

…등의 이유로 특정 Task가 유독 느리게 실행될 수 있음.

한 개의 느린 Task 때문에 전체 Stage가 대기 → 결국 전체 Job 전체가 느려짐.

이걸 해결하려고 같은 Task를 하나 더 실행해서 누가 먼저 끝나나 경쟁시키는 게 Speculative Execution.

spark.speculation=true # 설정 옆에꺼 써도됨 spark.conf.set("spark.speculation",true)

#성능 옵션
spark.speculation.quantile=0.75   # 하위 25%가 느리면 speculative 실행
spark.speculation.multiplier=1.5  # 평균보다 1.5배 느리면 대상
spark.speculation.interval=100ms  # 검사 간격

20. Job Scheduling 

드라이버(Driver)가 여러 Job을 어떤 순서로, 어떤 리소스로, 어떻게 실행할지 관리하는 메커니즘이다.

Spark Job Scheduling은 여러 Job을 FIFO 또는 FAIR 방식으로 Executor 리소스를 나눠 실행하는 방식이며, TaskScheduler가 Executor에 Task를 배정해 전체 실행 흐름을 관리하는 시스템이다.

 

Job Scheduling 핵심 구조

Spark 드라이버 내부에서 스케줄링은 다음 구성 요소가 참여해:

  1. Job - 하나의 Action(count(), collect(), save 등)이 실행될 때 생성됨.
  2. Stage - Job은 Shuffle 경계를 기준으로 여러 Stage로 나뉨.
  3. Task - Stage는 여러 Task로 분리되어 Executor에서 병렬로 실행됨.

1) FIFO(First-In First-Out) — 기본 스케줄러

Spark 기본 모드

  • 먼저 제출된 Job이 먼저 실행됨
  • 후순위 Job은 앞 Job이 끝날 때까지 기다림
    (단, Executor가 남아 있으면 병렬 수행 가능)

장점: 단순, 예측 가능
단점: 긴 작업이 있으면 뒤 Job이 오래 기다림

2) FAIR Scheduler — 공정 스케줄링 (협업 환경에서 많이 씀)

여러 사용자가 동시에 Job 실행할 때 사용됨.

  • Job들끼리 Pool(리소스 그룹)로 나누고
  • 각 Pool에 weight 또는 minShare 값을 설정
  • 여러 Job을 동시에 공정하게 병렬 실행

개발자 A job 3개, 개발자 B job 1개 실행
→ Pool weight에 따라 CPU/Executor를 나눠서 공정하게 실행됨

#spark-defaults.conf
spark.scheduler.mode=FAIR

21. Spark Streming

Spark Streaming은 Apache Spark에서 실시간(스트리밍) 데이터 처리를 위한 기능으로 요즘은 주로 Spark Structured Streaming을 의미한다고 보면 된다.

 

  • DataFrame / Dataset 기반
  • SQL, Catalyst Optimizer, AQE 등 Spark SQL 생태계 그대로 사용
  • Structured Streaming은 무한히 들어오는 데이터를 마치 배치 DataFrame처럼 처리하는 엔진

장애 발생 시

Executor 장애 - Task 재실행, state 복구

Driver 장애 - checkpoint 기반 query 재시작, offset / state 복원

from pyspark.sql.functions import window

df = spark.readStream \
    .format("json") \
    .schema("eventTime TIMESTAMP, userId STRING") \
    .load("/logs/events")

result = df \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy(
        window("eventTime", "10 minutes"),
        "userId"
    ) \
    .count()

query = result.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

1) ouputmode 

append mode는 결과가 더 이상 바뀌지 않는 시점이 명확할 때 새로들어온 데이터만.(데이터 중복 x)

update mode는 실시간 집계 중간 결과가 필요할 때 사용한다.(현재까지 집계 상황을 보고 싶을 때)

complete는 테스트나 디버깅 용도로 전체 상태를 한눈에 보고 싶을 때 사용한다.(직관적이지만 리소스 비용이 비쌈)

mode 설명
append 확정된 결과만 새로 추가
update 변경된 결과만 갱신
complete 전체 결과를 매번 다시 출력

2) Window & Watermark

  • Trigger → 언제 실행할지 (시간 / 조건)
  • Window → 어떻게 묶을지 (시간 구간)
  • Watermark → 언제 버릴지 (지연 허용 기준)

Window는 시간 구간으로 데이터를 묶어서 집계하는 방법 ex) 10분 단위로 잘라서 집계

window를 사용할 때 지연이 되면 지연된 데이터 상태를 계속 들고 있어야 해서 oom이 뜰 확률이 높음

 

window의 종류는 tumbling와 sliding 두개가 존재한다.

tumbing은 시간이 겹치지않고 sliding은 overlab시간을 부여한다. 즉 Tumbling Window는 시간을 중복되지 않는, 고정된 크기의 윈도우로 나누고 Sliding Window는 중복이 발생하는 윈도우 방식이다.

from pyspark.sql.functions import window

window_duration = "10 minutes" # tumbing 의미
sliding_duration = "5 minutes" # sliding 의미

df.groupBy(
    window(df.eventTime, window_duration, sliding_duration)
).count()

 

Watermark는 Structured Streaming 전용으로 이 시간보다 늦게 오는 데이터는 안 받겠다 는 선언

df.withWatermark("eventTime", "10 minutes")

 

window와 wtermark를 함께 쓰면 메모리 관리에 용의 함. “Watermark 없는 stateful streaming은 위험하다”

  • Window 상태를 유지
  • watermark 시점 지나면
    • 결과 확정
    • state 정리 (GC)
df \
  .withWatermark("eventTime", "10 minutes") \
  .groupBy(
      window("eventTime", "10 minutes")
  ).count()

 

3) Streaming Fault Tolerance란?

스트리밍 처리 중 장애가 발생해도, 데이터 유실·중복 없이 계속 처리하도록 보장하는 메커니즘Spark Structured Streaming은 장애를 전제로 설계돼 있다. 각 micro-batch를 완전히 처리했을 때만 커밋 , 체크포인트 등 저장해서 재시작 확인 가능

 

체크포인트는 내결함성의 핵심 목표는 시스템 장애 발생 시 데이터 레코드를 누락하거나 중복 없이 정확히 한 번만 처리하는 End-to-End Exactly Once Semantic을 보장하는 것이다.

4) Stateless와 stateful

서버에서 stateless와 stateful은 session이 유지되냐 안되냐의 뜻 으로 통한다. ex) stateless = rest api, stateful = socket

spark에서의 statelsess와 stateful은 연산이 이전 배치(또는 이전 레코드)의 결과를 기억하느냐 기준으로 나뉜다.

 

먼저 statelsess이전 데이터에 대한 상태를 저장하지 않고 배치 / 마이크로배치를 독립적으로 처리한다.

  • 상태 저장 없음
  • 재시작 시 부담 적음
  • 성능, 확장성 좋음
  • Checkpoint 불필요

select, filter, map, flatMap, expolde, append, update ...

df.select("col")
df.filter($"age" > 30)
df.map(...)
df.withColumn("x", $"a" + 1)

 

반면 statefull은 이전 데이터의 결과(상태)를 저장하고 다음 배치에서 누적/참조한다.

  • 상태를 메모리/디스크에 유지
  • Checkpoint 필수
  • 상태 크기 관리 중요 (OOM 위험)
  • 데이터 스큐에 민감

groupBy, count, sum, avg, window, mapGroupsWithState, complete ...

streamDF
  .withWatermark("eventTime", "10 minutes")
  .groupBy(
    window($"eventTime", "5 minutes"),
    $"key"
  )
  .count()

5) streaming join

streaming join에는 2가지가 있다.

(1) streaming dataframe -> static dataframe ( Stream–Batch )

스트리밍-정적 조인은 실시간 스트림에 변화가 적은 기존 데이터를 붙일 때 유용하다. 스트리밍-스트리밍 조인보다 간단하며 워터마크가 필수는 아니다.

 

스트리밍 Outer Join은 매칭되지 않은 데이터도 결과를 내보내야 하므로, 무한정 기다리지 않도록 Watermark와 이벤트 시간 제약이 반드시 필요합니다.

 

설비 이벤트 + 설비 마스터, 센서 로그 + 기준 정보 등에 사용 가능

  • 스트리밍 데이터에 대해 “고정된 기준 테이블”을 조회
  • Static DF는 스냅샷처럼 메모리에 올라간 참조 데이터
  • 거의 Stateless
  • Streaming 쪽의 이전 상태를 기억할 필요 없음
val userInfo = spark.read.parquet("/users")  // Static

val events = spark.readStream
  .format("kafka")
  .load()

events
  .join(userInfo, "userId")
  .select("userId", "eventType", "userName")

2) streaming dataframe -> streaming dataframe ( Stream-Stream )

이벤트 상관 분석에 사용이 자주됨.

두 스트림에서 온 데이터를 조인하려면 한쪽 또는 양쪽 데이터를 상태 저장소에 임시로 저장하고 기다려야 합니다. 이는 상태를 관리하는 Stateful 변환이다.

  • 두 스트림이 시간 축을 기준으로 매칭
  • 양쪽 모두 계속 들어옴 → 상태 유지 필수
  • 강한 Stateful
  • 양쪽 스트림의 과거 데이터 보관
  • Watermark 없으면 상태 무한 증가
val impressions = spark.readStream
  .withWatermark("eventTime", "10 minutes")

val clicks = spark.readStream
  .withWatermark("eventTime", "10 minutes")

impressions.join(
  clicks,
  expr("""
    impressions.adId = clicks.adId AND
    impressions.eventTime BETWEEN
      clicks.eventTime - interval 5 minutes AND
      clicks.eventTime + interval 5 minutes
  """)
)

22. Spark MLlib

Spark MLlib는 Apache Spark에서 제공하는 분산 머신러닝 라이브러리로 RDD / DataFrame 기반으로 대규모 데이터를 분산 처리하며 머신러닝 알고리즘을 제공하는 라이브러리이다.

1) 알고리즘

분류 (Classification)

  • Logistic Regression, Decision Tree, Random Forest, Gradient-Boosted Trees, Naive Bayes

회귀 (Regression)

  • Linear Regression, Generalized Linear Regression, Decision Tree Regressor, Random Forest Regressor

군집 (Clustering)

  • K-Means, Gaussian Mixture

추천 (Recommendation)

  • ALS (Collaborative Filtering)

차원 축소

  • PCA, SVD
lr = LogisticRegression()
model = lr.fit(train_df) # 모델생성

result = model.transform(test_df) # 데이터 변환

2) Feature Engineering

기능 종류
문자열 인코딩 StringIndexer, OneHotEncoder
텍스트 Tokenizer, NGram, TF-IDF
스케일링 StandardScaler, MinMaxScaler
벡터화 VectorAssembler

3) Pipeline

전처리 + 학습 + 예측을 하나의 DAG로 관리

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    tokenizer,
    hashingTF,
    lr
])

model = pipeline.fit(train_df)
pred = model.transform(test_df)

 

728x90