DAG(Directed Acyclic Graph,有向无环图)是Spark等大数据处理框架的核心概念。
1. 数学定义与特性
图论基础
有向性:边(Edge)具有方向(如A→B)
无环性:不存在循环路径(如A→B→C→A)
拓扑排序:所有顶点排成线性序列,满足前驱关系
关键性质
顶点数V,边数E满足:E ≤ V(V-1)/2
至少存在一个入度为0的顶点(起点)
至少存在一个出度为0的顶点(终点)
2. 在Spark中的实现
DAG生成流程(Scheduler是调度程序)
Stage划分规则
依赖类型示例操作Stage划分影响窄依赖map, filter合并到同一Stage宽依赖reduceByKey触发新Stage划分
示例:WordCount的DAG结构
val text = sc.textFile("hdfs://data") // Stage0
val words = text.flatMap(_.split(" ")) // Stage0
val pairs = words.map((_, 1)) // Stage0
val counts = pairs.reduceByKey(_ + _) // Stage1(宽依赖)
counts.saveAsTextFile("hdfs://output") // Stage1
3. 优化原理与优势
对比MapReduce模型
特性MapReduce模型Spark DAG模型任务调度多阶段独立调度全局优化调度中间数据存储必须写磁盘内存优先执行效率高延迟(秒级)低延迟(毫秒级)复杂算法支持迭代计算效率低适合迭代和交互式查询
优化技术
流水线执行(Pipelining):
合并连续的窄依赖操作,避免中间结果落盘
任务合并:
// 优化前:两个Stage
rdd.map(f1).groupByKey().map(f2)
// 优化后:合并操作
rdd.map(record => f2(f1(record)))
数据本地化调度:
优先将Task调度到数据所在节点
本地化级别:PROCESS_LOCAL → NODE_LOCAL → RACK_LOCAL → ANY
4. 容错机制
血统(Lineage)恢复
val rdd = sc.textFile("hdfs://data")
.map(parse) // Lineage记录1
.filter(_.isValid) // Lineage记录2
.cache()
丢失分区时,根据血统重新计算
通过checkpoint()切断过长血统链
Checkpoint机制
spark.sparkContext.setCheckpointDir("hdfs://checkpoint")
rdd.checkpoint()
将RDD持久化到可靠存储
常用于迭代算法(如PageRank)
5. 企业级应用
性能调优指标
指标健康范围调优手段DAG深度<10合并窄依赖操作Shuffle写数据量<1TB/Stage增加分区数或优化业务逻辑Stage并行度≥集群核心数调整spark.default.parallelism
诊断命令
# 查看DAG可视化
http://driver-node:4040/stages/
# 导出DAG事件日志
spark-submit --conf spark.eventLog.enabled=true ...
6. 扩展应用场景
机器学习:
val model = MLPipeline.fit(trainingData) // 生成优化后的DAG
流处理:
val stream = spark.readStream.format("kafka")...
val query = stream.groupBy(window($"time")).count()
图计算:
val graph = GraphLoader.edgeListFile(sc, "hdfs://edges")
val ranks = graph.pageRank(tol=0.01)
理解DAG的重要性体现在:
执行优化:通过分析依赖关系实现最优调度
故障恢复:精确控制重新计算范围
资源利用:最大化并行度同时减少网络传输
开发指导:帮助开发者设计高效的数据处理流程
这种基于DAG的执行模型,使得Spark能够在大规模数据处理中实现比传统MapReduce快100倍的性能,成为现代大数据生态系统的核心引擎。