Skip to content

[Feature] use mergeData for bolt NativeCelebornClient (#370)#590

Draft
afterincomparableyum wants to merge 1 commit into
bytedance:mainfrom
afterincomparableyum:celeborn-cpp-push-merged
Draft

[Feature] use mergeData for bolt NativeCelebornClient (#370)#590
afterincomparableyum wants to merge 1 commit into
bytedance:mainfrom
afterincomparableyum:celeborn-cpp-push-merged

Conversation

@afterincomparableyum

@afterincomparableyum afterincomparableyum commented May 25, 2026

Copy link
Copy Markdown

What problem does this PR solve?

Bolt's CelebornPartitionWriter routed every per partition payload through RssClient::pushPartitionData --> ShuffleClient::pushData, regardless of size. Celeborn's Java client (and Gluten's RSS integration) instead uses mergeData / pushMergedData for small payloads, letting the client coalesce many small batches into one network flush per worker. This significantly reduces RPC count and improves throughput for workloads with many small partitions.

The C++ NativeCelebornClient did not expose mergeData, so Bolt's native shuffle missed this optimization. This PR adds the missing path, gated by pushBufferMaxSize, and matches the Java client's behavior.

Issue Number: close #370

Type of Change

  • 🐛 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)
  • [] 🚀 Performance improvement (optimization)
  • ⚠️ Breaking change (fix or feature that would cause existing functionality to change)
  • 🔨 Refactoring (no logic changes)
  • 🔧 Build/CI or Infrastructure changes
  • 📝 Documentation only

Description

Production code

  • RssClient.h: added virtual mergePartitionData(partitionId, bytes, size)
    to the abstract base. Default implementation delegates to pushPartitionData, so non-Celeborn backends keep working unchanged.
  • NativeCelebornClient.{h,cpp}: implemented mergePartitionData by calling the upstream cpp-client's ShuffleClient::mergeData. Like pushPartitionData, it rejects calls after stop().
  • CelebornPartitionWriter.cpp: per partition payloads are now routed by size and a new celebornMergeEnabled option. Payloads <= pushBufferMaxSize go through mergePartitionData (coalesced), larger payloads bypass the merge buffer and go through pushPartitionData directly. Matches Gluten's Java client behavior.
  • Options.h: added PartitionWriterOptions::celebornMergeEnabled (default true). Lets callers opt out to the legacy push-only path for debugging or A/B comparison.

Dependency

  • Bumped the celeborn-cpp-client Conan recipe to a version that exposes ShuffleClient::mergeData and the updated CelebornInputStream / WorkerPartitionReader interfaces.

Tests

  • CelebornClientTest.cpp:
    • FakeShuffleClient now implements mergeData (counted separately) and pushMergedData, plus the excludeFailedFetchLocation override required by the cpp-client bump.
    • Added NativeCelebornClientTest.mergePartitionDataRoutesToMergeData verifies the C++ wrapper calls ShuffleClient::mergeData (not pushData) and rejects calls after stop().
    • Added RssClientTest.defaultMergeDelegatesToPush which verifies non-overriding backends fall through to pushPartitionData.
    • Added a custom main() that calls folly::init so the cpp-client's new folly-singleton use in WorkerPartitionReader::initAndCheck initializes correctly. Replaces the linked GTest::gtest_main.
  • ShuffleMatrixTest.cpp / ShuffleTestBase.{h,cpp}:
    • Added bool celebornMergeEnabled to ShuffleTestParam.
    • Matrix now sweeps {true, false} for kCeleborn writer cases (kLocal stays at the default), giving end-to-end parity coverage of the merge vs legacy push paths against a real Celeborn cluster.
  • tests/celeborn/scripts/run_e2e.sh: defaults updated

Release Note

  • NativeCelebornClient now uses mergeData for shuffle payloads <= pushBufferMaxSize, matching the Java/Gluten client and reducing per-partition RPC overhead. Gated by the new celebornMergeEnabled option (default true).

Release Note:

  Release Note:
  - NativeCelebornClient now uses mergeData for shuffle payloads <= pushBufferMaxSize, matching the Java/Gluten client and reducing per-partition RPC overhead. Gated by the new celebornMergeEnabled option (default true).

Checklist (For Author)

  • [X ] I have added/updated unit tests (ctest).
  • [X ] I have verified the code with local build (Release/Debug).
  • [X ] I have run clang-format / linters.
  • (Optional) I have run Sanitizers (ASAN/TSAN) locally for complex C++ changes.
  • No need to test or manual test.

@CLAassistant

CLAassistant commented May 25, 2026

Copy link
Copy Markdown

CLA assistant check
All committers have signed the CLA.

@afterincomparableyum

Copy link
Copy Markdown
Author

Note, I will remove the boolean flag that gates push merged and have it turned on by default, I will also update the Conan files, e2e files, etc with the latest commit on celeborn main once my PR to fix a bug on celeborn cpp client gets merged. Overall, the core logic of this PR is ready though

@afterincomparableyum

afterincomparableyum commented May 26, 2026

Copy link
Copy Markdown
Author

Sorry, I squashed the commits with my fork celeborn repo (hence why CI is failing now), let me update this draft PR with the squashed commit hash.

@afterincomparableyum afterincomparableyum force-pushed the celeborn-cpp-push-merged branch from 5b9206b to 7d4b1fc Compare May 26, 2026 02:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] use mergeData for bolt NativeCelebornClient

2 participants