Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions bolt/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class AggregateWindowFunction : public exec::WindowFunction {
bolt::memory::MemoryPool* pool,
HashStringAllocator* stringAllocator,
const core::QueryConfig& config)
: WindowFunction(resultType, pool, stringAllocator) {
: WindowFunction(resultType, pool, stringAllocator),
maxBatchSize_(std::max<vector_size_t>(1, config.maxOutputBatchRows())) {
BOLT_USER_CHECK(
!ignoreNulls, "Aggregate window functions do not support IGNORE NULLS");
argTypes_.reserve(args.size());
Expand Down Expand Up @@ -128,6 +129,8 @@ class AggregateWindowFunction : public exec::WindowFunction {
partition_ = partition;

previousFrameMetadata_.reset();
cachedSameFrameResult_.reset();
cachedSameFrameBounds_.reset();
}

void resetAggregateGroup() {
Expand Down Expand Up @@ -158,7 +161,14 @@ class AggregateWindowFunction : public exec::WindowFunction {
FrameMetadata frameMetadata =
analyzeFrameValues(validRows, rawFrameStarts, rawFrameEnds);

if (frameMetadata.incrementalAggregation) {
if (frameMetadata.sameFullPartitionFrame) {
sameFrameAggregation(
validRows,
frameMetadata.firstRow,
frameMetadata.lastRow,
resultOffset,
result);
} else if (frameMetadata.incrementalAggregation) {
vector_size_t startRow;
if (frameMetadata.usePreviousAggregate) {
// If incremental aggregation can be resumed from the previous block,
Expand Down Expand Up @@ -212,6 +222,11 @@ class AggregateWindowFunction : public exec::WindowFunction {

// Resume incremental aggregation from the prior block.
bool usePreviousAggregate;

// True if all rows in this block have the full partition frame. This is
// common for UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. It is handled
// separately to avoid loading the whole partition into argument vectors.
bool sameFullPartitionFrame;
};

bool handleAllEmptyFrames(
Expand Down Expand Up @@ -244,6 +259,8 @@ class AggregateWindowFunction : public exec::WindowFunction {
vector_size_t prevFrameEnds = lastRow;

bool incrementalAggregation = true;
bool allSameFrame = true;
const auto fixedFrameEndRow = lastRow;
validRows.applyToSelected([&](auto i) {
firstRow = std::min(firstRow, rawFrameStarts[i]);
lastRow = std::max(lastRow, rawFrameEnds[i]);
Expand All @@ -254,6 +271,9 @@ class AggregateWindowFunction : public exec::WindowFunction {
incrementalAggregation &= (rawFrameStarts[i] == fixedFrameStartRow);
incrementalAggregation &= rawFrameEnds[i] >= prevFrameEnds;
prevFrameEnds = rawFrameEnds[i];

allSameFrame &= rawFrameStarts[i] == fixedFrameStartRow;
allSameFrame &= rawFrameEnds[i] == fixedFrameEndRow;
});

bool usePreviousAggregate = false;
Expand All @@ -270,7 +290,15 @@ class AggregateWindowFunction : public exec::WindowFunction {
}
}

return {firstRow, lastRow, incrementalAggregation, usePreviousAggregate};
const bool sameFullPartitionFrame = !partition_->supportRowsStreaming() &&
allSameFrame && firstRow == 0 && lastRow == partition_->numRows() - 1;

return {
firstRow,
lastRow,
incrementalAggregation,
usePreviousAggregate,
sameFullPartitionFrame};
}

void fillArgVectors(vector_size_t firstRow, vector_size_t lastRow) {
Expand Down Expand Up @@ -307,6 +335,47 @@ class AggregateWindowFunction : public exec::WindowFunction {
aggregate_->extractValues(&rawSingleGroupRow_, 1, &aggregateResultVector_);
}

// Computes the aggregate once over the frame [firstRow, lastRow] shared by
// every row of the current block and writes that single value to each valid
// row of 'result'.
//
// The frame is fed to the aggregate in slices of at most 'maxBatchSize_' rows
// so the argument vectors never need to hold the whole frame at once. The
// extracted value is remembered together with its bounds; a later call with
// the same bounds reuses it instead of aggregating again.
//
// @param validRows Rows of the current block to produce output for.
// @param firstRow First partition row of the shared frame (inclusive).
// @param lastRow Last partition row of the shared frame (inclusive).
// @param resultOffset Offset added to each row index when writing 'result'.
// @param result Output vector receiving one copy of the value per valid row.
void sameFrameAggregation(
    const SelectivityVector& validRows,
    vector_size_t firstRow,
    vector_size_t lastRow,
    vector_size_t resultOffset,
    const VectorPtr& result) {
  const bool cacheHit = cachedSameFrameResult_ &&
      cachedSameFrameBounds_.has_value() &&
      cachedSameFrameBounds_->first == firstRow &&
      cachedSameFrameBounds_->second == lastRow;

  if (!cacheHit) {
    resetAggregateGroup();

    // Size every slice from the rows that remain and advance by exactly that
    // amount. Adding 'maxBatchSize_' to 'batchStart' directly could exceed
    // the range of vector_size_t when the configured batch size is large;
    // here 'batchStart' never moves past lastRow + 1. The constructor clamps
    // 'maxBatchSize_' to at least 1, so each iteration makes progress.
    auto batchStart = firstRow;
    while (batchStart <= lastRow) {
      const auto batchRows =
          std::min<vector_size_t>(maxBatchSize_, lastRow - batchStart + 1);
      fillArgVectors(batchStart, batchStart + batchRows - 1);

      SelectivityVector rows(batchRows);
      aggregate_->addSingleGroupRawInput(
          rawSingleGroupRow_, rows, argVectors_, false);

      batchStart += batchRows;
    }

    BaseVector::prepareForReuse(aggregateResultVector_, 1);
    aggregate_->extractValues(&rawSingleGroupRow_, 1, &aggregateResultVector_);

    // The cache shares ownership of the result vector rather than copying it.
    // NOTE(review): this relies on every other writer of
    // 'aggregateResultVector_' calling BaseVector::prepareForReuse first (as
    // done above), which presumably replaces a shared vector instead of
    // mutating it in place - confirm.
    cachedSameFrameResult_ = aggregateResultVector_;
    cachedSameFrameBounds_ = {firstRow, lastRow};
  }

  // Broadcast the single aggregate value (row 0 of the cached vector) to each
  // requested output row.
  validRows.applyToSelected([&](auto i) {
    result->copy(cachedSameFrameResult_.get(), resultOffset + i, 0, 1);
  });

  setEmptyFramesResult(validRows, resultOffset, emptyResult_, result);
}

void incrementalAggregation(
const SelectivityVector& validRows,
vector_size_t startFrame,
Expand Down Expand Up @@ -442,6 +511,13 @@ class AggregateWindowFunction : public exec::WindowFunction {
// return the default value of an aggregate (aggregation with no rows) for
// empty frames. e.g. count for empty frames should return 0 and not null.
VectorPtr emptyResult_;

// Maximum number of partition rows to load into argument vectors while
// evaluating one full-partition frame.
const vector_size_t maxBatchSize_;

// Single-row aggregate result produced by the most recent
// sameFrameAggregation() computation. It is cleared together with
// 'cachedSameFrameBounds_' whenever a new partition is installed, and is only
// reused while the requested frame bounds match the cached ones.
VectorPtr cachedSameFrameResult_;
// Inclusive {firstRow, lastRow} frame bounds that 'cachedSameFrameResult_'
// was computed for; empty when nothing is cached.
std::optional<std::pair<vector_size_t, vector_size_t>> cachedSameFrameBounds_;
};

} // namespace
Expand Down
187 changes: 187 additions & 0 deletions bolt/exec/tests/SpillableWindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
* limitations under the License.
*/

#include <atomic>

#include "bolt/common/base/tests/GTestUtils.h"
#include "bolt/common/file/FileSystems.h"
#include "bolt/exec/Aggregate.h"
#include "bolt/exec/PlanNodeStats.h"
#include "bolt/exec/Window.h"
#include "bolt/exec/tests/utils/AssertQueryBuilder.h"
Expand All @@ -28,6 +31,107 @@ namespace bytedance::bolt::exec {

namespace {

// Largest row count observed so far by updateWindowBatchProbeMaxRows(). Tests
// reset it to zero before running a query and inspect it afterwards.
std::atomic<int64_t> windowBatchProbeMaxRows{0};

// Raises 'windowBatchProbeMaxRows' to 'rows' if 'rows' is larger than the
// value currently stored; otherwise leaves it untouched.
void updateWindowBatchProbeMaxRows(vector_size_t rows) {
  const int64_t candidate = rows;
  int64_t observed = windowBatchProbeMaxRows.load();
  // A failed compare-exchange refreshes 'observed' with the stored value, so
  // the loop condition re-checks whether an update is still needed.
  while (observed < candidate) {
    if (windowBatchProbeMaxRows.compare_exchange_weak(observed, candidate)) {
      break;
    }
  }
}

class WindowBatchProbeAggregate : public Aggregate {
public:
explicit WindowBatchProbeAggregate(TypePtr resultType)
: Aggregate(std::move(resultType)) {}

int32_t accumulatorFixedWidthSize() const override {
return sizeof(int64_t);
}

void initializeNewGroups(
char** groups,
folly::Range<const vector_size_t*> indices) override {
for (auto i : indices) {
groups[i][nullByte_] &= ~nullMask_;
*value<int64_t>(groups[i]) = 0;
}
}

void addRawInput(
char** groups,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool mayPushdown) override {
addSingleGroupRawInput(groups[0], rows, args, mayPushdown);
}

void addIntermediateResults(
char** groups,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool mayPushdown) override {
addRawInput(groups, rows, args, mayPushdown);
}

void addSingleGroupRawInput(
char* group,
const SelectivityVector& rows,
const std::vector<VectorPtr>& /*args*/,
bool /*mayPushdown*/) override {
updateWindowBatchProbeMaxRows(rows.countSelected());
*value<int64_t>(group) += rows.countSelected();
}

void addSingleGroupIntermediateResults(
char* group,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool mayPushdown) override {
addSingleGroupRawInput(group, rows, args, mayPushdown);
}

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override {
BaseVector::ensureWritable(
SelectivityVector(numGroups), BIGINT(), pool_, *result);
auto flatResult = (*result)->as<FlatVector<int64_t>>();
auto rawValues = flatResult->mutableRawValues();
auto rawNulls = getRawNulls(flatResult);
for (auto i = 0; i < numGroups; ++i) {
clearNull(rawNulls, i);
rawValues[i] = *value<int64_t>(groups[i]);
}
}

void extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result)
override {
extractValues(groups, numGroups, result);
}

static std::vector<std::shared_ptr<AggregateFunctionSignature>> signatures() {
return {AggregateFunctionSignatureBuilder()
.returnType("bigint")
.intermediateType("bigint")
.argumentType("bigint")
.build()};
}
};

void registerWindowBatchProbeAggregate() {
registerAggregateFunction(
"window_batch_probe",
WindowBatchProbeAggregate::signatures(),
[](core::AggregationNode::Step /*step*/,
const std::vector<TypePtr>& /*argTypes*/,
const TypePtr& resultType,
const core::QueryConfig& /*config*/) -> std::unique_ptr<Aggregate> {
return std::make_unique<WindowBatchProbeAggregate>(resultType);
},
false,
true);
}

class SpillableWindowTest : public OperatorTestBase {
public:
void SetUp() override {
Expand Down Expand Up @@ -392,6 +496,89 @@ TEST_F(SpillableWindowTest, noneSpillable) {
}
}

// Runs last(d) over an UNBOUNDED PRECEDING .. UNBOUNDED FOLLOWING frame on
// wide string rows with spilling enabled and the spill test percentage at 100,
// then checks both the query result (against DuckDB) and that the Window node
// reports no spilled bytes, rows, files or partitions.
TEST_F(SpillableWindowTest, nonSpillableLastAggUnboundedFrame) {
  const vector_size_t numRows = 2'048;
  // Each 'd' value is the row number followed by an 8KB run of 'a'.
  const std::string padding(8 * 1024, 'a');

  auto dColumn = makeFlatVector<std::string>(
      numRows,
      [&padding](auto row) { return fmt::format("{}{}", row, padding); });
  // 512 consecutive rows share a partition key.
  auto pColumn =
      makeFlatVector<int16_t>(numRows, [](auto row) { return row / 512; });
  auto sColumn = makeFlatVector<int32_t>(numRows, [](auto row) { return row; });
  auto input = makeRowVector({"d", "p", "s"}, {dColumn, pColumn, sColumn});

  createDuckDbTable({input});

  const std::string windowCall =
      "last(d) over (partition by p order by s ROWS BETWEEN UNBOUNDED "
      "PRECEDING and UNBOUNDED FOLLOWING)";
  const auto expectedSql = fmt::format("SELECT *, {} FROM tmp", windowCall);

  core::PlanNodeId windowNodeId;
  auto queryPlan = PlanBuilder()
                       .values(split(input, 10))
                       .window({windowCall})
                       .capturePlanNodeId(windowNodeId)
                       .planNode();

  auto spillDir = TempDirectoryPath::create();
  auto finishedTask =
      AssertQueryBuilder(queryPlan, duckDbQueryRunner_)
          .config(core::QueryConfig::kPreferredOutputBatchBytes, "256")
          .config(core::QueryConfig::kPreferredOutputBatchRows, "50")
          .config(core::QueryConfig::kMaxOutputBatchRows, "50")
          .config(core::QueryConfig::kSpillEnabled, "true")
          .config(core::QueryConfig::kWindowSpillEnabled, "true")
          .config(core::QueryConfig::kTestingSpillPct, "100")
          .spillDirectory(spillDir->path)
          .assertResults(expectedSql);

  const auto planStats = exec::toPlanStats(finishedTask->taskStats());
  const auto& windowStats = planStats.at(windowNodeId);

  ASSERT_EQ(windowStats.spilledBytes, 0);
  ASSERT_EQ(windowStats.spilledRows, 0);
  ASSERT_EQ(windowStats.spilledFiles, 0);
  ASSERT_EQ(windowStats.spilledPartitions, 0);
}

// Runs the window_batch_probe aggregate over an UNBOUNDED PRECEDING ..
// UNBOUNDED FOLLOWING frame and checks two things: every row sees the full
// partition row count, and the largest batch handed to the aggregate in one
// addSingleGroupRawInput call equals the configured maximum of 50 rows.
TEST_F(SpillableWindowTest, nonSpillableSameFrameAggregationBatchesInput) {
  registerWindowBatchProbeAggregate();
  // Start from a clean probe so earlier tests do not influence the maximum.
  windowBatchProbeMaxRows = 0;

  static constexpr vector_size_t kNumRows = 1'024;
  static constexpr vector_size_t kRowsPerPartition = 512;

  auto dColumn =
      makeFlatVector<int64_t>(kNumRows, [](vector_size_t row) { return row; });
  auto pColumn = makeFlatVector<int16_t>(
      kNumRows, [](vector_size_t row) { return row / kRowsPerPartition; });
  auto sColumn =
      makeFlatVector<int32_t>(kNumRows, [](vector_size_t row) { return row; });
  auto input = makeRowVector({"d", "p", "s"}, {dColumn, pColumn, sColumn});

  const std::string windowCall =
      "window_batch_probe(d) over (partition by p order by s ROWS BETWEEN "
      "UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING)";
  auto queryPlan =
      PlanBuilder().values(split(input, 8)).window({windowCall}).planNode();

  // The probe counts rows, so every output row carries its partition's size.
  auto wColumn = makeFlatVector<int64_t>(
      kNumRows, [](vector_size_t /*row*/) { return kRowsPerPartition; });
  auto expectedOutput = makeRowVector(
      {"d", "p", "s", "w"},
      {
          input->childAt(0),
          input->childAt(1),
          input->childAt(2),
          wColumn,
      });

  AssertQueryBuilder(queryPlan)
      .config(core::QueryConfig::kPreferredOutputBatchRows, "50")
      .config(core::QueryConfig::kMaxOutputBatchRows, "50")
      .assertResults(expectedOutput);

  // Must match the "50" passed as kMaxOutputBatchRows above.
  ASSERT_EQ(windowBatchProbeMaxRows.load(), 50);
}

TEST_F(SpillableWindowTest, basic) {
constexpr vector_size_t size = 4096;
const auto testCases = std::vector<TestCase>{
Expand Down
Loading