当前位置: 首页 > news >正文

Spark2.4.0源码分析之WordCount 默认shuffling并行度为200(九) ...

Spark2.4.0源码分析之WordCount 默认shuffling并行度为200(九)

配置

  • The default number of partitions to use when shuffling data for joins or aggregations.
spark.sql.shuffle.partitions=200

QueryExecution.executedPlan

  • 调用 QueryExecution.prepareForExecution
 // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  // Computed lazily, on first access, by passing `sparkPlan` through prepareForExecution,
  // i.e. through every rule in `preparations`. One of those rules, EnsureRequirements,
  // inserts the shuffle exchanges whose default partition count is
  // spark.sql.shuffle.partitions.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

QueryExecution.prepareForExecution

  • 调用函数 QueryExecution.preparations
  /**
   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
   * row format conversions as needed.
   *
   * The rules returned by `preparations` are applied left to right. Each rule receives the plan
   * produced by the previous rule, so the order of that list matters.
   *
   * @param plan the physical plan to prepare (`executedPlan` passes `sparkPlan` here)
   * @return the plan after every preparation rule has been applied
   */
  protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
  }

QueryExecution.preparations

  • 调用EnsureRequirements.apply
// Ordered list of physical-plan rules that prepareForExecution folds over the plan.
// EnsureRequirements (which adds exchanges/sorts) runs before CollapseCodegenStages and
// ReuseExchange, so the exchanges it inserts are already present when those later rules run.
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
    PlanSubqueries(sparkSession),
    EnsureRequirements(sparkSession.sessionState.conf),
    CollapseCodegenStages(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf))

EnsureRequirements.apply

  • 调用 EnsureRequirements.ensureDistributionAndOrdering
  // Entry point of EnsureRequirements. The plan is rewritten with transformUp, which applies the
  // rule to children before their parent.
  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
    // A hash-partitioning shuffle whose child already outputs a semantically equal
    // HashPartitioning is redundant: replace it with the child. Otherwise keep it as-is.
    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
      child.outputPartitioning match {
        case lower: HashPartitioning if upper.semanticEquals(lower) => child
        case _ => operator
      }
    // Any other operator: apply reorderJoinPredicates, then make its children satisfy the
    // operator's required distribution and ordering (this is where shuffles get inserted).
    case operator: SparkPlan =>
      ensureDistributionAndOrdering(reorderJoinPredicates(operator))
  }

EnsureRequirements.ensureDistributionAndOrdering

  • SQLConf 中,shuffle 的默认并行度(即 shuffle 后的分区数)为 200,可以通过 spark.sql.shuffle.partitions 进行配置
// Current value of spark.sql.shuffle.partitions (SHUFFLE_PARTITIONS); 200 unless overridden.
def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
  // Conf entry backing numShufflePartitions: the default partition count for shuffles
  // introduced for joins or aggregations. Integer-valued, default 200.
  val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
    .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
    .intConf
    .createWithDefault(200)
  • defaultNumPreShufflePartitions:当 distribution 没有显式要求分区数时使用的默认值,直接取自 conf.numShufflePartitions
// Fallback partition count when a required distribution does not specify one; it is simply
// conf.numShufflePartitions, i.e. spark.sql.shuffle.partitions (200 by default).
private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
  • 通过 val numPartitions = distribution.requiredNumPartitions.getOrElse(defaultNumPreShufflePartitions) 得到 numPartitions:distribution 未要求具体分区数时,取 spark.sql.shuffle.partitions(默认 200)
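  • 调用 ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) 插入 shuffle;若各子节点分区数仍不一致,再通过 ShuffleExchangeExec(defaultPartitioning, c) 把它们统一到同一分区数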
/**
 * Rewrites the children of `operator` so that they meet the operator's required child
 * distributions and orderings.
 *
 * Steps, in order:
 *  1. A child whose output partitioning already satisfies its required distribution is kept.
 *     A `BroadcastDistribution` requirement is met with a `BroadcastExchangeExec`. Any other
 *     requirement is met with a `ShuffleExchangeExec` that uses
 *     `distribution.requiredNumPartitions`, or `defaultNumPreShufflePartitions`
 *     (spark.sql.shuffle.partitions, 200 by default) when no specific count is required.
 *  2. Children with a specified, non-broadcast distribution must end up with the same number of
 *     partitions. If their counts differ, the mismatching ones are (re-)shuffled to the
 *     explicitly required count or, if none is required, to the largest count among them.
 *  3. The children are passed through `withExchangeCoordinator`, which adds an
 *     ExchangeCoordinator if necessary.
 *  4. A partition-local `SortExec` is added to any child whose output ordering does not
 *     already satisfy its required ordering.
 *
 * @param operator the physical operator whose children are checked
 * @return `operator` with the (possibly wrapped) children substituted in
 */
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
    var children: Seq[SparkPlan] = operator.children
    // Both requirement lists are positional: entry i applies to children(i).
    assert(requiredChildDistributions.length == children.length)
    assert(requiredChildOrderings.length == children.length)

    // Ensure that the operator's children satisfy their output distribution requirements.
    children = children.zip(requiredChildDistributions).map {
      // Requirement already met: keep the child unchanged.
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      // Shuffle needed. If the distribution does not pin a partition count, the default from
      // spark.sql.shuffle.partitions (200 unless configured) is used here.
      case (child, distribution) =>
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(defaultNumPreShufflePartitions)
        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
    }

    // Get the indexes of children which have specified distribution requirements and need to have
    // same number of partitions.
    val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
      case (UnspecifiedDistribution, _) => false
      case (_: BroadcastDistribution, _) => false
      case _ => true
    }.map(_._2)

    // Distinct partition counts among those children; more than one means they disagree.
    val childrenNumPartitions =
      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet

    if (childrenNumPartitions.size > 1) {
      // Get the number of partitions which is explicitly required by the distributions.
      // All distributions that request an explicit count must request the same one.
      val requiredNumPartitions = {
        val numPartitionsSet = childrenIndexes.flatMap {
          index => requiredChildDistributions(index).requiredNumPartitions
        }.toSet
        assert(numPartitionsSet.size <= 1,
          s"$operator have incompatible requirements of the number of partitions for its children")
        numPartitionsSet.headOption
      }

      // Prefer the explicitly required count; otherwise align to the largest current count.
      val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)

      children = children.zip(requiredChildDistributions).zipWithIndex.map {
        case ((child, distribution), index) if childrenIndexes.contains(index) =>
          if (child.outputPartitioning.numPartitions == targetNumPartitions) {
            child
          } else {
            val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
            child match {
              // If child is an exchange, we replace it with a new one having defaultPartitioning.
              case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
              case _ => ShuffleExchangeExec(defaultPartitioning, child)
            }
          }

        // Children with an unspecified or broadcast distribution are left untouched.
        case ((child, _), _) => child
      }
    }

    // Now, we need to add ExchangeCoordinator if necessary.
    // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
    // However, with the way that we plan the query, we do not have a place where we have a
    // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
    // at here for now.
    // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
    // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
    children = withExchangeCoordinator(children, requiredChildDistributions)

    // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
    children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
      // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
      if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
        child
      } else {
        // global = false: sort within each partition only; the distribution was handled above.
        SortExec(requiredOrdering, global = false, child = child)
      }
    }

    operator.withNewChildren(children)
  }

end

相关文章:

  • CentOS6 Shell脚本/bin/bash^M: bad interpreter错误解决方法
  • 搭建gitbook 和 访问权限认证
  • 测试开发系类之接口自动化测试
  • Chrome 控制台报错Unchecked runtime.lastError: The message port closed before a response was received...
  • 读vue源码看前端百态2--打包工具
  • NoSQL是什么?
  • [ES-5.6.12] x-pack ssl
  • 20190220w
  • 怎么将电脑中的声音录制成WAV格式
  • 你的微博也被盗赞?试试HSTS强制HTTPS加密
  • Linux或UNIX系统配置检查
  • NutzWk 5.1.5 发布,Java 微服务分布式开发框架
  • 17-成员访问权限
  • 警报:线上事故之CountDownLatch的威力
  • Linux基金会施行安全关键系统打造共享工具、流程
  • 【407天】跃迁之路——程序员高效学习方法论探索系列(实验阶段164-2018.03.19)...
  • 【个人向】《HTTP图解》阅后小结
  • 【前端学习】-粗谈选择器
  • Date型的使用
  • Golang-长连接-状态推送
  • Hibernate【inverse和cascade属性】知识要点
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • XML已死 ?
  • 创建一个Struts2项目maven 方式
  • 番外篇1:在Windows环境下安装JDK
  • 观察者模式实现非直接耦合
  • 每天10道Java面试题,跟我走,offer有!
  • 区块链将重新定义世界
  • 使用common-codec进行md5加密
  • 试着探索高并发下的系统架构面貌
  • 推荐一个React的管理后台框架
  • 小而合理的前端理论:rscss和rsjs
  • 异步
  • 再次简单明了总结flex布局,一看就懂...
  • Hibernate主键生成策略及选择
  • kubernetes资源对象--ingress
  • ​第20课 在Android Native开发中加入新的C++类
  • # include “ “ 和 # include < >两者的区别
  • #pragam once 和 #ifndef 预编译头
  • #Spring-boot高级
  • (02)Hive SQL编译成MapReduce任务的过程
  • (C语言)fgets与fputs函数详解
  • (附源码)springboot学生选课系统 毕业设计 612555
  • (附源码)springboot助农电商系统 毕业设计 081919
  • (十八)devops持续集成开发——使用docker安装部署jenkins流水线服务
  • (五)关系数据库标准语言SQL
  • (续)使用Django搭建一个完整的项目(Centos7+Nginx)
  • (源码版)2024美国大学生数学建模E题财产保险的可持续模型详解思路+具体代码季节性时序预测SARIMA天气预测建模
  • .NET 3.0 Framework已经被添加到WindowUpdate
  • .net core 控制台应用程序读取配置文件app.config
  • .net core使用RPC方式进行高效的HTTP服务访问
  • .Net 代码性能 - (1)
  • .NET 的程序集加载上下文
  • .Net 高效开发之不可错过的实用工具
  • .net 设置默认首页