-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Add datasketches HLL sketch aggregate functions #63143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
a70254f
8a277f9
89dae1b
216d172
2fd6967
cdf2f9d
894be1a
201ae9c
8d36961
fd97b44
03e739c
99f59fe
09b8b25
82b7fc2
b90dc9d
4d2e209
ace372e
d2a11a2
1e9c55f
1b85ce6
1873aec
acb64ee
42bab33
661002c
8805b07
0be7062
9a7d371
21cfd93
e954b27
a436b03
33110b1
f422370
57aee5a
e33bfb6
2ee0068
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h" | ||
|
|
||
| #include <string> | ||
|
|
||
| #include "core/data_type/data_type.h" | ||
| #include "core/data_type/define_primitive_type.h" | ||
| #include "exec/common/hash_table/hash.h" // IWYU pragma: keep | ||
| #include "exprs/aggregate/aggregate_function_simple_factory.h" | ||
| #include "exprs/aggregate/helpers.h" | ||
| namespace doris { | ||
| template <template <PrimitiveType> class Data> | ||
| AggregateFunctionPtr create_aggregate_function_datasketches_hll_union_agg( | ||
| const std::string& name, const DataTypes& argument_types, const DataTypePtr& result_type, | ||
| const bool result_is_nullable, const AggregateFunctionAttr& attr) { | ||
| return creator_with_type_list<TYPE_STRING, TYPE_VARCHAR, TYPE_BINARY, TYPE_VARBINARY>::create< | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In FE, you have only registered |
||
| AggregateFunctionDataSketchesHllUnionAgg, Data>(argument_types, result_is_nullable, | ||
| attr); | ||
| } | ||
| void register_aggregate_function_datasketches_HLL_union_agg( | ||
| AggregateFunctionSimpleFactory& factory) { | ||
| AggregateFunctionCreator creator = | ||
| create_aggregate_function_datasketches_hll_union_agg<AggregateFunctionHllSketchData>; | ||
| factory.register_function_both("datasketches_hll_union_agg", creator); | ||
| factory.register_alias("datasketches_hll_union_agg", "ds_hll_union_count"); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the basis for aliases? For the same behavior, sr named |
||
| factory.register_alias("datasketches_hll_union_agg", "ds_cardinality"); | ||
| } | ||
| } // namespace doris | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,233 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #pragma once | ||
| #include <stddef.h> | ||
|
|
||
| #include <algorithm> | ||
| #include <DataSketches/hll.hpp> | ||
| #include <boost/iterator/iterator_facade.hpp> | ||
| #include <memory> | ||
| #include <optional> | ||
| #include <type_traits> | ||
| #include <vector> | ||
|
|
||
| #include "common/compiler_util.h" // IWYU pragma: keep | ||
| #include "core/assert_cast.h" | ||
| #include "core/column/column.h" | ||
| #include "core/column/column_varbinary.h" | ||
| #include "core/column/column_vector.h" | ||
| #include "core/custom_allocator.h" | ||
| #include "core/data_type/data_type_number.h" | ||
| #include "core/data_type/define_primitive_type.h" | ||
| #include "core/field.h" | ||
| #include "core/string_ref.h" | ||
| #include "core/types.h" | ||
| #include "core/uint128.h" | ||
| #include "exec/common/hash_table/hash.h" | ||
| #include "exec/common/hash_table/phmap_fwd_decl.h" | ||
| #include "exprs/aggregate/aggregate_function.h" | ||
| #include "util/var_int.h" | ||
| template <typename T> | ||
| struct HashCRC32; | ||
| namespace doris { | ||
| class Arena; | ||
| class BufferReadable; | ||
| class BufferWritable; | ||
| template <PrimitiveType T> | ||
| class ColumnDecimal; | ||
| /// datasketches_hll_union_agg | ||
| template <PrimitiveType T> | ||
| struct AggregateFunctionHllSketchData { | ||
| /** We set the default LgK to 12, | ||
| * as this value is used as a performance baseline in the relevant documentation. | ||
| * (https://datasketches.apache.org/docs/HLL/HllPerformance.html) | ||
| */ | ||
| static constexpr uint8_t DEFAULT_LOG_K = 12; | ||
| using Alloc = CustomStdAllocator<uint8_t>; | ||
| using Sketch = datasketches::hll_sketch_alloc<Alloc>; | ||
| using Union = datasketches::hll_union_alloc<Alloc>; | ||
|
|
||
| std::optional<Union> hll_union_data; | ||
|
|
||
| static String get_name() { return "datasketches_hll_union_agg"; } | ||
|
|
||
| void merge(const Sketch& sketch_data) { | ||
| if (!hll_union_data.has_value()) { | ||
| /** We clamp max lg_k to [7, 21], | ||
| * considering that the code comment requires 7 to 21. | ||
| * See: datasketches-cpp/hll/include/hll.hpp:451 | ||
| */ | ||
| constexpr uint8_t MIN_UNION_LOG_K = 7; | ||
| const uint8_t union_lg_k = std::clamp<uint8_t>(sketch_data.get_lg_config_k(), | ||
| MIN_UNION_LOG_K, | ||
| datasketches::hll_constants::MAX_LOG_K); | ||
| hll_union_data.emplace(union_lg_k, Alloc()); | ||
| } | ||
| try { | ||
| hll_union_data->update(sketch_data); | ||
| } catch (const doris::Exception& e) { | ||
| throw Exception(e.code(), "Internal error happened when update HLL sketch: {}", | ||
| e.to_string()); | ||
| } catch (const std::exception& e) { | ||
| throw Exception(ErrorCode::INTERNAL_ERROR, | ||
| "Internal error happened when update HLL sketch: {}", e.what()); | ||
| } catch (...) { | ||
|
nooneuse marked this conversation as resolved.
|
||
| throw Exception(ErrorCode::INTERNAL_ERROR, | ||
| "Internal error happened when update HLL sketch: unknown exception."); | ||
| } | ||
| } | ||
| void reset() { | ||
| if (hll_union_data.has_value()) { | ||
| hll_union_data->reset(); | ||
| } | ||
| hll_union_data.reset(); | ||
| } | ||
|
|
||
| void write_sketch(BufferWritable& buf, const Sketch& sk) const { | ||
| auto serialized_bytes = sk.serialize_compact(); | ||
| StringRef d(serialized_bytes.data(), serialized_bytes.size()); | ||
| buf.write_binary(d); | ||
| } | ||
| void write(BufferWritable& buf) const { | ||
| if (!hll_union_data.has_value()) { | ||
| /** Using DEFAULT_LOG_K(12) here is surely sufficient, | ||
| * because in this case the union that actually needs to be serialized should contain no data. | ||
| */ | ||
| Union u(DEFAULT_LOG_K, Alloc()); | ||
| write_sketch(buf, u.get_result()); | ||
| return; | ||
| } | ||
| try { | ||
| auto cache = hll_union_data->get_result(); | ||
| write_sketch(buf, cache); | ||
| } catch (const doris::Exception& e) { | ||
| throw Exception(e.code(), "Internal error happened when serialize HLL sketch: {}", | ||
| e.to_string()); | ||
| } catch (const std::exception& e) { | ||
| throw Exception(ErrorCode::INTERNAL_ERROR, | ||
| "Internal error happened when serialize HLL sketch: {}", e.what()); | ||
| } catch (...) { | ||
| throw Exception(ErrorCode::INTERNAL_ERROR, | ||
| "Internal error happened when serialize HLL sketch: unknown exception."); | ||
| } | ||
| } | ||
| void read(BufferReadable& buf) { | ||
| StringRef d; | ||
| buf.read_binary(d); | ||
| try { | ||
| auto cache = Sketch::deserialize(d.data, d.size, Alloc()); | ||
| merge(cache); | ||
| } catch (const doris::Exception& e) { | ||
| throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when read: {}", | ||
| e.to_string()); | ||
| } catch (const std::exception& e) { | ||
| throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when read: {}", | ||
| e.what()); | ||
| } catch (...) { | ||
| throw Exception(ErrorCode::CORRUPTION, | ||
| "HLL sketch data corrupted when read: unknown exception."); | ||
| } | ||
| } | ||
| double get_result() const { | ||
| if (hll_union_data.has_value()) { | ||
| try { | ||
| return hll_union_data->get_estimate(); | ||
| } catch (const doris::Exception& e) { | ||
| throw Exception(e.code(), "Internal error happened when get HLL sketch estimate: {}", | ||
| e.to_string()); | ||
| } catch (const std::exception& e) { | ||
| throw Exception(ErrorCode::INTERNAL_ERROR, | ||
| "Internal error happened when get HLL sketch estimate: {}", | ||
| e.what()); | ||
| } catch (...) { | ||
| throw Exception( | ||
| ErrorCode::INTERNAL_ERROR, | ||
| "Internal error happened when get HLL sketch estimate: unknown exception."); | ||
| } | ||
| } | ||
| return 0.0; | ||
| } | ||
| }; | ||
|
|
||
| /// Calculates the number of different values approximately using hll sketch. | ||
| template <PrimitiveType T, typename Data> | ||
| class AggregateFunctionDataSketchesHllUnionAgg final | ||
| : public IAggregateFunctionDataHelper<Data, | ||
| AggregateFunctionDataSketchesHllUnionAgg<T, Data>>, | ||
| UnaryExpression, | ||
| NotNullableAggregateFunction { | ||
| public: | ||
| AggregateFunctionDataSketchesHllUnionAgg(const DataTypes& argument_types_) | ||
| : IAggregateFunctionDataHelper<Data, AggregateFunctionDataSketchesHllUnionAgg<T, Data>>( | ||
| argument_types_) {} | ||
| String get_name() const override { return Data::get_name(); } | ||
| DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); } | ||
| void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } | ||
| void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, | ||
| Arena&) const override { | ||
| add_one(this->data(place), *columns[0], row_num); | ||
| } | ||
| void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, | ||
| Arena&) const override { | ||
| const auto& rhs_data = this->data(rhs); | ||
| if (!rhs_data.hll_union_data.has_value()) { | ||
| return; | ||
| } | ||
| this->data(place).merge(rhs_data.hll_union_data->get_result(datasketches::HLL_8)); | ||
| } | ||
| void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { | ||
| this->data(place).write(buf); | ||
| } | ||
| void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, | ||
| Arena&) const override { | ||
| this->data(place).read(buf); | ||
| } | ||
| void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { | ||
| assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get_result()); | ||
| } | ||
|
|
||
| private: | ||
| static void ALWAYS_INLINE add_one(Data& data, const IColumn& column, ssize_t row_num) { | ||
| if constexpr (is_string_type(T) || is_varbinary(T)) { | ||
| const auto& src_column = | ||
| assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&, | ||
| TypeCheckOnRelease::DISABLE>(column); | ||
| StringRef value = src_column.get_data_at(static_cast<size_t>(row_num)); | ||
| if (value.empty()) { | ||
| throw Exception(ErrorCode::CORRUPTION, | ||
| "HLL sketch data corrupted when add: empty input."); | ||
| } | ||
| try { | ||
| using Sketch = typename Data::Sketch; | ||
| using Alloc = typename Data::Alloc; | ||
| Sketch sketch_data = Sketch::deserialize(value.begin(), value.size, Alloc()); | ||
| data.merge(sketch_data); | ||
| } catch (const doris::Exception& e) { | ||
| throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when add: {}", | ||
| e.to_string()); | ||
| } catch (const std::exception& e) { | ||
| throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when add: {}", | ||
| e.what()); | ||
| } catch (...) { | ||
| throw Exception(ErrorCode::CORRUPTION, | ||
| "HLL sketch data corrupted when add: unknown exception."); | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| } // namespace doris | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I directly pointed the submodule to the Apache DataSketches GitHub repository. Later, if needed, we can consider adding DataSketches to the doris-thirdparty repository.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why maintain through
contribinstead ofthirdparty?