feat(sum-subop): Spark BIGINT sum AArch64 SVE on HashAgg #563
Open
LeiRui wants to merge 1 commit into
Add SumAggregateSparkInt64SubOp (adapter updateGroupsFromDecoded, SVE kernel sveHashAggBatchUpdateGroupSums), DecodedVector hashAgg* layout APIs, env kill switch BOLT_SPARK_SUM_INT64_USE_SUBOP, and unit tests (DuckDB parity, env-off parity, SubOp vs Base, nullable gate, null constant, hashAgg layout modes).

Co-authored-by: Old-Li883 <lichenhao9@huawei.com>
Co-authored-by: helloxteen <zhangxin440@h-partners.com>
What problem does this PR solve?
Spark `sum(bigint)` on HashAgg can spend significant time in per-group scalar updates when batches use nullable encodings and the aggregate table tracks null groups. This PR adds an AArch64 SVE batch path for that update loop, with fallback to `SumAggregateBase` for other shapes and platforms.

Issue Number: N/A
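For orientation, the per-group scalar update described above can be pictured roughly as follows. This is a minimal sketch, not the code in `SumAggregateBase`: `GroupState`, `scalarUpdateGroupSums`, and the plain `std::vector` inputs are hypothetical stand-ins for the real accumulator layout and decoded vectors.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for one aggregate group: a running int64
// sum plus a flag saying the group has not yet seen a non-null input.
struct GroupState {
  int64_t sum = 0;
  bool isNull = true;
};

// Scalar reference loop: for each row, skip null inputs; otherwise add the
// value to the row's group and clear that group's null flag.
// groupIds[i] is the group of row i; inputIsNull[i] marks null input rows.
inline void scalarUpdateGroupSums(
    std::vector<GroupState>& groups,
    const std::vector<uint32_t>& groupIds,
    const std::vector<int64_t>& values,
    const std::vector<bool>& inputIsNull) {
  for (size_t row = 0; row < values.size(); ++row) {
    if (inputIsNull[row]) {
      continue;
    }
    GroupState& group = groups[groupIds[row]];
    // Plain += without an overflow check; the unsigned round trip keeps
    // wrap-around well defined in C++.
    group.sum = static_cast<int64_t>(
        static_cast<uint64_t>(group.sum) + static_cast<uint64_t>(values[row]));
    group.isNull = false;
  }
}
```

The branch on `inputIsNull` and the per-row null-flag write are the kind of work a batch kernel can try to amortize across several rows at once.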
Type of Change
Description
What
- `SumAggregateSparkInt64SubOp` for Spark non-decimal `sum(bigint) → bigint`.
- `updateGroupsFromDecoded` maps `DecodedVector` `hashAgg*` and `SelectivityVector` into flat buffers, then calls `sveHashAggBatchUpdateGroupSums` (SVE kernel in an aarch64-only TU, `-march=armv8-a+sve`).
- `DecodedVector`: `hashAggNullsLayoutMode`, `hashAggIndicesLayoutMode`, `hashAggMutable*`.
- `registerSum` (`SumAggregate.cpp`): for `BIGINT` (non-decimal), the factory installs `SumAggregateSparkInt64SubOp` unless the process env opts out (see below).

Env rollback (`BOLT_SPARK_SUM_INT64_USE_SUBOP`)

| Value | Aggregate installed |
| --- | --- |
| not opted out | `SumAggregateSparkInt64SubOp` (default) |
| `0` | `SumAggregate<int64_t, int64_t, int64_t>` (`SumAggregateBase`) |
| `false`, `no`, or `off` (ASCII, case-insensitive) | same as `0` |

Executor example (Gluten / Spark): `spark.executorEnv.BOLT_SPARK_SUM_INT64_USE_SUBOP=0`.

Registration
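The env opt-out used at registration time can be sketched as below. This is an illustration under stated assumptions rather than the PR's code: the helper names `subOpEnabledFromValue` and `subOpEnabled` are hypothetical, and treating an unset variable or any unlisted value (such as `1`) as "enabled" is an assumption drawn from the default described above.

```cpp
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <string>

// Hypothetical helper: returns false only for the documented opt-out
// spellings "0", "false", "no", "off" (ASCII, case-insensitive).
// Assumption: nullptr (unset) and every other value keep the SubOp enabled.
inline bool subOpEnabledFromValue(const char* raw) {
  if (raw == nullptr) {
    return true;
  }
  std::string value(raw);
  std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) {
    return static_cast<char>(std::tolower(c));
  });
  return !(value == "0" || value == "false" || value == "no" || value == "off");
}

// Hypothetical wrapper that reads the process environment once per call.
inline bool subOpEnabled() {
  return subOpEnabledFromValue(std::getenv("BOLT_SPARK_SUM_INT64_USE_SUBOP"));
}
```

A factory could call `subOpEnabled()` once and pick either the SubOp or the base aggregate; splitting out the string check keeps the parsing testable without touching the environment.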
Runtime call chain (`addRawInput` and `addIntermediateResults` are symmetric)

1. `mayPushdown &&` top-level Lazy → Base
2. `numNulls_ && Overflow`? No → Base (`Overflow` is true when Spark sum registers with `setSumAggOverflowCheckFlag(false)`: overflow check off, plain `+=`)
3. `mayPushdown &&` inner Lazy → hook + load, return
4. `mayHaveNulls()`? No → Base
5. `updateGroupsFromDecoded` → `sveHashAggBatchUpdateGroupSums`, return (no Base for this batch)

Flowchart (mermaid)

```mermaid
flowchart TD
    A[addRawInput / addIntermediateResults] --> B{mayPushdown && top-level Lazy?}
    B -->|yes| C[Base]
    B -->|no| D{numNulls_ && Overflow?}
    D -->|no| C
    D -->|yes| E[decode]
    E --> F{mayPushdown && inner Lazy?}
    F -->|yes| G[hook + load]
    F -->|no| H{mayHaveNulls?}
    H -->|no| C
    H -->|yes| I{Linux aarch64 && SVE?}
    I -->|no| C
    I -->|yes| J[updateGroupsFromDecoded → kernel]
```

Source files
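As a companion to the runtime call chain, the dispatch decision can be condensed into a small pure function plus a runtime SVE probe. This is a hedged sketch: `Path`, `BatchShape`, `hasSve`, and `choosePath` are hypothetical names, and the boolean fields stand in for the real vector and accumulator checks. The `HWCAP_SVE` fallback definition (bit 22) is only there in case the system headers do not provide it.

```cpp
#if defined(__linux__) && defined(__aarch64__)
#include <sys/auxv.h>
#ifndef HWCAP_SVE
#define HWCAP_SVE (1UL << 22) // arm64 hwcap bit for SVE
#endif
#endif

// Hypothetical outcome of the dispatch for one batch.
enum class Path { kBase, kLazyHook, kSveKernel };

// Hypothetical summary of the conditions the call chain inspects.
struct BatchShape {
  bool mayPushdown;
  bool topLevelLazy;
  bool numNullsAndOverflow; // stands in for `numNulls_ && Overflow`
  bool innerLazy;           // known only after decode
  bool mayHaveNulls;        // known only after decode
};

// Runtime probe: true only on Linux aarch64 when the kernel reports SVE.
inline bool hasSve() {
#if defined(__linux__) && defined(__aarch64__)
  return (getauxval(AT_HWCAP) & HWCAP_SVE) != 0;
#else
  return false;
#endif
}

// Mirrors the flowchart order: every "no" branch falls back to Base.
inline Path choosePath(const BatchShape& shape, bool sveAvailable) {
  if (shape.mayPushdown && shape.topLevelLazy) {
    return Path::kBase;
  }
  if (!shape.numNullsAndOverflow) {
    return Path::kBase;
  }
  // The input is decoded at this point in the real call chain.
  if (shape.mayPushdown && shape.innerLazy) {
    return Path::kLazyHook;
  }
  if (!shape.mayHaveNulls) {
    return Path::kBase;
  }
  return sveAvailable ? Path::kSveKernel : Path::kBase;
}
```

Passing `sveAvailable` as a parameter (for example `choosePath(shape, hasSve())`) keeps the decision testable on x86, where the probe always reports false.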
File roles (register + SubOp + Base):

Legend: green = new in this PR (SubOp `.h` / `.cpp` / `Sve.cpp`); yellow = modified (`SumAggregate.cpp`); gray = existing unchanged (`SumAggregateBase`).

| File / function | Role |
| --- | --- |
| `SumAggregate.cpp` | `registerSum` factory: env check → SubOp or `SumAggregateBase` for Spark non-decimal `BIGINT`. |
| `SumAggregateSparkInt64SubOp.h` | `addRawInput` / `addIntermediateResults` overrides; private `updateGroupsFromDecoded` (declared on all platforms). |
| `SumAggregateSparkInt64SubOp.cpp` | (A) Base when `!(numNulls_ && Overflow)`; (B) decode + lazy hook (`mayPushdown` + inner `LAZY`); (C) decode + SVE glue when `mayHaveNulls()` and the aarch64 runtime SVE probe passes; else (D) Base after decode. Steps (A–D) match the numbered call chain above. SVE probe: Linux `getauxval(AT_HWCAP) & HWCAP_SVE`; non-Linux aarch64 → (D). x86 never calls glue (`#if __aarch64__`). |
| `updateGroupsFromDecoded` | In `SumAggregateSparkInt64SubOpSve.cpp` on aarch64. Caller has already decoded. Reads `hashAgg*` layout/buffers and the batch row mask; calls `sveHashAggBatchUpdateGroupSums` with `groups` and `nullByte_` / `nullMask_` / `numNulls_`. Returns `true` → dispatch skips `Base` for this batch. On other ISAs, `SumAggregateSparkInt64SubOp.cpp` provides a stub that always returns `false` (dispatch never calls it on x86). |
| `sveHashAggBatchUpdateGroupSums` in `SumAggregateSparkInt64SubOpSve.cpp` | Adds (`+=`) into the int64 accumulator and clears group-null flags when updating a previously null group. aarch64-only TU (`-march=armv8-a+sve`); not linked on non-aarch64. |

Tests

- `SumAggregationTest`: `sumInt64SubOpParity`, `sumInt64SubOpEnvOffParity`, `sumInt64SubOpNullableSveGate`, `sumInt64SubOpSveMatchesBase`, `sumInt64SubOpNullConstMatchesBase`
- `DecodedVectorTest.hashAggLayoutModes`

Performance Impact
Benchmark: bolt-main (commit e1745f71a5dc8985e6a5b872a84ba6253013fb7b) vs. bolt-pr (this PR) for the following SQL on the TPC-DS 1T dataset:
Release Note
Please describe the changes in this PR
Release Note:
Checklist (For Author)
Breaking Changes