Apache Spark 第 13 章:Real-Time Mode 实时计算

张开发
2026/5/22 7:52:23 15 分钟阅读
Apache Spark 第 13 章:Real-Time Mode 实时计算
本章导读实时计算不是更快的批处理——它是一种全新的数据处理哲学。本章从为什么需要实时出发深入 Spark Streaming 与 Structured Streaming 的架构内核拆解 Watermark、状态管理、触发模式等关键机制最终落地生产级实战配置。13.1 设计思想为什么需要实时计算批处理的天花板想象你经营一家超市每天晚上关门后统计今天的销售额。这就是批处理——数据攒够了再算。但有个问题如果今天下午 3 点某件商品被大量盗窃你要等到晚上 12 点才知道损失已经无法挽回。实时计算解决的核心问题是让数据的价值在产生的瞬间就被捕获和利用。场景批处理的代价实时计算的价值风控反欺诈每小时批处理欺诈交易已经完成毫秒级拦截可疑交易推荐系统T1 更新用户兴趣基于用户当前行为实时推荐监控告警次日才能发现服务异常秒级感知并触发告警实时报表数据有延迟决策滞后管理层看到现在的业务状态Spark 的选择微批 vs 真流Spark 在实时计算领域走了一条独特的路它没有像 Flink 那样从零构建流式引擎而是选择了微批Micro-Batch——把无限的流切成一个个极小的批次用批处理引擎高速处理。这个选择带来了✅天然的 Exactly-Once 语义批处理的事务性直接继承✅极高的吞吐量批处理优化成熟⚠️延迟下限约 100ms真正的毫秒级延迟需要用 Continuous Processing 模式实战口诀Spark 流 极速批处理的流式包装吞吐优先选 Spark延迟优先选 Flink。13.2 核心架构全景两代引擎的对比Spark 提供了两套实时计算 API理解它们的关系是入门的第一步。下图展示了 Spark 实时计算的整体架构全景包括两代引擎与数据流向架构核心结论两代引擎选新不选旧Spark StreamingDStream API已进入维护模式Structured Streaming 是官方推荐的唯一生产级选择。它底层复用了批处理的 Catalyst Tungsten 优化栈性能更强、语义更清晰。状态是实时计算的心脏所有有意义的流式计算聚合、Join、去重都需要维护状态。State Store 的选型RocksDB vs 内存直接决定了任务的稳定性上限。Checkpoint 是生命线没有 Checkpoint任何实时任务在重启后都会从头开始数据要么丢失要么重复。实战口诀新项目只用 Structured StreamingState 用 RocksDBCheckpoint 必须配。13.3 第一代引擎Spark Streaming 与 DStream虽然 DStream 已是遗产理解它能帮助你看清 Structured Streaming 解决了哪些痛点。DStreamDiscretized Stream离散化流的核心思想是把时间轴切成等长的批次窗口每个窗口内的数据形成一个 RDD。DStream 核心结论DStream 本质是 RDD 的时间序列每个 batchDuration 窗口内的数据形成一个 RDDDStream 就是这些 RDD 按时间排列的集合。你对 DStream 的map、filter操作实际上是对每个 RDD 做相同操作。batchDuration 是一把双刃剑设得小如 500ms延迟低但调度开销大设得大如 10s吞吐高但实时性差。这个矛盾无法根本解决是 DStream 的结构性缺陷。没有原生的事件时间支持DStream 只知道数据到达 Spark 的时间处理时间无法正确处理乱序数据这是它被 Structured Streaming 取代的核心原因之一。实战口诀DStream 看懂即可生产环境不要用面试能讲清楚它解决了什么又留下了什么坑就够了。13.4 第二代引擎Structured Streaming 核心架构Structured Streaming 的核心设计哲学是把流式数据看作一张无界的表Unbounded Table每批新数据就是追加到这张表的新行所有对静态 DataFrame 的 SQL 操作都可以直接用于流式处理。无界表模型无界表模型核心结论流是表的语法糖Structured Streaming 最聪明的设计是让开发者用写批处理 SQL 的方式写流处理。df.writeStream.start()和df.write.save()在语法上几乎相同极大降低了学习成本。三种输出模式决定数据语义Append只输出新增行不可变行适合简单转换Update只输出本次批次中被更新的行适合聚合Complete每次触发都输出全部结果表适合小结果集排序Result Table 是逻辑概念Spark 不会真的把全表存在内存里它只维护产生 Result Table 所需的最小状态。实战口诀Structured Streaming 批处理 SQL 触发器 状态管理三位一体缺一不可。13.5 关键机制一Watermark水印与乱序处理乱序数据是流式计算的头号敌人。手机弱网环境下产生的事件可能延迟几分钟甚至几小时才到达 Kafka。没有 Watermark你永远不知道一个时间窗口是否已经收到了所有数据。类比Watermark 就像快递的截单时间——快递公司说今天 23:59 之前下单的算今日订单。晚于这个时间的单子就算进明天。Watermark 告诉 Spark“比水位线更早的数据即使迟到也不再接受。”Watermark 核心结论Watermark 事件时间最大值 − 容忍延迟这是一个滑动下界它告诉 Spark“比我更早的数据我不再等了。” 窗口结束时间小于 Watermark 时该窗口关闭并输出结果。延迟参数是业务决策不是技术参数withWatermark(event_time, 2 minutes)意味着你愿意牺牲 2 分钟的结果延迟换取对最多 2 分钟乱序数据的容忍。延迟设得越大内存中需要维护的待处理窗口越多。超过 Watermark 的数据会被静默丢弃这是 Watermark 的代价生产中必须监控丢弃率指标numRowsDroppedByWatermark。实战口诀水印延迟 业务能接受的等迟到同学的时长设太短丢数据设太长压内存。13.6 关键机制二触发模式Trigger触发模式控制 Structured Streaming何时处理数据是调节延迟与吞吐的核心旋钮。触发模式核心结论90% 的生产任务用ProcessingTime指定一个合理的间隔如10 seconds或1 minute在延迟与系统开销之间取得平衡。间隔越短Kafka offset commit 越频繁Driver 调度压力越大。AvailableNow是新时代的调度流它让你用 Structured Streaming 的 Checkpoint 语义替代传统的批处理调度Airflow SparkSQL实现从上次停下来的地方继续非常适合数仓增量更新场景。Continuous不要在聚合场景使用它的限制极多生产级毫秒延迟请考虑迁移到 Apache Flink。实战口诀实时流选 ProcessingTime 合理间隔调度批选 AvailableNowContinuous 只是个 demo。13.7 关键机制三状态管理与 State Store有状态操作Stateful Operations是区分玩具级和生产级流处理任务的分水岭。任何需要跨批次记住信息的操作都是有状态的。常见的有状态操作窗口聚合groupBy window流-流 JoindeduplicateWithinWatermark去重mapGroupsWithState/flatMapGroupsWithState自定义状态状态管理核心结论没有 Watermark 的有状态操作是定时炸弹状态永远不会被清理随着时间推移必然导致 OOM内存溢出。无论如何有状态操作必须配合withWatermark。大状态必须用 RocksDB超过 10GB 的状态用内存 HashMapState 必然 OOMRocksDB 把状态落到本地磁盘并做增量 Checkpoint内存占用降低 80% 以上。状态与 Partition 强绑定每个 Executor 的每个 Partition 维护独立的 State Store这意味着repartition会破坏状态关联。改变并行度前必须通过 Checkpoint 重置。实战口诀有状态必配 Watermark大状态必用 RocksDB改并行度必须重置 Checkpoint。13.8 新老对比Spark Streaming vs Structured Streaming13.9 横向对比Spark Structured Streaming vs Apache Flink生产中常见的选型困惑什么时候用 Spark什么时候用 Flink维度Spark Structured StreamingApache Flink处理模型微批默认/ 连续实验性真流式事件驱动延迟100ms ~ 秒级毫秒级吞吐量极高批处理优化成熟高略低于 Spark 大批次状态管理RocksDB State StoreRocksDB更成熟SQL 支持Spark SQL完整Flink SQL功能追赶中生态整合与 Spark 批处理无缝衔接独立生态运维复杂度中依托 Spark 集群高独立 JobManager/TaskManager适用场景吞吐优先、与批处理混合延迟优先、复杂事件处理(CEP)选型口诀延迟 100ms 选 Flink其他大多数场景 Spark 更省心。13.10 生产实战完整可运行示例场景描述从 Kafka 消费用户行为日志统计每 5 分钟内每个用户的点击次数结合 Watermark 处理 2 分钟内的乱序数据将结果写入 Kafka 和控制台。依赖配置build.sbt / pom.xml// build.sbtlibraryDependenciesSeq(org.apache.spark%%spark-sql%3.5.0,org.apache.spark%%spark-sql-kafka-0-10%3.5.0,// ⭐ 生产必须RocksDB State Storeorg.apache.spark%%spark-sql-kafka-0-10%3.5.0)完整代码示例importorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.functions._importorg.apache.spark.sql.streaming.{OutputMode,Trigger}importorg.apache.spark.sql.types._objectUserClickAggregation{defmain(args:Array[String]):Unit{// 1. SparkSession 配置 valsparkSparkSession.builder().appName(UserClickRealtime)// ⭐ 生产关键启用 RocksDB State Store避免大状态 OOM.config(spark.sql.streaming.stateStore.providerClass,org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider)// ⭐ RocksDB 增量 Checkpoint减少 S3/HDFS 写入量.config(spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled,true)// ⭐ 调整 shuffle partition 数避免状态碎片化根据数据量调整.config(spark.sql.shuffle.partitions,200)// ❌ 错误写法: 不配置 stateStore.providerClass默认 HashMap大状态必 OOM.getOrCreate()importspark.implicits._// 2. 定义 Kafka 消息的 Schema // 原始消息格式: {user_id:u001,event_time:2024-01-15T10:00:01,action:click}valeventSchemaStructType(Seq(StructField(user_id,StringType,nullablefalse),StructField(event_time,TimestampType,nullablefalse),StructField(action,StringType,nullabletrue)))// 3. 读取 Kafka Source valrawStreamspark.readStream.format(kafka)// ⭐ 生产建议: 指定 assign 而非 subscribe精确控制 partition 消费.option(kafka.bootstrap.servers,kafka-broker-1:9092,kafka-broker-2:9092).option(subscribe,user-events)// ⭐ 限速防止 Kafka 积压时突发大批次打垮下游.option(maxOffsetsPerTrigger,500000)// 首次启动从最新位置开始生产常用.option(startingOffsets,latest)// ⭐ Kafka 消费者配置指定消费者组.option(kafka.group.id,spark-realtime-click).load()// 4. 解析 JSON 消息 valparsedStreamrawStream.selectExpr(CAST(value AS STRING) as json_str,timestamp as kafka_ts).withColumn(data,from_json($json_str,eventSchema))// 展开嵌套字段.select($data.user_id,$data.event_time,// 使用业务事件时间而非 Kafka 入队时间$data.action,$kafka_ts// 保留 Kafka 时间用于监控)// ❌ 错误做法: 用 kafka_ts处理时间做窗口无法正确处理乱序// ✅ 正确做法: 用业务 event_time 做窗口// 5. 核心处理Watermark 窗口聚合 valaggregatedparsedStream// ⭐ 关键配置2分钟的 Watermark 延迟// 含义允许最多 2 分钟迟到的数据超时丢弃.withWatermark(event_time,2 minutes)// 过滤只统计 click 行为.filter($actionclick)// ⭐ 滑动窗口5分钟窗口每1分钟滑动一次// 如果只需要滚动窗口把第三个参数去掉即可.groupBy(window($event_time,5 minutes,1 minute),$user_id).agg(count(*).as(click_count),min($event_time).as(first_event_time),max($event_time).as(last_event_time))// 展开 window struct 便于下游使用.select($window.start.as(window_start),$window.end.as(window_end),$user_id,$click_count,$first_event_time,$last_event_time)// 6. 写出到 Kafka valkafkaSinkaggregated// 构造 Kafka valueJSON 格式.select(// ⭐ Kafka key user_id保证同一用户数据在同一 partition有序$user_id.as(key),to_json(struct($*)).as(value)).writeStream.format(kafka).option(kafka.bootstrap.servers,kafka-broker-1:9092,kafka-broker-2:9092).option(topic,user-click-aggregated)// ⭐ 必须配置Checkpoint 路径保证 Exactly-Once 和断点续传.option(checkpointLocation,s3a://my-bucket/checkpoints/user-click-agg)// ⭐ Update 模式只输出本批次中被更新的结果行窗口聚合的最佳选择// ❌ 错误: Append 模式不能与有聚合的窗口查询配合除非有 Watermark 且窗口已关闭.outputMode(OutputMode.Update())// ⭐ 每 30 秒触发一次平衡延迟与调度开销.trigger(Trigger.ProcessingTime(30 seconds)).start()// 7. 同时写到控制台开发调试用// ❌ 生产不要保留 Console Sink仅用于本地调试valconsoleSinkaggregated.writeStream.format(console).option(truncate,false).option(numRows,20).outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime(30 seconds)).start()// 8. 等待任务结束 // awaitAnyTermination: 任一 stream 失败即退出spark.streams.awaitAnyTermination()}}常见错误配置对比// // ❌ 错误 1: 忘记配置 Checkpoint// df.writeStream.format(kafka)// 没有 checkpointLocation// 后果重启后重复消费、状态丢失、无法保证 Exactly-Once.start()// ✅ 正确必须指定 checkpointLocationdf.writeStream.format(kafka).option(checkpointLocation,hdfs:///checkpoints/my-job).start()// // ❌ 错误 2: 有状态操作不配 Watermark// df.groupBy(window($event_time,5 minutes),// 有窗口聚合$user_id).count()// 后果状态永远不会被清理运行几小时后 OOM// ✅ 正确有状态操作必须先设 Watermarkdf.withWatermark(event_time,2 minutes).groupBy(window($event_time,5 minutes),$user_id).count()// // ❌ 错误 3: Output Mode 与操作不匹配// df.withWatermark(event_time,2 minutes).groupBy(window($event_time,5 minutes)).count().writeStream.outputMode(OutputMode.Append())// ❌ 聚合Watermark 不能用 Append// 错误信息: Append output mode not supported when there are streaming aggregations// ✅ 正确聚合操作用 Update 或 Complete.outputMode(OutputMode.Update())// // ❌ 错误 4: 在生产环境使用 Trigger.Once()// df.writeStream.trigger(Trigger.Once())// 问题Spark 3.3 已标记为 deprecated大积压时单批次超时风险高// ✅ 正确Spark 3.3 使用 AvailableNow.trigger(Trigger.AvailableNow())// // ❌ 错误 5: 忘记限速Kafka 积压时 OOM// spark.readStream.format(kafka).option(subscribe,events)// 没有 maxOffsetsPerTrigger积压 1 亿条数据时第一批全部加载 → OOM// ✅ 正确根据任务处理能力设置合理的限速.option(maxOffsetsPerTrigger,500000).load()13.11 排障指南常见问题排查思路问题 1任务运行一段时间后越来越慢Batch Duration 持续上升排查路径 1. Spark UI → Streaming tab → 查看 Input Rate vs Processing Rate - 如果 Processing Rate Input Rate → 消费跟不上生产需要扩容或降低计算复杂度 2. 查看 State Operators 标签 → stateMemoryUsedBytes 是否持续增长 - 如果是 → Watermark 没有正确配置或状态没有被清理 3. 检查 GC 时间 → Executor GC time 10% → 考虑切换到 RocksDB State Store问题 2重启后数据重复或丢失原因Checkpoint 配置有误或 Checkpoint 目录被删除 解决 - 确认 checkpointLocation 已配置且持久化到可靠存储HDFS/S3 - 确认 Sink 支持幂等写入Kafka 事务 Producer或数据库 upsert - 如果是 Schema 变更导致 Checkpoint 不兼容需要手动删除 Checkpoint 重置问题 3Watermark 不推进窗口迟迟不关闭原因某个 partition 的数据停止了Kafka partition 上没有新消息 解决 - 检查上游 Kafka 各 partition 的消费 lag - 设置 spark.sql.streaming.watermarkDelayMs强制推进水印的超时 - 或给每个 partition 定期发送心跳事件问题 4Exactly-Once 语义失效数据重复原因Sink 不支持幂等写入或 Checkpoint 与 Sink 状态不一致 解决 - Kafka Sink启用事务 Producer .option(kafka.transactional.id, spark-streaming-txn-001) - 数据库 Sink使用 upsert/merge 代替 insert - 文件 Sink使用 ForeachBatch 临时文件 原子 rename13.12 本章总结核心认知地图 实时计算本质 ├── 不是更快的批处理 └── 是数据价值的即时捕获 Spark 实时架构 ├── Spark Streaming (DStream) → 维护模式了解即可 └── Structured Streaming → 唯一的生产选择 ├── 无界表模型流 表的增量追加 ├── Watermark乱序容忍的截止时间 ├── Trigger何时处理的旋钮 └── State Store跨批次记忆的载体 生产三要素 ├── Checkpoint生命线 ├── Watermark乱序处理 └── RocksDB State Store大状态稳定性 选型原则 ├── 吞吐优先 / 与批处理混合 → Spark Structured Streaming └── 延迟 100ms / CEP → Apache Flink本章终极口诀新项目选 Structured状态用 RocksDBWatermark 防乱序Checkpoint 保语义延迟靠 Trigger 调吞吐靠并行控有状态必有 WM无 WM 必有 OOM。

更多文章