Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 85 additions & 8 deletions bolt/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,17 +378,15 @@ void HashBuild::setupSpiller(
HashBitRange(startBit, startBit + spillConfig.joinRepartitionBits);
}

bool firstLevelSpill = (hashBits.begin() == spillConfig.startPartitionBit);
const bool useRangePartition = canUseRangePartition(hashBits);
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kHashJoinBuild,
table_->rows(),
tableType_,
std::move(hashBits),
&spillConfig,
spillConfig.maxFileSize,
(firstLevelSpill && supportSkewPartition() &&
joinBridge_->numBuilders() == 1 &&
operatorCtx_->task()->numDrivers(operatorCtx_->driverCtx()) == 1));
useRangePartition);
spiller_->setSpillConfig(&spillConfig);
spiller_->setSkewThreshold(
skewFileSizeRatioThreshold_, skewRowCountRatioThreshold_);
Expand Down Expand Up @@ -1229,20 +1227,99 @@ bool HashBuild::finishHashBuild() {
}
}

auto tableSpillFunc =
spillPartitions.empty() ? createTableSpillFunc() : nullptr;
// Release the unused reservation before publishing the built table to probe.
// This drops the temporary reservation while keeping the actual hash table
// memory as used reservation.
pool()->release();

if (joinBridge_->setHashTable(
std::move(table_),
std::move(spillPartitions),
joinHasNullKeys_,
&offsetTojoinBits_)) {
&offsetTojoinBits_,
std::move(tableSpillFunc))) {
intermediateStateCleared_ = true;
spillGroup_->restart();
}
// Release the unused memory reservation since we have finished the merged
// table build.
pool()->release();
return true;
}

// Builds the callback used to spill the finished build table after it has
// been handed to the join bridge.
//
// Returns nullptr (and logs the reason at VLOG level 1) when spilling is not
// enabled, when the build input itself comes from spill, when this operator
// has no spiller, or when the max spill level limit has been exceeded.
//
// Otherwise a dedicated Spiller is created over 'table_->rows()' using the
// hash bits of the operator's own spiller, and a closure owning that spiller
// is returned. As a side effect 'tableSpillStats_' is replaced with a fresh
// accumulator which the closure shares and updates.
HashJoinBridge::HashJoinTableSpillFunc HashBuild::createTableSpillFunc() {
  const bool tableSpillAllowed = spillEnabled() && !isInputFromSpill() &&
      spiller_ != nullptr && !exceededMaxSpillLevelLimit_;
  if (!tableSpillAllowed) {
    VLOG(1) << name()
            << " createTableSpillFunc disabled: spillEnabled=" << spillEnabled()
            << ", isInputFromSpill=" << isInputFromSpill()
            << ", hasSpiller=" << (spiller_ != nullptr)
            << ", exceededMaxSpillLevelLimit=" << exceededMaxSpillLevelLimit_;
    return nullptr;
  }

  // The spiller below is handed a raw pointer to this config copy; the closure
  // captures the shared_ptr so both travel together.
  auto configCopy = std::make_shared<common::SpillConfig>(*spillConfig());
  const bool useRangePartition = canUseRangePartition(spiller_->hashBits());
  auto bridgeSpiller = std::make_shared<Spiller>(
      Spiller::Type::kHashJoinBuild,
      table_->rows(),
      tableType_,
      spiller_->hashBits(),
      configCopy.get(),
      configCopy->maxFileSize,
      useRangePartition);
  bridgeSpiller->setSpillConfig(configCopy.get());
  bridgeSpiller->setSkewThreshold(
      skewFileSizeRatioThreshold_, skewRowCountRatioThreshold_);
  if (hybridJoin_ && table_->hybridData()) {
    bridgeSpiller->setHybridMode(true, table_->hybridData());
  }

  auto opName = name();
  // Fresh accumulator for this callback; stats() folds it into the operator
  // stats.
  tableSpillStats_ =
      std::make_shared<folly::Synchronized<common::SpillStats>>();
  auto sharedStats = tableSpillStats_;

  // NOTE: the 'table' argument is not referenced in the body; the spiller was
  // already bound to the row container when it was constructed above.
  return [configCopy, bridgeSpiller, sharedStats, opName](
             std::shared_ptr<BaseHashTable> table) {
    SpillPartitionSet spilled;
    bridgeSpiller->spill();
    bridgeSpiller->finishSpill(spilled);

    // Keep only the partitions that actually have spill files.
    for (auto it = spilled.begin(); it != spilled.end();) {
      const bool hasFiles = it->second->numFiles() > 0;
      if (hasFiles) {
        ++it;
      } else {
        it = spilled.erase(it);
      }
    }

    const auto callbackStats = bridgeSpiller->stats();
    BOLT_CHECK_EQ(callbackStats.spillSortTimeUs, 0);
    *sharedStats->wlock() += callbackStats;
    LOG(INFO) << opName << " bridge table spill callback: finish, "
              << "spilledPartitions=" << spilled.size()
              << ", spilledBytes=" << succinctBytes(callbackStats.spilledBytes)
              << ", spilledFiles=" << callbackStats.spilledFiles
              << ", spillTotalTimeUs=" << callbackStats.spillTotalTimeUs;
    return spilled;
  };
}

// Returns the base operator stats with the spill stats accumulated in
// 'tableSpillStats_' (if any) added on top. When 'clear' is true the
// accumulated table spill stats are reset after being added.
OperatorStats HashBuild::stats(bool clear) {
  auto operatorStats = Operator::stats(clear);
  if (tableSpillStats_ == nullptr) {
    return operatorStats;
  }

  // Hold the write lock across both the read and the optional reset.
  auto lockedStats = tableSpillStats_->wlock();
  operatorStats.addSpillStats(*lockedStats);
  if (clear) {
    lockedStats->reset();
  }
  return operatorStats;
}

// Returns true only if all of the following hold, checked in this order:
//  - 'hashBits' begins at the spill config's start partition bit,
//  - supportSkewPartition() is true,
//  - the join bridge reports exactly one builder,
//  - the task reports exactly one driver for this operator's driver context.
bool HashBuild::canUseRangePartition(const HashBitRange& hashBits) const {
  if (hashBits.begin() != spillConfig()->startPartitionBit) {
    return false;
  }
  if (!supportSkewPartition()) {
    return false;
  }
  if (joinBridge_->numBuilders() != 1) {
    return false;
  }
  return operatorCtx_->task()->numDrivers(operatorCtx_->driverCtx()) == 1;
}

// Convenience overload: forwards this operator's own spiller ('spiller_') to
// the recordSpillStats(Spiller*) overload.
void HashBuild::recordSpillStats() {
recordSpillStats(spiller_.get());
}
Expand Down
11 changes: 11 additions & 0 deletions bolt/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class HashBuild final : public Operator {

bool isFinished() override;

// Keep the other Operator::stats overloads (if any) visible alongside the
// override below.
using Operator::stats;
// Returns the base operator stats plus the spill stats accumulated by the
// bridge table spill callback. When 'clear' is true, the accumulated table
// spill stats are reset as well.
OperatorStats stats(bool clear) override;

void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

Expand Down Expand Up @@ -147,6 +150,10 @@ class HashBuild final : public Operator {
// barrier for the next round of hash table build operation if it needs.
bool finishHashBuild();

// Creates a spill callback for the finished build table held by the join
// bridge before probe starts.
HashJoinBridge::HashJoinTableSpillFunc createTableSpillFunc();

// [Morsel-driven] same as HashProbe::skipProbeOnEmptyBuild
bool skipProbeOnEmptyBuild() const {
return isInnerJoin(joinType_) || isLeftSemiFilterJoin(joinType_) ||
Expand Down Expand Up @@ -174,6 +181,8 @@ class HashBuild final : public Operator {
void recordSpillStats(Spiller* spiller);
void recordSpillReadStats();

// Returns true if a spiller created with 'hashBits' may use range
// partitioning: 'hashBits' must start at the spill config's start partition
// bit, skew partitioning must be supported, and there must be exactly one
// builder on the join bridge and one driver for this operator.
bool canUseRangePartition(const HashBitRange& hashBits) const;

// Indicates if the input is read from spill data or not.
bool isInputFromSpill() const;

Expand Down Expand Up @@ -449,6 +458,8 @@ class HashBuild final : public Operator {
int32_t maxHashTableBucketCount_{std::numeric_limits<int32_t>::max()};
std::shared_ptr<RowFormatInfo> rowFormatInfo_{nullptr};

// Spill stats accumulated by the table spill callback created in
// createTableSpillFunc(). Shared with that callback, and folded into the
// operator stats by stats(). Null until a callback has been created.
std::shared_ptr<folly::Synchronized<common::SpillStats>> tableSpillStats_;

// For hybrid join
bool hybridJoin_{false};
bool scatteredMode_{false}; // Use scattered (non-coalesced) mode
Expand Down
Loading
Loading