perf(PartitionedVector): Improve partitioning when number of partitions is small#2115
Open
yingsu00 wants to merge 28 commits into
Open
perf(PartitionedVector): Improve partitioning when number of partitions is small#2115yingsu00 wants to merge 28 commits into
yingsu00 wants to merge 28 commits into
Conversation
This commit introduces `PartitionedVector` - a low-level execution abstraction that provides an in-place, partition-aware layout of a vector based on per-row partition IDs. 1. **In-place rearrangement**: Rearrange vector data in memory without creating multiple copies 2. **Buffer reuse**: Allow reuse of temporary buffers across multiple partitioning operations 3. **Minimal abstraction**: Similar to `DecodedVector`, focus on efficient execution rather than operator semantics 4. **Thread-unsafe by design**: Optimized for single-threaded execution contexts For more information please see IBM#1703 Alchemy-item: (ID = 1150) Introducing PartitionedVector commit 1/1 - 960f41b
Signed-off-by: Xin Zhang <xin-zhang2@ibm.com> Alchemy-item: (ID = 1167) Add PartitionedRowVector commit 1/1 - f2af427
PartitionedFlatVector::partition() and PartitionedRowVector::partition() called mutableRawNulls() unconditionally. mutableRawNulls() allocates a null buffer if one does not exist, causing mayHaveNulls() to return true for every vector after partitioning, even when the original had no nulls. Fix both sites to check rawNulls() first and only call mutableRawNulls() when a null buffer already exists. Add noNullBufferAllocatedForNullFreeFlat and noNullBufferAllocatedForNullFreeRow tests to PartitionedVectorTest to cover this case. # Conflicts: # velox/vector/PartitionedVector.cpp
This commit introduces PrestoIterativePartitioningSerializer, which buffers RowVectors across multiple append() calls, partitions rows in-place using PartitionedVector, and on flush() serializes each non-empty partition into a Presto wire-format IOBuf. The serializer has no dependency on velox_exec: it returns raw folly::IOBuf objects, leaving SerializedPage creation to the caller.
This commit introduces OptimizedPartitionedOutput, a PartitionedOutput operator backed by PrestoIterativePartitioningSerializer. Enabled via query config key "optimized_repartitioning" (default off). LocalPlanner selects it over the standard PartitionedOutput when the flag is set. TODO: replicateNullsAndAny is not yet supported and raises a user error.
…geBenchmark - Added normal vs optimized PartitionedOutput comparison by running each exchange case twice with kOptimizedPartitionedOutputEnabled=false/true. - Added per-mode benchmark names: - exchange<Case>_normalPartitionedOutput - exchange<Case>_optimizedPartitionedOutput in ExchangeBenchmark.cpp. - Refactored result printing into shared helpers and fixed output consistency in ExchangeBenchmark.cpp.
…mark Split the local partition exchange benchmark out of ExchangeBenchmark into its own executable and CMake target, while keeping the local benchmark logic and statistics reporting available in a dedicated binary.
…tioningSerializer
…fferManager listeners Pass an OutputBufferManager-backed listener factory into PrestoIterativePartitioningSerializer so the optimized path uses the same listener source as normal PartitionedOutput. Create per-partition listeners during flush, set the checksum bit only when a listener is present, and compute the page checksum only for PrestoOutputStreamListener instances. Also add tests that verify checksum headers are written and that the serialized pages round-trip through the standard deserializer.
…rting - add explicit simple-schema benchmark cases by type and column count - register normal and optimized runs as separate named benchmark cases - make `dictPct` apply per generated vector and recurse into nested types - generate benchmark input vectors directly with optional nulls - replace ad hoc flat input generation with explicit input specs - return `ExchangeRunStats` from benchmark runs and centralize query config - group printed results by dataset with normal vs. optimized stats
The new OptimizedVectorHasher is up to 2-3x faster than VectorHasher.
BatchMaker always allocates a null buffer even when no rows are null. This commit removes it so benchmarks measure the non-nullable path faithfully. Plus some minor format cleanups.
Currently Velox never passes CMAKE_BUILD_TYPE into Folly's own configure
step, while cmake_install only forwards arbitrary caller flags, so Folly
was not built in release mode when Velox is built in release mode. This
commit adds -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" to FOLLY_FLAGS, so
a release Velox dependency setup now builds release Folly on macOS and
Linux.
- Remove benchmarks with vector size 1000000 and only keep vector size 10000. - Add tinyint and smallint benchmarks.
Introduce OptimizedHashPartitionFunction as a faster drop-in replacement for HashPartitionFunction, gated behind a new query config flag optimized_hash_partition_function_enabled (default false). partition() is improved from 50% to over 200x. Add HashPartitionFunctionBase as a common base exposing numPartitions(), and createHashPartitionFunction() factories that select the implementation based on the flag. Thread QueryConfig* through PartitionFunctionSpec::create() and update callsites (LocalPartition, PartitionedOutput, MarkDistinct, RowNumber, Window, SubPartitionedSortWindowBuild, HiveConnector) to construct partition functions via the factory. Register CMake targets for the new test and benchmark binaries.
…ormance
The old in-place partitioning followed cycles of swaps,
values[cursor[targetPartition]++] swapped with values[offset];
targetPartition = partitions[cursor[targetPartition] - 1];
so the next partition id depended on the previous swap's store. The
loop ran at one row per cache-latency cycle and could not be
vectorized.
Replace it with a contiguous scatter,
output[cursor[partitions[i]]++] = input[i];
into a scratch buffer (ctx.tempBuffer), then memcpy the scratch back
into the caller's buffer. The per-row chain through partitions[] is
broken: rows targeting distinct partitions have independent address
computations and can issue in parallel. The scatter loop is unrolled
by 4 and marked __restrict to expose that ILP, and to let the compiler
hoist the partition / cursor loads above the stores instead of
assuming aliasing.
The bool path takes the same shape via scatterBitsInPlace: a
sequential bit scatter into a scratch byte buffer, then memcpy back.
The old per-bit swap had the same data dependency as the value swap.
Benchmarks (release, M1 macOS, noNulls, time/iter, partitionedVectorBenchmark):
case before after speedup
---- ------ ----- -------
BOOLEAN_1Cols_P64 37.18us 11.41us 3.26x
BOOLEAN_10Cols_P64 338.63us 82.52us 4.10x
INTEGER_1Cols_P64 30.30us 9.37us 3.23x
INTEGER_10Cols_P64 267.21us 59.06us 4.52x
BIGINT_1Cols_P64 31.08us 10.26us 3.03x
BIGINT_10Cols_P64 281.92us 70.45us 4.00x
HUGEINT_1Cols_P64 32.61us 12.76us 2.56x
HUGEINT_10Cols_P64 280.05us 97.02us 2.89x
VARCHAR_1Cols_P64 30.46us 12.93us 2.36x
VARCHAR_10Cols_P64 280.05us 98.45us 2.84x
Mixed_1Cols_P64 37.17us 11.45us 3.25x
Mixed_10Cols_P64 281.84us 70.66us 3.99x
Across the full sweep (BOOLEAN/INTEGER/BIGINT/HUGEINT/VARCHAR/Mixed
x {1, 10} cols x {4, 16, 64, 256} partitions), the new path is
1.58x to 5.06x faster than the in-place version, with the largest
wins on narrow types and 10-column inputs where the unrolled scalar
scatter delivers the most parallelism.
The previous commit copied ctx.tempBuffer back into the caller's buffer on every partition() call. That memcpy scales with element width and is the dominant per-call cost for wide fixed-width types — ~40% overhead for HUGEINT/VARCHAR on the 10-column benchmark. BOOLEAN was unaffected because its bit path already memcpys via scatterBitsInPlace. Remove the memcpy by switching ctx.tempBuffer back to a std::swap. Cross-pool ownership — the reason the swap was originally given up — is avoided by allocating ctx.tempBuffer from inputBuffer->pool() instead of the operator's 'pool' argument. The swapped-in buffer then lives in the same pool the caller's FlatVector already belonged to, so the operator's pool sees no foreign reference at teardown. ctx.tempBuffer is reused across columns of the same input (which share a pool); when the input's pool changes between calls, the cached buffer is reset so the next allocation comes from the new pool. The setSize before the swap is also added — without it, FlatVector::slice() can read a stale size from a reused tempBuffer and underflow numValues = size / sizeof(T) to 0 when a narrower column preceded a wider one. Benchmarks (release, M1 macOS, noNulls, time/iter, partitionedVectorBenchmark): 10-column input, 64 partitions: type memcpy swap+inputPool delta BOOLEAN 82.52us 82.35us -0.2% INTEGER 59.06us 54.06us -8.5% BIGINT 70.45us 57.79us -18.0% HUGEINT 97.02us 68.54us -29.4% VARCHAR 98.45us 70.00us -28.9% Mixed 70.66us 60.93us -13.8% 1-column input, 64 partitions: type memcpy swap+inputPool delta INTEGER 9.37us 8.91us -4.9% BIGINT 10.26us 9.03us -12.0% HUGEINT 12.76us 9.95us -22.0% VARCHAR 12.93us 10.23us -20.9% The win tracks the byte count per element: every extra byte per element is an extra byte the previous memcpy had to write. Tests: - velox_vector_test --gtest_filter='*Partition*' (44 passed) - velox_serializer_test_PrestoIterativePartitioningSerializerTest (56 passed) - velox_exec_test_OptimizedPartitionedOutputTest (301 passed)
…ns is small
For small partition counts the scatter loop
output[cursor[partitions[i]]++] = input[i]
bottlenecks on the per-partition cursor counters: consecutive rows tend
to share a partition, and each iteration's store of cursor[p] feeds the
next iteration's load. The unrolled scalar scatter has no ILP to
extract because the iterations alias on the same cursor slot.
PartitionedVector::create() now accepts virtual partition ids in
ctx.numVirtualPartitions: when set to numPartitions * fanout (a power
of two > 1), the per-row ids are interpreted as virtual ids in
[0, numPartitions * fanout). The scatter then operates over the
expanded id space and consecutive iterations target distinct cursor
slots, breaking the dependency chain. Logical partition order is
preserved because p's rows occupy the contiguous virtual range
[p*fanout, (p+1)*fanout); create() folds the virtual end offsets back
to user-facing logical end offsets that partitionAt(p) indexes into.
Wiring (OptimizedPartitionedOutput-only — OptimizedHashPartitionFunction
itself stays unaware of virtual partitioning):
- OptimizedPartitionedOutput picks a fanout at construction from
kVirtualPartitionTarget (256) and stores it in fanout_ /
numVirtualDestinations_, but only when the partition function is an
OptimizedHashPartitionFunction. Other partition functions
(RoundRobinPartitionFunction, etc.) keep fanout_ == 1.
- addInput() stripes the logical ids returned by partition() into
virtual ids in a single per-row pass:
partitions_[i] = logicalId * fanout_ + (i & (fanout_ - 1))
- PrestoIterativePartitioningSerializer's constructor takes a
numVirtualPartitions parameter (default 0 == numPartitions) and
forwards it via ctx.numVirtualPartitions in append() so
PartitionedVector::create() takes the virtual scatter path
automatically.
Benchmarks (release, M1 macOS, INTEGER 1-column, 10000 rows, time/iter,
partitionedVectorBenchmark):
partitions before after speedup
---------- ------ ----- -------
P4 21.77us 8.52us 2.56x
P16 11.11us 8.61us 1.29x
P64 8.95us 8.76us 1.02x
P256 8.67us 8.87us 0.98x
P1024 9.14us 9.29us 0.98x
The win concentrates at small partition counts where cursor contention
dominates. At larger counts fanout collapses to 1 and the path is
identical to before — the only overhead is a single branch in
PartitionedVector::create() that selects between the logical and
virtual scatter offset buffers.
c828742 to
1508442
Compare
cff9eff to
6944d36
Compare
6944d36 to
974bb09
Compare
974bb09 to
ee25fa7
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
For small partition counts the scatter loop
output[cursor[partitions[i]]++] = input[i]bottlenecks on the per-partition cursor counters: consecutive rows tend to share a partition, and each iteration's store of cursor[p] feeds the next iteration's load. The unrolled scalar scatter has no ILP to extract because the iterations alias on the same cursor slot.
PartitionedVector::create() now accepts virtual partition ids in ctx.numVirtualPartitions: when set to numPartitions * fanout (a power of two > 1), the per-row ids are interpreted as virtual ids in [0, numPartitions * fanout). The scatter then operates over the expanded id space and consecutive iterations target distinct cursor slots, breaking the dependency chain. Logical partition order is preserved because p's rows occupy the contiguous virtual range [p*fanout, (p+1)*fanout); create() folds the virtual end offsets back to user-facing logical end offsets that partitionAt(p) indexes into.
Wiring (OptimizedPartitionedOutput-only — OptimizedHashPartitionFunction itself stays unaware of virtual partitioning):
Benchmarks (release, M1 macOS, INTEGER 1-column, 10000 rows, time/iter, partitionedVectorBenchmark):
partitions before after speedup
The win concentrates at small partition counts where cursor contention dominates. At larger counts fanout collapses to 1 and the path is identical to before — the only overhead is a single branch in PartitionedVector::create() that selects between the logical and virtual scatter offset buffers.
Tests: