From 6ef2337f4872a22386fb8e93df94571808bd1e1b Mon Sep 17 00:00:00 2001
From: rebecca-liu66 <764276434@qq.com>
Date: Thu, 24 Aug 2023 09:52:14 +0000
Subject: [PATCH 1/2] =?UTF-8?q?!392=20=E3=80=90spark-extension=E3=80=91use?=
 =?UTF-8?q?=20spark=20origin=20hash=20for=20special=20case=20*=20use=20spa?=
 =?UTF-8?q?rk=20original=20hash=20if=20hash=20key=20size=20is=20larger=20t?=
 =?UTF-8?q?han=206?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../spark/sql/execution/ColumnarShuffleExchangeExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index f1b9d2115..3638f865f 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -303,7 +303,7 @@ object ColumnarShuffleExchangeExec extends Logging {
             newIter
           }, isOrderSensitive = isOrderSensitive)
       case h@HashPartitioning(expressions, numPartitions) =>
-        if (containsRollUp(expressions)) {
+        if (containsRollUp(expressions) || expressions.length > 6) {
           rdd.mapPartitionsWithIndexInternal((_, cbIter) => {
             val partitionKeyExtractor: InternalRow => Any = {
               val projection =
@@ -401,4 +401,4 @@ object ColumnarShuffleExchangeExec extends Logging {
     }
   }
 
-}
\ No newline at end of file
+}
--
Gitee

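Note on the hunk above: with more than six hash keys (or a rollup among the keys), partition
ids fall back to the row-based path instead of the Omni columnar hash, presumably because the
columnar hash path only handles a limited number of keys. A minimal sketch of that fallback,
assuming Spark 3.1 catalyst internals (the helper name sparkHashPartitionId is invented for
illustration; the body mirrors what vanilla ShuffleExchangeExec does for HashPartitioning):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, Murmur3Hash, Pmod, UnsafeProjection}

    // HashPartitioning.partitionIdExpression is Pmod(Murmur3Hash(expressions), numPartitions);
    // evaluating it per InternalRow reproduces Spark's original partition placement.
    def sparkHashPartitionId(
        expressions: Seq[Expression],
        outputAttributes: Seq[Attribute],
        numPartitions: Int): InternalRow => Int = {
      val projection = UnsafeProjection.create(
        Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) :: Nil, outputAttributes)
      row => projection(row).getInt(0)
    }
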
From 006d3a8a6dd9ca9d9b17ecc27420e9a0e8c83cc3 Mon Sep 17 00:00:00 2001
From: guoxintong
Date: Thu, 24 Aug 2023 15:14:24 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E3=80=90spark-extension=E3=80=91Deduplicat?=
 =?UTF-8?q?eRightSideOfLeftSemiJoin=20Rule=20implementation?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../boostkit/spark/ColumnarPlugin.scala       | 198 +++++++++++++++++-
 .../boostkit/spark/ColumnarPluginConfig.scala |   7 +
 .../aggregate/ExtendedAggUtils.scala          |  99 +++++++++
 3 files changed, 302 insertions(+), 2 deletions(-)
 create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/aggregate/ExtendedAggUtils.scala

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
index fa64c4516..c4b5e87b2 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
@@ -21,18 +21,21 @@ import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, DynamicPruningSubquery, SortOrder}
-import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Partial}
 import org.apache.spark.sql.catalyst.optimizer.{DelayCartesianProduct, HeuristicJoinReorder}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowToOmniColumnarExec, _}
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ColumnarCustomShuffleReaderExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.aggregate.{DummyLogicalPlan, ExtendedAggUtils, HashAggregateExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.ColumnarBatchSupportUtil.checkColumnarBatchSupport
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
+import org.apache.spark.sql.catalyst.plans.LeftSemi
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 
 case class ColumnarPreOverrides() extends Rule[SparkPlan] with PredicateHelper{
   val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf
@@ -59,6 +62,9 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] with PredicateHelper{
   val enableColumnarProjectFusion: Boolean = columnarConf.enableColumnarProjectFusion
   val enableColumnarTopNSort: Boolean = columnarConf.enableColumnarTopNSort
   val topNSortThreshold: Int = columnarConf.topNSortThreshold
+  val enableDedupLeftSemiJoin: Boolean = columnarConf.enableDedupLeftSemiJoin
+  val dedupLeftSemiJoinThreshold: Int = columnarConf.dedupLeftSemiJoinThreshold
+
   def apply(plan: SparkPlan): SparkPlan = {
     replaceWithColumnarPlan(plan)
   }
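The two fields above gate a rewrite that rests on a simple invariant: a left semi join only
probes for the existence of a match on the right side, so duplicate right-side rows can never
change the result. A hypothetical DataFrame-level illustration (left, right, and "key" are
invented names, not part of the patch):

    // A left semi join is invariant under right-side deduplication:
    val semiJoin      = left.join(right, Seq("key"), "left_semi")
    val semiJoinDedup = left.join(right.distinct(), Seq("key"), "left_semi")
    // Both return the same rows, but the deduplicated variant shuffles less data.
    // The hunk below implements the distinct() as a partial HashAggregate grouped
    // on every output column of the right side, placed beneath the shuffle.
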
@@ -425,6 +431,194 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] with PredicateHelper{
           plan.condition,
           left,
           right)
+      // The DeduplicateRightSideOfLeftSemiJoin rule works only with Spark 3.1.
+      case plan: SortMergeJoinExec if enableColumnarSortMergeJoin && enableDedupLeftSemiJoin => {
+        plan.joinType match {
+          case LeftSemi => {
+            if (plan.condition.isEmpty && plan.left.isInstanceOf[SortExec] && plan.right.isInstanceOf[SortExec]
+              && plan.right.asInstanceOf[SortExec].child.isInstanceOf[ShuffleExchangeExec]) {
+              val nextChild = plan.right.asInstanceOf[SortExec].child.asInstanceOf[ShuffleExchangeExec].child
+              if (nextChild.output.size >= dedupLeftSemiJoinThreshold) {
+                nextChild match {
+                  case ProjectExec(_, BroadcastHashJoinExec(_, _, _, _, _, _, _, _)) => {
+                    val left = replaceWithColumnarPlan(plan.left)
+                    val val1 = replaceWithColumnarPlan(nextChild.asInstanceOf[ProjectExec])
+                    val partialAgg = PhysicalAggregation.unapply(Aggregate(nextChild.output, nextChild.output,
+                      new DummyLogicalPlan)) match {
+                      case Some((groupingExpressions, aggExpressions, resultExpressions, _))
+                        if aggExpressions.forall(expr => expr.isInstanceOf[AggregateExpression]) =>
+                        ExtendedAggUtils.planPartialAggregateWithoutDistinct(
+                          ExtendedAggUtils.normalizeGroupingExpressions(groupingExpressions),
+                          aggExpressions.map(_.asInstanceOf[AggregateExpression]),
+                          resultExpressions,
+                          val1)
+                    }
+
+                    if (partialAgg.isInstanceOf[HashAggregateExec]) {
+                      val newHashAgg = new ColumnarHashAggregateExec(
+                        partialAgg.asInstanceOf[HashAggregateExec].requiredChildDistributionExpressions,
+                        partialAgg.asInstanceOf[HashAggregateExec].groupingExpressions,
+                        partialAgg.asInstanceOf[HashAggregateExec].aggregateExpressions,
+                        partialAgg.asInstanceOf[HashAggregateExec].aggregateAttributes,
+                        partialAgg.asInstanceOf[HashAggregateExec].initialInputBufferOffset,
+                        partialAgg.asInstanceOf[HashAggregateExec].resultExpressions,
+                        val1)
+
+                      val newShuffle = new ColumnarShuffleExchangeExec(
+                        plan.right.asInstanceOf[SortExec].child.asInstanceOf[ShuffleExchangeExec].outputPartitioning,
+                        newHashAgg,
+                        plan.right.asInstanceOf[SortExec].child.asInstanceOf[ShuffleExchangeExec].shuffleOrigin
+                      )
+                      val newSort = new ColumnarSortExec(
+                        plan.right.asInstanceOf[SortExec].sortOrder,
+                        plan.right.asInstanceOf[SortExec].global,
+                        newShuffle,
+                        plan.right.asInstanceOf[SortExec].testSpillFrequency)
+                      ColumnarSortMergeJoinExec(
+                        plan.leftKeys,
+                        plan.rightKeys,
+                        plan.joinType,
+                        plan.condition,
+                        left,
+                        newSort,
+                        plan.isSkewJoin)
+                    } else {
+                      logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
+                      if (enableSortMergeJoinFusion && plan.left.isInstanceOf[SortExec]
+                        && plan.right.isInstanceOf[SortExec]) {
+                        val left = replaceWithColumnarPlan(plan.left.asInstanceOf[SortExec].child)
+                        val right = replaceWithColumnarPlan(plan.right.asInstanceOf[SortExec].child)
+                        new ColumnarSortMergeJoinFusionExec(
+                          plan.leftKeys,
+                          plan.rightKeys,
+                          plan.joinType,
+                          plan.condition,
+                          left,
+                          right,
+                          plan.isSkewJoin)
+                      } else {
+                        val left = replaceWithColumnarPlan(plan.left)
+                        val right = replaceWithColumnarPlan(plan.right)
+                        new ColumnarSortMergeJoinExec(
+                          plan.leftKeys,
+                          plan.rightKeys,
+                          plan.joinType,
+                          plan.condition,
+                          left,
+                          right,
+                          plan.isSkewJoin)
+                      }
+                    }
+                  }
+                  case _ => {
+                    logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
+                    if (enableSortMergeJoinFusion && plan.left.isInstanceOf[SortExec]
+                      && plan.right.isInstanceOf[SortExec]) {
+                      val left = replaceWithColumnarPlan(plan.left.asInstanceOf[SortExec].child)
+                      val right = replaceWithColumnarPlan(plan.right.asInstanceOf[SortExec].child)
+                      new ColumnarSortMergeJoinFusionExec(
+                        plan.leftKeys,
+                        plan.rightKeys,
+                        plan.joinType,
+                        plan.condition,
+                        left,
+                        right,
+                        plan.isSkewJoin)
+                    } else {
+                      val left = replaceWithColumnarPlan(plan.left)
+                      val right = replaceWithColumnarPlan(plan.right)
+                      new ColumnarSortMergeJoinExec(
+                        plan.leftKeys,
+                        plan.rightKeys,
+                        plan.joinType,
+                        plan.condition,
+                        left,
+                        right,
+                        plan.isSkewJoin)
+                    }
+                  }
+                }
+              } else {
+                logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
+                if (enableSortMergeJoinFusion && plan.left.isInstanceOf[SortExec] && plan.right.isInstanceOf[SortExec]) {
+                  val left = replaceWithColumnarPlan(plan.left.asInstanceOf[SortExec].child)
+                  val right = replaceWithColumnarPlan(plan.right.asInstanceOf[SortExec].child)
+                  new ColumnarSortMergeJoinFusionExec(
+                    plan.leftKeys,
+                    plan.rightKeys,
+                    plan.joinType,
+                    plan.condition,
+                    left,
+                    right,
+                    plan.isSkewJoin)
+                } else {
+                  val left = replaceWithColumnarPlan(plan.left)
+                  val right = replaceWithColumnarPlan(plan.right)
+                  new ColumnarSortMergeJoinExec(
+                    plan.leftKeys,
+                    plan.rightKeys,
+                    plan.joinType,
+                    plan.condition,
+                    left,
+                    right,
+                    plan.isSkewJoin)
+                }
+              }
+            } else {
+              logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
+              if (enableSortMergeJoinFusion && plan.left.isInstanceOf[SortExec] && plan.right.isInstanceOf[SortExec]) {
+                val left = replaceWithColumnarPlan(plan.left.asInstanceOf[SortExec].child)
+                val right = replaceWithColumnarPlan(plan.right.asInstanceOf[SortExec].child)
+                new ColumnarSortMergeJoinFusionExec(
+                  plan.leftKeys,
+                  plan.rightKeys,
+                  plan.joinType,
+                  plan.condition,
+                  left,
+                  right,
+                  plan.isSkewJoin)
+              } else {
+                val left = replaceWithColumnarPlan(plan.left)
+                val right = replaceWithColumnarPlan(plan.right)
+                new ColumnarSortMergeJoinExec(
+                  plan.leftKeys,
+                  plan.rightKeys,
+                  plan.joinType,
+                  plan.condition,
+                  left,
+                  right,
+                  plan.isSkewJoin)
+              }
+            }
+          }
+          case _ => {
+            logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
+            if (enableSortMergeJoinFusion && plan.left.isInstanceOf[SortExec] && plan.right.isInstanceOf[SortExec]) {
+              val left = replaceWithColumnarPlan(plan.left.asInstanceOf[SortExec].child)
+              val right = replaceWithColumnarPlan(plan.right.asInstanceOf[SortExec].child)
+              new ColumnarSortMergeJoinFusionExec(
+                plan.leftKeys,
+                plan.rightKeys,
+                plan.joinType,
+                plan.condition,
+                left,
+                right,
+                plan.isSkewJoin)
+            } else {
+              val left = replaceWithColumnarPlan(plan.left)
+              val right = replaceWithColumnarPlan(plan.right)
+              new ColumnarSortMergeJoinExec(
+                plan.leftKeys,
+                plan.rightKeys,
+                plan.joinType,
+                plan.condition,
+                left,
+                right,
+                plan.isSkewJoin)
+            }
+          }
+        }
+      }
       case plan: SortMergeJoinExec if enableColumnarSortMergeJoin =>
         logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
         if (enableSortMergeJoinFusion && plan.left.isInstanceOf[SortExec] && plan.right.isInstanceOf[SortExec]) {
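Net effect of the rewrite above on a qualifying LeftSemi sort-merge join, sketched as plan
trees (illustrative shape only, not verbatim explain() output):

    Before:                              After:
    SortMergeJoin(LeftSemi)              ColumnarSortMergeJoin(LeftSemi)
    :- Sort (left)                       :- ColumnarSort (left)
    +- Sort (right)                      +- ColumnarSort (right)
       +- ShuffleExchange                   +- ColumnarShuffleExchange
          +- Project                           +- ColumnarHashAggregate (partial, group by all outputs)
             +- BroadcastHashJoin                 +- Project (columnar)
                                                     +- BroadcastHashJoin (columnar)

Because the partial aggregate groups on every output column of the projection, it
deduplicates each upstream partition before its rows cross the shuffle.
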
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index d59978a88..fb45820be 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -204,6 +204,13 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
   val enableOmniExpCheck : Boolean = conf.getConfString("spark.omni.sql.omniExp.check", "true").toBoolean
 
   val enableColumnarProjectFusion : Boolean =
     conf.getConfString("spark.omni.sql.columnar.projectFusion", "true").toBoolean
+
+  // enable or disable deduplicating the right side of a left semi join
+  val enableDedupLeftSemiJoin: Boolean =
+    conf.getConfString("spark.omni.sql.columnar.dedupLeftSemiJoin", "true").toBoolean
+
+  val dedupLeftSemiJoinThreshold: Int =
+    conf.getConfString("spark.omni.sql.columnar.dedupLeftSemiJoinThreshold", "3").toInt
 }
 
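Both knobs default to on. Note that the threshold compares against the number of columns
the right side produces (nextChild.output.size), not its row count. A minimal usage sketch
(the non-default value is illustrative):

    // Disable the rewrite entirely for a session:
    spark.conf.set("spark.omni.sql.columnar.dedupLeftSemiJoin", "false")
    // Or require a wider right side before the rewrite kicks in (default is 3):
    spark.conf.set("spark.omni.sql.columnar.dedupLeftSemiJoinThreshold", "8")
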
conf.getConfString("spark.omni.sql.columnar.projectFusion", "true").toBoolean + + // enable or disable deduplicate the right side of left semi join + val enableDedupLeftSemiJoin: Boolean = + conf.getConfString("spark.omni.sql.columnar.dedupLeftSemiJoin", "true").toBoolean + + val dedupLeftSemiJoinThreshold: Int = + conf.getConfString("spark.omni.sql.columnar.dedupLeftSemiJoinThreshold", "3").toInt } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/aggregate/ExtendedAggUtils.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/aggregate/ExtendedAggUtils.scala new file mode 100644 index 000000000..c8ec22e0b --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/aggregate/ExtendedAggUtils.scala @@ -0,0 +1,99 @@ +package org.apache.spark.sql.execution.aggregate + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Partial} +import org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.execution.SparkPlan + +object ExtendedAggUtils { + def normalizeGroupingExpressions(groupingExpressions: Seq[NamedExpression]) = { + groupingExpressions.map { e => + NormalizeFloatingNumbers.normalize(e) match { + case n: NamedExpression => n + case other => Alias(other, e.name)(exprId = e.exprId) + } + } + } + + def planPartialAggregateWithoutDistinct( + groupingExpressions: Seq[NamedExpression], + aggregateExpressions: Seq[AggregateExpression], + resultExpressions: Seq[NamedExpression], + child: SparkPlan): SparkPlan = { + val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) + createAggregate( + requiredChildDistributionExpressions = None, + groupingExpressions = groupingExpressions.map(_.toAttribute), + aggregateExpressions = completeAggregateExpressions, + aggregateAttributes = completeAggregateExpressions.map(_.resultAttribute), + initialInputBufferOffset = groupingExpressions.length, + resultExpressions = resultExpressions, + child = child) + } + + private def createAggregate( + requiredChildDistributionExpressions: Option[Seq[Expression]] = None, + isStreaming: Boolean = false, + groupingExpressions: Seq[NamedExpression] = Nil, + aggregateExpressions: Seq[AggregateExpression] = Nil, + aggregateAttributes: Seq[Attribute] = Nil, + initialInputBufferOffset: Int = 0, + resultExpressions: Seq[NamedExpression] = Nil, + child: SparkPlan): SparkPlan = { + val useHash = HashAggregateExec.supportsAggregate( + aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)) + if (useHash) { + HashAggregateExec( + requiredChildDistributionExpressions = requiredChildDistributionExpressions, + groupingExpressions = groupingExpressions, + aggregateExpressions = mayRemoveAggFilters(aggregateExpressions), + aggregateAttributes = aggregateAttributes, + initialInputBufferOffset = initialInputBufferOffset, + resultExpressions = resultExpressions, + child = child) + } else { + val objectHashEnabled = child.sqlContext.conf.useObjectHashAggregation + val useObjectHash = ObjectHashAggregateExec.supportsAggregate(aggregateExpressions) + + if (objectHashEnabled && useObjectHash) { + ObjectHashAggregateExec( + requiredChildDistributionExpressions = requiredChildDistributionExpressions, + 
--
Gitee