[databricks] Watermark
A A
728x90

 

Streaming 데이터는 종종 지연(late arrival)이 발생한다.

예를 들어 네트워크 지연, 데이터 지연 수집 등. 이런 환경에서 Watermark는 얼마나 오래 기다릴지 설정해주는 기준이다.

워터마크(Watermark)란?
  • event_time 컬럼을 기준으로
  • 예: “10분 워터마크” 설정 → = `withWatermark("event_time", "10 minutes")`
  • 이 기준 안에 들어온 데이터만 확실히 처리하고,
  • 기준보다 늦게 온 데이터는 처리 보장하지 않음

즉, +10분까지는 버리고 기다려주고, 이후엔 마감하고 간편하게 상태를 정리하는 개념이다.

 

Window 기반 스트리밍과 Watermark
from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)
  • 5분마다 집계(`window`)하고,
  • 10분 워터마크 = 창 끝나고 10분 더 기다려줌.
  • 10분 넘은 상태(window보다 늦게 오는 데이터) → 무시되고 상태 정리됨.
Stream-to-Stream Join에서의 Watermark 동작

Stream과 Stream을 조인할 때는 양쪽 스트림 모두에 watermark 설정이 필요하며, 시간 조건이 포함된 join이어야 한다.

# 왼쪽 스트림
streamA = spark.readStream.format("...").load() \
    .withWatermark("event_time", "10 minutes")

# 오른쪽 스트림
streamB = spark.readStream.format("...").load() \
    .withWatermark("event_time", "20 minutes")

# 조인
joined = streamA.join(
    streamB,
    expr("""
      streamA.id = streamB.id AND
      streamA.event_time BETWEEN streamB.event_time - interval 15 minutes AND streamB.event_time + interval 5 minutes
    """)
)

이 예시에서는 streamA의 워터마크는 10분, streamB의 워터마크는 20분이며, 조건을 만족하는 레코드만 join 결과에 포함된다.</p

⚙️ 워터마크와 출력 모드의 관계
Output Mode 설명 상태 정리 시점
Append 완료된 창만 출력 워터마크 이후
Update 변경된 자료를 계속 업데이트 워터마크 이후 불필요한 상태 정리
Complete 모든 집계값 계속 출력 워터마크와 상관없이 모든 상태 유지

→ 워터마크는 특히 Append / Update 모드에서 상태 정리와 처리 타이밍을 관리한다.

 

  •  Append 모드
stream_df = spark.readStream.format("...").load() \
    .withWatermark("event_time", "10 minutes")

agg_df = stream_df.groupBy(
    window("event_time", "5 minutes")
).count()

query = agg_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
window 시작 window 종료 count 출력 시점
12:00 12:05 100 워터마크 이후 (예: 12:15)

👉 집계 윈도우가 완전히 끝난 후에만 결과가 출력된다.

  •  Update 모드
stream_df = spark.readStream.format("...").load() \
    .withWatermark("event_time", "10 minutes")

agg_df = stream_df.groupBy(
    window("event_time", "5 minutes")
).count()

query = agg_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
window 시작 window 종료 count 출력 시점
12:00 12:05 10 이벤트 수신 시 즉시
12:00 12:05 50 중간에 계속 업데이트
12:00 12:05 100 최종 결과

👉 집계 결과가 바뀔 때마다 지속적으로 출력된다.

Watermark 가 중요한 이유
  • 상태가 무한정 커지지 않도록 메모리·성능 최적화
  • 지연 데이터 처리를 일정 수준으로 보장, 너무 늦은 건 잘라냄
  • 지연 vs 성능의 트레이드오프 조정 가능
요약 
  • withWatermark("timeCol", "X minutes") 로 “X분 동안 기다렸다가” 상태 정리를 함
  • 지연 데이터는 X분 내에 들어와야만 집계에 포함됨
  • "언제 데이터를 방출할지", "언제 상태를 제거할지"를 자동으로 관리 → 지속적 Streaming 쿼리 안정성 향상
참고 자료

https://docs.databricks.com/aws/en/structured-streaming/watermarks

Copyright 2024. GRAVITY all rights reserved