1 Star 0 Fork 0

CSOpenTech/bigdatatech

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Spark 串接流输出数据计算

  1. 失败的 Task 不会发出累加器
  2. 同一个Stage中的不同Transform中的累加器,是同步更新的

原理

在串接流中的每个模块运行的结束后面,添加一个 Accum 收集。

  • 收集总的分区数,以及每个分区数的处理数据的量

如何避免重算:

  • 失败的 Task 不会发送累加器的信息
  • 成功的 Task 当超过分区总数时(即RDD重新执行),不会累加

如何避免多次计算:

  • 如果是Stage 多次计算,因为会触发所有分区的整体重算,因此只需要记录总分区数即可;

  • 对于 take, first 等 action,可能会触发部分分区的计算,只记录整体分区数会出错;

  • Input + Map + SortByKey + Map + Output 的显示结果

    • Input 和 Map 被计算两次,因为 SortByKey 会触发一次计算 RangePartition 导致的 Action;
    • Output 没有,因为是在计算出来 OutRDD 的后面,再进行更新 Accum
      • Output 执行,是进行 Write,无法无感知做更新 Accum(硬码)
watch: 1@MapModule/(16,0), 0@InputModule/(16,0)
... ...
###############Array((1,[I@416d56f2), (2,[I@6e31d989))
... ... (触发个别分区的计算,会累计在内,当 BitSet 只计一次)
watch: 1@MapModule/(8,4), 0@InputModule/(8,4)
watch: 1@MapModule/(8,6), 0@InputModule/(8,6)
watch: 1@MapModule/(8,15), 0@InputModule/(8,15)
watch: 1@MapModule/(8,21), 0@InputModule/(8,21)
watch: 1@MapModule/(8,28), 0@InputModule/(8,28)
... ...(重算,总 Tasks数,超过分区数)
watch: 1@MapModule/(8,36), 2@ShuffleModule/(8,0), 0@InputModule/(8,36)
watch: 1@MapModule/(8,36), 2@ShuffleModule/(8,0), 0@InputModule/(8,36)
watch: 1@MapModule/(8,36), 2@ShuffleModule/(8,0), 0@InputModule/(8,36)
... ...(重算结束,继续执行新的模块)
watch: 1@MapModule/(8,36), 3@MapModule/(8,0), 2@ShuffleModule/(8,0), 0@InputModule/(8,36)
watch: 1@MapModule/(8,36), 3@MapModule/(8,9), 2@ShuffleModule/(8,9), 0@InputModule/(8,36)
watch: 1@MapModule/(8,36), 3@MapModule/(8,18), 2@ShuffleModule/(8,18), 0@InputModule/(8,36)
watch: 1@MapModule/(8,36), 3@MapModule/(8,23), 2@ShuffleModule/(8,23), 0@InputModule/(8,36)
... ...
watch: 1@MapModule/(8,36), 3@MapModule/(8,36), 2@ShuffleModule/(8,36), 0@InputModule/(8,36)


马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Scala
1
https://gitee.com/oscsc/bigdatatech.git
git@gitee.com:oscsc/bigdatatech.git
oscsc
bigdatatech
bigdatatech
master

搜索帮助