feat: optimize row based shuffle with DenseRow#637
Open
zhangxffff wants to merge 3 commits into
Open
Conversation
| const uint8_t*& in, | ||
| const uint8_t* end, | ||
| bool& isNull, | ||
| int64_t& value) { |
Collaborator
There was a problem hiding this comment.
check bounds(in) before accessing *in
if (FOLLY_UNLIKELY(in >= end)) { return false}
| forEachLivePos(out, r, [&](uint32_t p) { | ||
| uint64_t v{0}; | ||
| BOLT_USER_CHECK( | ||
| readVarint(c.cur, c.end, v), |
Collaborator
There was a problem hiding this comment.
need to check c.cur < c.end?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What problem does this PR solve?
Row-based shuffle serializes each row with CompactRow, which pads
values to fixed widths, carries a per-row null bitmap, and stores integers at
full width. For the common Spark case of small / narrow / nullable integer
columns this wastes a large fraction of shuffle bytes.
This PR adds DenseRow, a no-waste variable-length row format, and wires it
into the Spark row-based shuffle path behind a pluggable
RowFormatoption.The existing CompactRow path is unchanged and remains the default.
Issue Number: close #540
Type of Change
Description
DenseRow codec (
bolt/row/dense/) — a column-batched serializer, sibling toCompactRow/UnsafeRowFast, that processes all rows at once. The on-wirelayout is the "dense" form:
Implementation is split into:
IntVarint.h— varint/zigzag + nullable-int wire codec, with a SIMD(xsimd/AVX2) batched size pass and a BMI2 (
pdep) varint writer for longvalues; scalar fast path otherwise.
DenseRowScalar*.cpp— flat fast path (identity / constant / dictionarydecoded inputs).
DenseRowGeneral*.cpp— general path for arbitrary nested typed data via a two-pass(size, then write) slot-tree walk shared by both passes.
DenseRow.{h,cpp}— public API mirroringCompactRow(rowSizes()/serialize()/deserialize()), grammar documented at the top of the .cpp.Shuffle integration (
bolt/shuffle/sparksql/) — newrow::RowFormatenum({DENSE, COMPACT}) added to
ShuffleWriterOptions/ShuffleReaderOptions(default
COMPACT). The columnar↔row converters and the reader/writer dispatchon it; the wire framing (
rowSize | rowData) is unchanged, so only the row bodyencoding differs. Writer and reader must be configured with the same format.
Tests —
DenseRowTest.cpp(17 cases: scalars, wide/mixed rows, bigint &hugeint edges, arrays / nested arrays / maps, empty containers, dictionary &
constant encoded inputs, non-contiguous serialize offsets, and golden-byte wire
assertions). The shuffle matrix test is extended to exercise the DENSE format.
Benchmark —
DenseRowSerializeBenchmark.cppcompares DenseRow againstUnsafeRow/CompactRow for serialize, deserialize, and serialized size across 20
cases
Benchmark Result
— Intel i7-12700KF,
-O3AVX2+BMI2, VectorFuzzervectorSize=1000,pinned core. All times are ns/row; size is bytes/row. Compared against the
two existing Spark row formats (UnsafeRow, CompactRow).
Highlights
bigint < 2⁸: 9→1),5× on small-int arrays/nested arrays. On par for full-random int64 and wide strings.
(
10 scalars: 89→32 ns), faster on all flat scalars.(CompactRow would do buckcopy for array/map) — smaller payload for higher encode cost.
Serialized size (bytes/row)
Note
We use the columnar CompactRow serde interface in this benchmark. In row-based shuffle, the row-based CompactRow serde interface is still used, which means CompactRow would see even greater performance improvement in real row-based shuffle scenarios. The array/map use bulk copy to serde in CompactRow, so it is fast than DenseRow for these type.
Performance Impact
No Impact: This change does not affect the critical path (e.g., build system, doc, error handling).
Positive Impact: I have run benchmarks.
Click to view Benchmark Results
Negative Impact: Explained below (e.g., trade-off for correctness).
Release Note
Please describe the changes in this PR
Release Note:
Checklist (For Author)
Breaking Changes
No
Yes (Description: ...)
Click to view Breaking Changes