diff --git a/bolt/exec/AggregateWindow.cpp b/bolt/exec/AggregateWindow.cpp index 186bd7ead..47fe0cac4 100644 --- a/bolt/exec/AggregateWindow.cpp +++ b/bolt/exec/AggregateWindow.cpp @@ -53,7 +53,8 @@ class AggregateWindowFunction : public exec::WindowFunction { bolt::memory::MemoryPool* pool, HashStringAllocator* stringAllocator, const core::QueryConfig& config) - : WindowFunction(resultType, pool, stringAllocator) { + : WindowFunction(resultType, pool, stringAllocator), + maxBatchSize_(std::max(1, config.maxOutputBatchRows())) { BOLT_USER_CHECK( !ignoreNulls, "Aggregate window functions do not support IGNORE NULLS"); argTypes_.reserve(args.size()); @@ -128,6 +129,8 @@ class AggregateWindowFunction : public exec::WindowFunction { partition_ = partition; previousFrameMetadata_.reset(); + cachedSameFrameResult_.reset(); + cachedSameFrameBounds_.reset(); } void resetAggregateGroup() { @@ -158,7 +161,14 @@ class AggregateWindowFunction : public exec::WindowFunction { FrameMetadata frameMetadata = analyzeFrameValues(validRows, rawFrameStarts, rawFrameEnds); - if (frameMetadata.incrementalAggregation) { + if (frameMetadata.sameFullPartitionFrame) { + sameFrameAggregation( + validRows, + frameMetadata.firstRow, + frameMetadata.lastRow, + resultOffset, + result); + } else if (frameMetadata.incrementalAggregation) { vector_size_t startRow; if (frameMetadata.usePreviousAggregate) { // If incremental aggregation can be resumed from the previous block, @@ -212,6 +222,11 @@ class AggregateWindowFunction : public exec::WindowFunction { // Resume incremental aggregation from the prior block. bool usePreviousAggregate; + + // True if all rows in this block have the full partition frame. This is + // common for UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. It is handled + // separately to avoid loading the whole partition into argument vectors. + bool sameFullPartitionFrame; }; bool handleAllEmptyFrames( @@ -244,6 +259,8 @@ class AggregateWindowFunction : public exec::WindowFunction { vector_size_t prevFrameEnds = lastRow; bool incrementalAggregation = true; + bool allSameFrame = true; + const auto fixedFrameEndRow = lastRow; validRows.applyToSelected([&](auto i) { firstRow = std::min(firstRow, rawFrameStarts[i]); lastRow = std::max(lastRow, rawFrameEnds[i]); @@ -254,6 +271,9 @@ class AggregateWindowFunction : public exec::WindowFunction { incrementalAggregation &= (rawFrameStarts[i] == fixedFrameStartRow); incrementalAggregation &= rawFrameEnds[i] >= prevFrameEnds; prevFrameEnds = rawFrameEnds[i]; + + allSameFrame &= rawFrameStarts[i] == fixedFrameStartRow; + allSameFrame &= rawFrameEnds[i] == fixedFrameEndRow; }); bool usePreviousAggregate = false; @@ -270,7 +290,15 @@ class AggregateWindowFunction : public exec::WindowFunction { } } - return {firstRow, lastRow, incrementalAggregation, usePreviousAggregate}; + const bool sameFullPartitionFrame = !partition_->supportRowsStreaming() && + allSameFrame && firstRow == 0 && lastRow == partition_->numRows() - 1; + + return { + firstRow, + lastRow, + incrementalAggregation, + usePreviousAggregate, + sameFullPartitionFrame}; } void fillArgVectors(vector_size_t firstRow, vector_size_t lastRow) { @@ -307,6 +335,47 @@ class AggregateWindowFunction : public exec::WindowFunction { aggregate_->extractValues(&rawSingleGroupRow_, 1, &aggregateResultVector_); } + void sameFrameAggregation( + const SelectivityVector& validRows, + vector_size_t firstRow, + vector_size_t lastRow, + vector_size_t resultOffset, + const VectorPtr& result) { + if (cachedSameFrameResult_ && cachedSameFrameBounds_.has_value() && + cachedSameFrameBounds_->first == firstRow && + cachedSameFrameBounds_->second == lastRow) { + validRows.applyToSelected([&](auto i) { + result->copy(cachedSameFrameResult_.get(), resultOffset + i, 0, 1); + }); + } else { + resetAggregateGroup(); + + for (auto batchStart = firstRow; batchStart <= lastRow; + batchStart += maxBatchSize_) { + const auto batchEnd = + std::min(batchStart + maxBatchSize_ - 1, lastRow); + fillArgVectors(batchStart, batchEnd); + + SelectivityVector rows(batchEnd - batchStart + 1); + aggregate_->addSingleGroupRawInput( + rawSingleGroupRow_, rows, argVectors_, false); + } + + BaseVector::prepareForReuse(aggregateResultVector_, 1); + aggregate_->extractValues( + &rawSingleGroupRow_, 1, &aggregateResultVector_); + + validRows.applyToSelected([&](auto i) { + result->copy(aggregateResultVector_.get(), resultOffset + i, 0, 1); + }); + + cachedSameFrameResult_ = aggregateResultVector_; + cachedSameFrameBounds_ = {firstRow, lastRow}; + } + + setEmptyFramesResult(validRows, resultOffset, emptyResult_, result); + } + void incrementalAggregation( const SelectivityVector& validRows, vector_size_t startFrame, @@ -442,6 +511,13 @@ class AggregateWindowFunction : public exec::WindowFunction { // return the default value of an aggregate (aggregation with no rows) for // empty frames. e.g. count for empty frames should return 0 and not null. VectorPtr emptyResult_; + + // Maximum number of partition rows to load into argument vectors while + // evaluating one full-partition frame. + const vector_size_t maxBatchSize_; + + VectorPtr cachedSameFrameResult_; + std::optional> cachedSameFrameBounds_; }; } // namespace diff --git a/bolt/exec/tests/SpillableWindowTest.cpp b/bolt/exec/tests/SpillableWindowTest.cpp index c52ffd7fc..166cd3009 100644 --- a/bolt/exec/tests/SpillableWindowTest.cpp +++ b/bolt/exec/tests/SpillableWindowTest.cpp @@ -14,8 +14,11 @@ * limitations under the License. */ +#include + #include "bolt/common/base/tests/GTestUtils.h" #include "bolt/common/file/FileSystems.h" +#include "bolt/exec/Aggregate.h" #include "bolt/exec/PlanNodeStats.h" #include "bolt/exec/Window.h" #include "bolt/exec/tests/utils/AssertQueryBuilder.h" @@ -28,6 +31,107 @@ namespace bytedance::bolt::exec { namespace { +std::atomic windowBatchProbeMaxRows{0}; + +void updateWindowBatchProbeMaxRows(vector_size_t rows) { + auto current = windowBatchProbeMaxRows.load(); + while (rows > current && + !windowBatchProbeMaxRows.compare_exchange_weak(current, rows)) { + } +} + +class WindowBatchProbeAggregate : public Aggregate { + public: + explicit WindowBatchProbeAggregate(TypePtr resultType) + : Aggregate(std::move(resultType)) {} + + int32_t accumulatorFixedWidthSize() const override { + return sizeof(int64_t); + } + + void initializeNewGroups( + char** groups, + folly::Range indices) override { + for (auto i : indices) { + groups[i][nullByte_] &= ~nullMask_; + *value(groups[i]) = 0; + } + } + + void addRawInput( + char** groups, + const SelectivityVector& rows, + const std::vector& args, + bool mayPushdown) override { + addSingleGroupRawInput(groups[0], rows, args, mayPushdown); + } + + void addIntermediateResults( + char** groups, + const SelectivityVector& rows, + const std::vector& args, + bool mayPushdown) override { + addRawInput(groups, rows, args, mayPushdown); + } + + void addSingleGroupRawInput( + char* group, + const SelectivityVector& rows, + const std::vector& /*args*/, + bool /*mayPushdown*/) override { + updateWindowBatchProbeMaxRows(rows.countSelected()); + *value(group) += rows.countSelected(); + } + + void addSingleGroupIntermediateResults( + char* group, + const SelectivityVector& rows, + const std::vector& args, + bool mayPushdown) override { + addSingleGroupRawInput(group, rows, args, mayPushdown); + } + + void extractValues(char** groups, int32_t numGroups, VectorPtr* result) + override { + BaseVector::ensureWritable( + SelectivityVector(numGroups), BIGINT(), pool_, *result); + auto flatResult = (*result)->as>(); + auto rawValues = flatResult->mutableRawValues(); + auto rawNulls = getRawNulls(flatResult); + for (auto i = 0; i < numGroups; ++i) { + clearNull(rawNulls, i); + rawValues[i] = *value(groups[i]); + } + } + + void extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result) + override { + extractValues(groups, numGroups, result); + } + + static std::vector> signatures() { + return {AggregateFunctionSignatureBuilder() + .returnType("bigint") + .intermediateType("bigint") + .argumentType("bigint") + .build()}; + } +}; + +void registerWindowBatchProbeAggregate() { + registerAggregateFunction( + "window_batch_probe", + WindowBatchProbeAggregate::signatures(), + [](core::AggregationNode::Step /*step*/, + const std::vector& /*argTypes*/, + const TypePtr& resultType, + const core::QueryConfig& /*config*/) -> std::unique_ptr { + return std::make_unique(resultType); + }, + false, + true); +} + class SpillableWindowTest : public OperatorTestBase { public: void SetUp() override { @@ -392,6 +496,89 @@ TEST_F(SpillableWindowTest, noneSpillable) { } } +TEST_F(SpillableWindowTest, nonSpillableLastAggUnboundedFrame) { + const vector_size_t size = 2'048; + auto data = makeRowVector( + {"d", "p", "s"}, + { + makeFlatVector( + size, + [](auto row) { + return fmt::format("{}{}", row, std::string(8 * 1024, 'a')); + }), + makeFlatVector(size, [](auto row) { return row / 512; }), + makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + auto windowExpr = + "last(d) over (partition by p order by s ROWS BETWEEN UNBOUNDED " + "PRECEDING and UNBOUNDED FOLLOWING)"; + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values(split(data, 10)) + .window({windowExpr}) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "256") + .config(core::QueryConfig::kPreferredOutputBatchRows, "50") + .config(core::QueryConfig::kMaxOutputBatchRows, "50") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .config(core::QueryConfig::kTestingSpillPct, "100") + .spillDirectory(spillDirectory->path) + .assertResults(fmt::format("SELECT *, {} FROM tmp", windowExpr)); + + const auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(windowId); + + ASSERT_EQ(stats.spilledBytes, 0); + ASSERT_EQ(stats.spilledRows, 0); + ASSERT_EQ(stats.spilledFiles, 0); + ASSERT_EQ(stats.spilledPartitions, 0); +} + +TEST_F(SpillableWindowTest, nonSpillableSameFrameAggregationBatchesInput) { + registerWindowBatchProbeAggregate(); + windowBatchProbeMaxRows = 0; + + const vector_size_t size = 1'024; + auto data = makeRowVector( + {"d", "p", "s"}, + { + makeFlatVector(size, [](auto row) { return row; }), + makeFlatVector(size, [](auto row) { return row / 512; }), + makeFlatVector(size, [](auto row) { return row; }), + }); + + auto windowExpr = + "window_batch_probe(d) over (partition by p order by s ROWS BETWEEN " + "UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING)"; + auto plan = + PlanBuilder().values(split(data, 8)).window({windowExpr}).planNode(); + + auto expected = makeRowVector( + {"d", "p", "s", "w"}, + { + data->childAt(0), + data->childAt(1), + data->childAt(2), + makeFlatVector(size, [](auto /*row*/) { return 512; }), + }); + + AssertQueryBuilder(plan) + .config(core::QueryConfig::kPreferredOutputBatchRows, "50") + .config(core::QueryConfig::kMaxOutputBatchRows, "50") + .assertResults(expected); + + ASSERT_EQ(windowBatchProbeMaxRows.load(), 50); +} + TEST_F(SpillableWindowTest, basic) { constexpr vector_size_t size = 4096; const auto testCases = std::vector{