Skip to content

feat(PartitionedOutput): Add nested RowVector support in the serializer#2045

Open
yingsu00 wants to merge 25 commits into
IBM:optimized_partitionedoutputfrom
yingsu00:NestedRowVec
Open

feat(PartitionedOutput): Add nested RowVector support in the serializer#2045
yingsu00 wants to merge 25 commits into
IBM:optimized_partitionedoutputfrom
yingsu00:NestedRowVec

Conversation

@yingsu00

Copy link
Copy Markdown
Collaborator

No description provided.

yingsu00 and others added 19 commits April 17, 2026 13:12
This commit introduces `PartitionedVector` - a low-level execution
abstraction that provides an in-place, partition-aware layout of a
vector based on per-row partition IDs.

1. **In-place rearrangement**: Rearrange vector data in memory without
   creating multiple copies
2. **Buffer reuse**: Allow reuse of temporary buffers across multiple
   partitioning operations
3. **Minimal abstraction**: Similar to `DecodedVector`, focus on
   efficient execution rather than operator semantics
4. **Thread-unsafe by design**: Optimized for single-threaded execution
   contexts

For more information please see IBM#1703

Alchemy-item: (ID = 1150) Introducing PartitionedVector commit 1/1 - 960f41b
Signed-off-by: Xin Zhang <xin-zhang2@ibm.com>

Alchemy-item: (ID = 1167) Add PartitionedRowVector commit 1/1 - f2af427
PartitionedFlatVector::partition() and PartitionedRowVector::partition()
called mutableRawNulls() unconditionally. mutableRawNulls() allocates a
null buffer if one does not exist, causing mayHaveNulls() to return true
for every vector after partitioning, even when the original had no nulls.

Fix both sites to check rawNulls() first and only call mutableRawNulls()
when a null buffer already exists.

Add noNullBufferAllocatedForNullFreeFlat and
noNullBufferAllocatedForNullFreeRow tests to PartitionedVectorTest to
cover this case.

# Conflicts:
#	velox/vector/PartitionedVector.cpp
This commit introduces PrestoIterativePartitioningSerializer, which
buffers RowVectors across multiple append() calls, partitions rows
in-place using PartitionedVector, and on flush() serializes each
non-empty partition into a Presto wire-format IOBuf. The serializer has
no dependency on velox_exec: it returns raw folly::IOBuf objects,
leaving SerializedPage creation to the caller.
This commit introduces OptimizedPartitionedOutput, a PartitionedOutput
operator backed by PrestoIterativePartitioningSerializer. Enabled via query
config key "optimized_repartitioning" (default off). LocalPlanner
selects it over the standard PartitionedOutput when the flag is set.

TODO: replicateNullsAndAny is not yet supported and raises a user error.
…geBenchmark

- Added normal vs optimized PartitionedOutput comparison by running each
  exchange case twice with kOptimizedPartitionedOutputEnabled=false/true.
- Added per-mode benchmark names:
  - exchange<Case>_normalPartitionedOutput
  - exchange<Case>_optimizedPartitionedOutput in ExchangeBenchmark.cpp.
- Refactored result printing into shared helpers and fixed output
  consistency in ExchangeBenchmark.cpp.
…mark

Split the local partition exchange benchmark out of ExchangeBenchmark
into its own executable and CMake target, while keeping the local
benchmark logic and statistics reporting available in a dedicated binary.
…fferManager listeners

Pass an OutputBufferManager-backed listener factory into
PrestoIterativePartitioningSerializer so the optimized path uses the same
listener source as normal PartitionedOutput. Create per-partition listeners
during flush, set the checksum bit only when a listener is present, and
compute the page checksum only for PrestoOutputStreamListener instances.
Also add tests that verify checksum headers are written and that the
serialized pages round-trip through the standard deserializer.
…rting

- add explicit simple-schema benchmark cases by type and column count
- register normal and optimized runs as separate named benchmark cases
- make `dictPct` apply per generated vector and recurse into nested types
- generate benchmark input vectors directly with optional nulls
- replace ad hoc flat input generation with explicit input specs
- return `ExchangeRunStats` from benchmark runs and centralize query config
- group printed results by dataset with normal vs. optimized stats
The new OptimizedVectorHasher is up to 2-3x faster than VectorHasher.
BatchMaker always allocates a null buffer even when no rows are null.
This commit removes it so benchmarks measure the non-nullable path
faithfully. Plus some minor format cleanups.
Currently Velox never passes CMAKE_BUILD_TYPE into Folly's own configure
step, while cmake_install only forwards arbitrary caller flags, so Folly
was not built in release mode when Velox is built in release mode. This
commit adds -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" to FOLLY_FLAGS, so
a release Velox dependency setup now builds release Folly on macOS and
Linux.
@yingsu00 yingsu00 requested a review from xin-zhang2 May 19, 2026 05:29
@yingsu00 yingsu00 self-assigned this May 19, 2026
- Remove benchmarks with vector size 1000000 and only keep vector size
  10000.
- Add tinyint and smallint benchmarks.
@libianoss

Copy link
Copy Markdown
Collaborator

@xin-zhang2 xin have you reviewed the PR?

