728x90
Streaming 데이터는 종종 지연(late arrival)이 발생한다.
예를 들어 네트워크 지연, 데이터 지연 수집 등. 이런 환경에서 Watermark는 얼마나 오래 기다릴지 설정해주는 기준이다.
워터마크(Watermark)란?
- event_time 컬럼을 기준으로
- 예: “10분 워터마크” 설정 → = `withWatermark("event_time", "10 minutes")`
- 이 기준 안에 들어온 데이터만 확실히 처리하고,
- 기준보다 늦게 온 데이터는 처리 보장하지 않음
즉, +10분까지는 버리고 기다려주고, 이후엔 마감하고 간편하게 상태를 정리하는 개념이다.
Window 기반 스트리밍과 Watermark
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
- 5분마다 집계(`window`)하고,
- 10분 워터마크 = 창 끝나고 10분 더 기다려줌.
- 10분 넘은 상태(window보다 늦게 오는 데이터) → 무시되고 상태 정리됨.
Stream-to-Stream Join에서의 Watermark 동작
Stream과 Stream을 조인할 때는 양쪽 스트림 모두에 watermark 설정이 필요하며, 시간 조건이 포함된 join이어야 한다.
# 왼쪽 스트림
streamA = spark.readStream.format("...").load() \
.withWatermark("event_time", "10 minutes")
# 오른쪽 스트림
streamB = spark.readStream.format("...").load() \
.withWatermark("event_time", "20 minutes")
# 조인
joined = streamA.join(
streamB,
expr("""
streamA.id = streamB.id AND
streamA.event_time BETWEEN streamB.event_time - interval 15 minutes AND streamB.event_time + interval 5 minutes
""")
)
이 예시에서는 streamA의 워터마크는 10분, streamB의 워터마크는 20분이며, 조건을 만족하는 레코드만 join 결과에 포함된다.</p
⚙️ 워터마크와 출력 모드의 관계
Output Mode | 설명 | 상태 정리 시점 |
---|---|---|
Append | 완료된 창만 출력 | 워터마크 이후 |
Update | 변경된 자료를 계속 업데이트 | 워터마크 이후 불필요한 상태 정리 |
Complete | 모든 집계값 계속 출력 | 워터마크와 상관없이 모든 상태 유지 |
→ 워터마크는 특히 Append / Update 모드에서 상태 정리와 처리 타이밍을 관리한다.
- Append 모드
stream_df = spark.readStream.format("...").load() \
.withWatermark("event_time", "10 minutes")
agg_df = stream_df.groupBy(
window("event_time", "5 minutes")
).count()
query = agg_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
window 시작 | window 종료 | count | 출력 시점 |
---|---|---|---|
12:00 | 12:05 | 100 | 워터마크 이후 (예: 12:15) |
👉 집계 윈도우가 완전히 끝난 후에만 결과가 출력된다.
- Update 모드
stream_df = spark.readStream.format("...").load() \
.withWatermark("event_time", "10 minutes")
agg_df = stream_df.groupBy(
window("event_time", "5 minutes")
).count()
query = agg_df.writeStream \
.outputMode("update") \
.format("console") \
.start()
window 시작 | window 종료 | count | 출력 시점 |
---|---|---|---|
12:00 | 12:05 | 10 | 이벤트 수신 시 즉시 |
12:00 | 12:05 | 50 | 중간에 계속 업데이트 |
12:00 | 12:05 | 100 | 최종 결과 |
👉 집계 결과가 바뀔 때마다 지속적으로 출력된다.
Watermark 가 중요한 이유
- 상태가 무한정 커지지 않도록 메모리·성능 최적화
- 지연 데이터 처리를 일정 수준으로 보장, 너무 늦은 건 잘라냄
- 지연 vs 성능의 트레이드오프 조정 가능
요약
withWatermark("timeCol", "X minutes")
로 “X분 동안 기다렸다가” 상태 정리를 함- 지연 데이터는 X분 내에 들어와야만 집계에 포함됨
- "언제 데이터를 방출할지", "언제 상태를 제거할지"를 자동으로 관리 → 지속적 Streaming 쿼리 안정성 향상
참고 자료
https://docs.databricks.com/aws/en/structured-streaming/watermarks
'Data Engineering > Databricks' 카테고리의 다른 글
[databricks] spark UI Summary Metrics 분석 (0) | 2025.06.10 |
---|---|
[databricks] Dynamic View, Access Control (0) | 2025.06.09 |
[databricks] 파일 최적화(자동 최적화와 수동 최적화) (0) | 2025.06.09 |
[databricks] 테이블 복사(CTAS , Shallow Clone, Deep Clone) (1) | 2025.06.09 |
[databricks]Change Data Capture(CDC), Change Data Feed(CDF) (1) | 2025.06.05 |