Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bolt/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,16 @@ StopReason Driver::runInternal(
"bytedance::bolt::exec::Driver::runInternal::addInput",
nextOp);

// Lazy-complex input dispatch β€” see Operator::inputLazyModes().
// Runs inside the timer above so the cost lands in
// nextOp's addInputTiming.
if (LazyComplexCodec::activeCodec() != nullptr) {
intermediateResult = applyLazyInputModes(
intermediateResult,
nextOp->inputLazyModes(),
nextOp->pool());
}

CALL_OPERATOR(
nextOp->addInput(intermediateResult),
nextOp,
Expand Down
23 changes: 15 additions & 8 deletions bolt/exec/FilterProject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "bolt/core/Expressions.h"
#include "bolt/expression/Expr.h"
#include "bolt/expression/FieldReference.h"
#include "bolt/vector/LazyComplexCodec.h"
#include "bolt/vector/VectorEncoding.h"
namespace bytedance::bolt::exec {
namespace {
Expand Down Expand Up @@ -145,21 +146,27 @@ void FilterProject::initialize() {
numExprs_ = allExprs.size();
exprs_ = makeExprSetFromFlag(std::move(allExprs), operatorCtx_->execCtx());

if (numExprs_ > 0 && !identityProjections_.empty()) {
const auto inputType = project_ ? project_->sources()[0]->outputType()
: filter_->sources()[0]->outputType();
std::unordered_set<uint32_t> distinctFieldIndices;
const auto inputType = project_ ? project_->sources()[0]->outputType()
: filter_->sources()[0]->outputType();
std::unordered_set<uint32_t> distinctFieldIndices;
if (numExprs_ > 0) {
for (auto field : exprs_->distinctFields()) {
auto fieldIndex = inputType->getChildIdx(field->name());
distinctFieldIndices.insert(fieldIndex);
}
for (auto identityField : identityProjections_) {
if (distinctFieldIndices.find(identityField.inputChannel) !=
distinctFieldIndices.end()) {
multiplyReferencedFieldIndices_.push_back(identityField.inputChannel);
if (!identityProjections_.empty()) {
for (auto identityField : identityProjections_) {
if (distinctFieldIndices.find(identityField.inputChannel) !=
distinctFieldIndices.end()) {
multiplyReferencedFieldIndices_.push_back(identityField.inputChannel);
}
}
}
}
inputLazyModes_ = makeInputLazyModes(
inputType->size(),
{distinctFieldIndices.begin(), distinctFieldIndices.end()},
InputLazyMode::kForceDecoded);
filter_.reset();
project_.reset();
}
Expand Down
4 changes: 4 additions & 0 deletions bolt/exec/Generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "bolt/functions/prestosql/json/JsonExtractor.h"
#include "bolt/vector/BaseVector.h"
#include "bolt/vector/FlatVector.h"
#include "bolt/vector/LazyComplexCodec.h"
namespace bytedance::bolt::exec {

Generator::Generator(
Expand Down Expand Up @@ -53,6 +54,9 @@ Generator::Generator(
identityProjections_.emplace_back(
inputType->getChildIdx(repCol->name()), outputChannel++);
}

inputLazyModes_ = makeInputLazyModes(
inputType->size(), generateChannels_, InputLazyMode::kForceDecoded);
}

void Generator::initialize() {
Expand Down
1 change: 1 addition & 0 deletions bolt/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ void HashAggregation::initialize() {
BOLT_CHECK(pool()->trackUsage());

auto inputType = aggregationNode_->sources()[0]->outputType();
inputLazyModes_.assign(inputType->size(), InputLazyMode::kForceDecoded);

auto hashers =
createVectorHashers(inputType, aggregationNode_->groupingKeys());
Expand Down
7 changes: 7 additions & 0 deletions bolt/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ void HashBuild::setupTable() {
lookup_->reset(1);
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;

{
std::vector<column_index_t> channels = keyChannels_;
channels.insert(
channels.end(), dependentChannels_.begin(), dependentChannels_.end());
inputLazyModes_ = table_->rows()->inputLazyModes(channels);
}

if (hybridJoin_) {
table_->hybridData()->setId(static_cast<uint8_t>(driverId_));
// Initialize allContainers_ with itself so spilling can work before table
Expand Down
18 changes: 13 additions & 5 deletions bolt/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "bolt/exec/Task.h"
#include "bolt/expression/FieldReference.h"
#include "bolt/vector/BaseVector.h"
#include "bolt/vector/LazyComplexCodec.h"
namespace bytedance::bolt::exec {

namespace {
Expand Down Expand Up @@ -140,11 +141,18 @@ void extractColumns(
BOLT_CHECK_LT(resultChannel, resultVectors.size())

auto& child = resultVectors[resultChannel];
// TODO: Consider reuse of complex types.
if (!child || !BaseVector::isVectorWritable(child) ||
!child->isFlatEncoding()) {
child =
BaseVector::create(resultTypes[resultChannel], rows.size(), pool);
// `allocateLazyAwareChild` returns a pre-sized LazyComplexVector when a
// codec is active and the type is complex; otherwise delegates to
// BaseVector::create. This matches the lazy configuration of
// table->rows(), so extractColumn's lazy check passes. A cached lazy
// child (LAZY_COMPLEX encoding) is also reusable since extractColumn
// overwrites the inner FlatVector<StringView> in place.
const bool reusable = child && BaseVector::isVectorWritable(child) &&
(child->isFlatEncoding() ||
child->encoding() == VectorEncoding::Simple::LAZY_COMPLEX);
if (!reusable) {
child = allocateLazyAwareChild(
resultTypes[resultChannel], rows.size(), pool);
}
child->resize(rows.size());
table->rows()->extractColumn(
Expand Down
6 changes: 3 additions & 3 deletions bolt/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
#include "bolt/exec/Task.h"
#include "bolt/expression/FieldReference.h"
#include "bolt/vector/BaseVector.h"
#include "bolt/vector/LazyComplexCodec.h"

#include <iostream>
#include <utility>
namespace bytedance::bolt::exec {

Expand Down Expand Up @@ -484,7 +484,7 @@ bool MergeJoin::prepareOutput(
std::vector<VectorPtr> localColumns(outputType_->size());
if (newLeft == nullptr) {
for (const auto& projection : leftProjections_) {
localColumns[projection.outputChannel] = BaseVector::create(
localColumns[projection.outputChannel] = allocateLazyAwareChild(
outputType_->childAt(projection.outputChannel),
outputBatchSize_,
operatorCtx_->pool());
Expand All @@ -502,7 +502,7 @@ bool MergeJoin::prepareOutput(
// Create right side projection outputs.
if (right == nullptr) {
for (const auto& projection : rightProjections_) {
localColumns[projection.outputChannel] = BaseVector::create(
localColumns[projection.outputChannel] = allocateLazyAwareChild(
outputType_->childAt(projection.outputChannel),
outputBatchSize_,
operatorCtx_->pool());
Expand Down
5 changes: 3 additions & 2 deletions bolt/exec/NestedLoopJoinBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "bolt/exec/NestedLoopJoinBuild.h"
#include "bolt/exec/Task.h"
#include "bolt/vector/LazyComplexCodec.h"
namespace bytedance::bolt::exec {

void NestedLoopJoinBridge::setData(std::vector<RowVectorPtr> buildVectors) {
Expand Down Expand Up @@ -103,8 +104,8 @@ std::vector<RowVectorPtr> NestedLoopJoinBuild::mergeDataVectors() const {
if (j == i + 1) {
merged.push_back(dataVectors_[i++]);
} else {
auto batch = BaseVector::create<RowVector>(
dataVectors_[i]->type(), batchSize, pool());
auto batch = allocateLazyAwareRowVector(
asRowType(dataVectors_[i]->type()), batchSize, pool());
batchSize = 0;
while (i < j) {
auto* source = dataVectors_[i++].get();
Expand Down
7 changes: 5 additions & 2 deletions bolt/exec/NestedLoopJoinProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "bolt/exec/OperatorUtils.h"
#include "bolt/exec/Task.h"
#include "bolt/expression/FieldReference.h"
#include "bolt/vector/LazyComplexCodec.h"

namespace bytedance::bolt::exec {
namespace {
Expand Down Expand Up @@ -553,9 +554,11 @@ void NestedLoopJoinProbe::prepareOutput() {
buildVector->childAt(projection.inputChannel));
}
} else {
// Multiple build vectors: use FlatVector with flat copy.
// Multiple build vectors: use FlatVector with flat copy. When the lazy
// codec is active, use a LazyComplexVector for complex columns so
// copyRanges from lazy build inputs stays a byte copy
for (const auto& projection : buildProjections_) {
localColumns[projection.outputChannel] = BaseVector::create(
localColumns[projection.outputChannel] = allocateLazyAwareChild(
outputType_->childAt(projection.outputChannel),
outputBatchSize_,
operatorCtx_->pool());
Expand Down
30 changes: 30 additions & 0 deletions bolt/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "bolt/exec/OperatorStats.h"
#include "bolt/exec/OperatorTraceWriter.h"
#include "bolt/type/Filter.h"
#include "bolt/vector/LazyComplexCodec.h"
namespace bytedance::bolt::exec {

// Represents a column that is copied from input to output, possibly
Expand Down Expand Up @@ -232,6 +233,29 @@ class Operator : public BaseRuntimeStatWriter {
/// @param input Non-empty input vector.
virtual void addInput(RowVectorPtr input) = 0;

/// Per-input-column lazy-encoding preference. Consulted by the Driver at
/// the addInput seam when a `LazyComplexCodec` is active:
/// - kAny : column passes through unchanged.
/// - kForceDecoded : if the arriving child is `LazyComplexVector`,
/// decode it to its original complex vector first.
/// - kForceLazy : if the arriving complex child is not yet lazy,
/// encode it to `LazyComplexVector` first.
///
/// Return an empty vector (the default) if the operator has no
/// preference β€” the Driver skips all dispatch in that case. Otherwise
/// the size must equal the number of children in the input RowVector.
/// When a `LazyComplexCodec` is NOT active the Driver skips dispatch
/// regardless of the declared modes (kForceLazy is a no-op then).
///
/// Operators populate `inputLazyModes_` in their constructor or
/// `initialize()` and leave the accessor alone β€” the default
/// implementation returns the member. Operators with no lazy policy
/// simply leave `inputLazyModes_` empty.
using InputLazyMode = bytedance::bolt::InputLazyMode;
virtual const std::vector<InputLazyMode>& inputLazyModes() const {
return inputLazyModes_;
}

/// Informs 'this' that addInput will no longer be called. This means
/// that any partial state kept by 'this' should be returned by
/// the next call(s) to getOutput. Not used if operator is a source operator,
Expand Down Expand Up @@ -530,6 +554,12 @@ class Operator : public BaseRuntimeStatWriter {
static std::vector<std::unique_ptr<PlanNodeTranslator>>& translators();
friend class NonReclaimableSection;

// Per-input-column lazy-encoding preference returned by the default
// `inputLazyModes()` accessor. Populated by each operator in its
// constructor or `initialize()` when a policy is needed; empty
// otherwise (then the Driver skips dispatch).
std::vector<InputLazyMode> inputLazyModes_;

class MemoryReclaimer : public memory::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
Expand Down
4 changes: 4 additions & 0 deletions bolt/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ vector_size_t processFilterResults(
return processConstantFilterResults(filterResult, rows);
case VectorEncoding::Simple::FLAT:
return processFlatFilterResults(filterResult, rows, filterEvalCtx, pool);
case VectorEncoding::Simple::LAZY_COMPLEX:
BOLT_FAIL(
"OperatorUtils::processFilterResults is not supported for "
"LAZY_COMPLEX; call decode() first");
default:
return processEncodedFilterResults(
filterResult, rows, filterEvalCtx, pool);
Expand Down
1 change: 1 addition & 0 deletions bolt/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ OrderBy::OrderBy(
operatorCtx_.get(),
hybridSortEnabled,
scatteredModeEnabled);
inputLazyModes_ = sortBuffer_->inputLazyModes();

this->setRuntimeMetric(
OperatorMetricKey::kCanUsedToEstimateHashBuildPartitionNum, "true");
Expand Down
68 changes: 65 additions & 3 deletions bolt/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* --------------------------------------------------------------------------
*/

#include <algorithm>
#include <cstring>
#include <sstream>
#include <utility>
Expand All @@ -37,12 +38,15 @@
#include "bolt/type/StringView.h"
#include "bolt/type/Timestamp.h"
#include "bolt/vector/DecodedVector.h"
#include "bolt/vector/LazyComplexCodec.h"
#include "bolt/vector/LazyComplexVector.h"

#include "bolt/common/memory/ByteStream.h"
#include "bolt/common/memory/RawVector.h"
#include "bolt/exec/Aggregate.h"
#include "bolt/exec/ContainerRowSerde.h"
#include "bolt/exec/Operator.h"
#include "bolt/exec/RowToColumnVector.h"
#include "bolt/type/Type.h"

#ifdef ENABLE_BOLT_JIT
Expand Down Expand Up @@ -338,6 +342,26 @@ RowContainer::RowContainer(
(nullableKeys_ || i >= keyTypes_.size()) ? nullOffsets_[i]
: RowColumn::kNotNullOffset);
}

// Lazy-complex metadata β€” populated only for NON-KEY complex columns.
// Keys (sort keys, hash keys, partition keys) always retain their original
// complex form so that compare/hash paths can read values. Lazy encoding
// is strictly a payload-side optimisation.
// TODO(unknown): since ComplexType data is also store as string for key, we
// may also encoding on keys and support compare direct in row format
const auto numCols = types_.size();
lazyOriginalTypes_.assign(numCols, nullptr);
lazyCodec_ = LazyComplexCodec::activeCodec();
if (lazyCodec_ != nullptr) {
const auto numKeys = keyTypes.size();
for (size_t i = numKeys; i < numCols; ++i) {
const auto& t = types_[i];
if (t->isRow() || t->isArray() || t->isMap()) {
lazyOriginalTypes_[i] = t;
typeKinds_[i] = TypeKind::VARBINARY;
}
}
}
}

RowContainer::~RowContainer() {
Expand Down Expand Up @@ -671,17 +695,39 @@ int32_t RowContainer::storeVariableSizeAt(
void RowContainer::store(const RowVectorPtr& input) {
BOLT_CHECK_EQ(input->childrenSize(), types_.size());
for (auto i = 0; i < types_.size(); ++i) {
BOLT_CHECK_EQ(input->childAt(i)->type(), types_[i]);
// Compare structurally (via Type::operator==) rather than by pointer, so
// that lazily-encoded columns whose type is stored as the original complex
// type still pass when the input uses a freshly-constructed TypePtr.
BOLT_CHECK(
*input->childAt(i)->type() == *types_[i],
"Column {} type mismatch: input={} expected={}",
i,
input->childAt(i)->type()->toString(),
types_[i]->toString());
}
SelectivityVector allRows(input->size());
std::vector<char*> rows(input->size());
for (int row = 0; row < input->size(); ++row) {
rows[row] = this->newRow();
}

// Keep encoded lazy vectors alive for the duration of the store loop
// so their FlatVector<StringView>'s backing buffers don't drop.
std::vector<LazyComplexVectorPtr> lazyKeepalive(input->childrenSize());

auto* inputRow = input->as<RowVector>();
for (size_t colIdx = 0; colIdx < inputRow->childrenSize(); ++colIdx) {
DecodedVector decoded(*inputRow->childAt(colIdx), allRows);
auto kind = inputRow->childAt(colIdx)->type()->kind();
VectorPtr child = inputRow->childAt(colIdx);
if (lazyCodec_ != nullptr && colIdx < lazyOriginalTypes_.size() &&
lazyOriginalTypes_[colIdx] != nullptr) {
lazyKeepalive[colIdx] =
encodeToLazy(child, stringAllocator_->pool(), *lazyCodec_);
child = lazyKeepalive[colIdx]->encoded();
}
DecodedVector decoded(*child, allRows);
// Use typeKinds_[colIdx] for dispatch: lazy-complex columns have their
// kind overridden to VARBINARY in the constructor.
auto kind = typeKinds_[colIdx];
BOLT_DYNAMIC_TYPE_DISPATCH(
this->storeColumn, kind, decoded, input->size(), rows, colIdx);
}
Expand Down Expand Up @@ -839,6 +885,22 @@ void RowContainer::extractString(
values->setNoCopy(index, StringView(rawBuffer, value.size()));
}

std::vector<InputLazyMode> RowContainer::inputLazyModes(
const std::vector<column_index_t>& inputChannels) const {
if (lazyCodec_ == nullptr) {
return {};
}
column_index_t maxCol =
*std::max_element(inputChannels.begin(), inputChannels.end());
std::vector<InputLazyMode> out(maxCol + 1, InputLazyMode::kAny);
for (size_t rc = 0; rc < lazyOriginalTypes_.size(); ++rc) {
if (lazyOriginalTypes_[rc] != nullptr && rc < inputChannels.size()) {
out[inputChannels[rc]] = InputLazyMode::kForceLazy;
}
}
return out;
}

void RowContainer::storeComplexType(
const DecodedVector& decoded,
vector_size_t index,
Expand Down
Loading