auto n = std::min<vector_size_t>(numRowsPerChunk, numRows);
outputStreams[p]->write(chunkBytes, n * sizeof(T));
numRows -= n;
if (!parentNulls) {

@xin-zhang2 xin-zhang2 May 22, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The surviving rows are already reflected in numRows, so there is no need to check parentNulls. We can simply use

        while (numRows > 0) {
          auto n = std::min<vector_size_t>(numRowsPerChunk, numRows);
          outputStreams[p]->write(chunkBytes, n * sizeof(T));
          numRows -= n;
        }

regareless of whether parentNulls is null or not.
Then parentNulls can be removed from the function parameters.

childContext.parentNulls[vectorIndex] = nulls;

vector_size_t begin = 0;
for (uint32_t p = 0; p < numPartitions_; ++p) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (uint32_t p : nonEmptyPartitions)

vector_size_t begin = 0;
for (uint32_t p = 0; p < numPartitions_; ++p) {
const vector_size_t end = partitionOffsets[p];
if (outputStreams[p] != nullptr && end > begin) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outputStreams[p] != nullptr is unnecessary here.

std::vector<vector_size_t> rowCounts;
std::vector<BufferPtr> parentNulls;
std::vector<std::vector<vector_size_t>> parentNullCounts;
bool hasParentNulls{false};

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks hasParentNulls is redudant.
parentNulls is always resized in the code, so parentNulls[vectorIndex] == nullptr is enough to check whether the parent has nulls.

int32_t end,
uint64_t* target,
uint64_t targetBitOffset) {
uint64_t outBit = targetBitOffset;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a few fast paths here. The following change shows a slight improvement in the benchmark for with-null cases.

const uint64_t numBits = end - begin;
  if (mask == nullptr) {
    if (source == nullptr) {
      bits::fillBits(target, targetBitOffset, targetBitOffset + numBits, bits::kNotNull);
    } else {
      bits::copyBits(source, begin, target, targetBitOffset, numBits);
    }
    return static_cast<int32_t>(numBits);
  }

  if (source == nullptr) {
    const int32_t count = bits::countBits(mask, begin, end);
    bits::fillBits(target, targetBitOffset, targetBitOffset + count, bits::kNotNull);
    return count;
  }

  uint64_t outBit = targetBitOffset;
  bits::forEachWord(begin, end, [&](int32_t index, uint64_t wordMask) {
    const uint64_t selected = mask[index] & wordMask;
    if (selected == 0) {
      return;
    }
    const uint64_t packed =
        bits::extractBits<uint64_t>(source ? source[index] : ~0ULL, selected);
    const uint32_t count = __builtin_popcountll(selected);
    appendLowBits(target, outBit, packed, count);
    outBit += count;
  });
  return static_cast<int32_t>(outBit - targetBitOffset);

childVectors.reserve(numVectors);
for (const auto& pv : partitionedVectors) {
childVectors.push_back(
std::dynamic_pointer_cast<PartitionedRowVector>(pv)->childAt(col));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to consider the encoding before casting. A ROW type vector can be encoded as CONSTANT, in which case pv would be a PartitionedConstantVector instead of PartitionedRowVector.
The same issue also applies at line 210.

// parent-live rows, which the parent recorded in context.rowCounts. Only
// partitions that have nulls at this level need a compacted bitmap; the
// rest use sequential offsets and no null section.
std::vector<BufferPtr> nullsByPartition(numPartitions_);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it make sense to split the footer writing into separate functions? Since different encodings may share the same wire-format pieces, such as offsets and null bitmaps, this logic might be reusable.

xin-zhang2 and others added 5 commits May 27, 2026 12:13
Introduce OptimizedHashPartitionFunction as a faster drop-in replacement
for HashPartitionFunction, gated behind a new query config flag
optimized_hash_partition_function_enabled (default false). partition()
is improved from 50% to over 200x.

Add HashPartitionFunctionBase as a common base exposing numPartitions(),
and createHashPartitionFunction() factories that select the
implementation based on the flag. Thread QueryConfig* through
PartitionFunctionSpec::create() and update callsites (LocalPartition,
PartitionedOutput, MarkDistinct, RowNumber, Window,
SubPartitionedSortWindowBuild, HiveConnector) to construct partition
functions via the factory.

Register CMake targets for the new test and benchmark binaries.
Reimplement PrestoIterativePartitioningSerializer::flushRowColumn around
bulk bit operations and fix two coupled null-handling defects in the
simple-column path. The three changes are interdependent: correct null
serialization requires all of them, so they land together.

flushRowColumn:
- AND the incoming parentNulls into this level's own nulls in place with
  bits::andBits, reusing the vector's own buffer (or sharing the parent's
  when there are no own nulls) instead of allocating a per-batch buffer.
- Count parent-live and live rows per partition with bits::countBits to
  drive child rowCounts/parentNullCounts and the footer numRows.
- Compact the live bitmap across batch boundaries with bits::extractBits
  via a branchless compactBits/appendLowBits helper; offsets become a
  prefix sum over the compacted bitmap, with a sequential fast path for
  null-free partitions.

flushFlatValues: stop calling mutableRawNulls() (which materialized an
all-not-null buffer and masked the no-null fast path, then dereferenced a
null parentNulls in andBits), and iterate each partition's own
[lastOffset, offset) range. Leaf flushers now take parentNulls directly;
flushSingleConstantVector also takes the per-partition parent-null counts.

flushNulls: replace the stub that always wrote hasNulls=0 with a real
implementation that compacts per-batch validity over parentNulls into a
per-partition bitmap sized to context.rowCounts, handling FLAT and
CONSTANT (including null constants) for both top-level and nested columns.
Removes the now-unused flushSimpleVectorNulls/flushConstantVectorNulls.

Add multiLevelNullsMultiAppendMultiPartition covering nulls at multiple
nested ROW levels across several appends and partitions. All 59 tests in
PrestoIterativePartitioningSerializerTest pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@ethanyzhang ethanyzhang force-pushed the optimized_partitionedoutput branch from cff9eff to 6944d36 Compare June 11, 2026 23:04
@ethanyzhang ethanyzhang force-pushed the optimized_partitionedoutput branch from 6944d36 to 974bb09 Compare June 11, 2026 23:17
@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from 974bb09 to ee25fa7 Compare June 25, 2026 20:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants