Add datasketches HLL sketch aggregate functions#63143
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
|
run buildall |
|
compile |
FE UT Coverage ReportIncrement line coverage |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
|
run buildall |
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
TPC-DS: Total hot run time: 171529 ms |
Hello @zclllyybb , I have added docs for the datasketches_hll_union_agg aggregate function(apache/doris-website#3711). Also, could you please help trigger /review again? I've fixed the new comments from the bot. |
TPC-H: Total hot run time: 31515 ms |
TPC-DS: Total hot run time: 168794 ms |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
|
run p0 |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
|
run nonconcurrent |
|
run buildall |
|
|
||
| echo "install datasketches-cpp to thirdparty path before build be" | ||
| update_submodule "contrib/datasketches-cpp" "datasketches-cpp" "https://github.com/apache/datasketches-cpp/archive/refs/tags/5.2.0.tar.gz" | ||
| cd "${DORIS_HOME}/contrib/datasketches-cpp" |
There was a problem hiding this comment.
why maintain through contrib instead of thirdparty?
| AggregateFunctionPtr create_aggregate_function_datasketches_hll_union_agg( | ||
| const std::string& name, const DataTypes& argument_types, const DataTypePtr& result_type, | ||
| const bool result_is_nullable, const AggregateFunctionAttr& attr) { | ||
| return creator_with_type_list<TYPE_STRING, TYPE_VARCHAR, TYPE_BINARY, TYPE_VARBINARY>::create< |
There was a problem hiding this comment.
In FE, you have only registered .args(StringType.INSTANCE). Do the other types also need to be registered in FE?
TPC-H: Total hot run time: 31164 ms |
| AggregateFunctionCreator creator = | ||
| create_aggregate_function_datasketches_hll_union_agg<AggregateFunctionHllSketchData>; | ||
| factory.register_function_both("datasketches_hll_union_agg", creator); | ||
| factory.register_alias("datasketches_hll_union_agg", "ds_hll_union_count"); |
There was a problem hiding this comment.
What is the basis for aliases? For the same behavior, sr named ds_hll_estimate, SnowFlake named DATASKETCHES_HLL_ESTIMATE. I think these should be registered with this name or alias.
| * as this value is used as a performance baseline in the relevant documentation. | ||
| * (https://datasketches.apache.org/docs/HLL/HllPerformance.html) | ||
| */ | ||
| static const uint8_t DEFAULT_LOG_K = 12; |
There was a problem hiding this comment.
| static const uint8_t DEFAULT_LOG_K = 12; | |
| static constexpr uint8_t DEFAULT_LOG_K = 12; |
| } | ||
| try { | ||
| hll_union_data->update(sketch_data); | ||
| } catch (...) { |
There was a problem hiding this comment.
Can we refine the exception handling? Can we include doris::Exception and std::exception::what() in the error messages to identifying the cause of DataSketche failure?
TPC-DS: Total hot run time: 170760 ms |
| namespace detail { | ||
| /** The structure for the delegation work to add one element to the `datasketches_hll_union_agg` aggregate functions. | ||
| * Used for partial specialization to add strings. | ||
| */ | ||
| template <PrimitiveType T, typename Data> | ||
| struct OneAdder { | ||
| static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) { | ||
| if constexpr (is_string_type(T) || is_varbinary(T)) { | ||
| StringRef value = column.get_data_at(row_num); | ||
| if (value.empty()) { | ||
| throw Exception(ErrorCode::CORRUPTION, | ||
| "HLL sketch data corrupted when add: empty input."); | ||
| } | ||
| try { | ||
| using Sketch = typename Data::Sketch; | ||
| using Alloc = typename Data::Alloc; | ||
| Sketch sketch_data = Sketch::deserialize(value.begin(), value.size, Alloc()); | ||
| data.merge(sketch_data); | ||
| } catch (...) { | ||
| throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when add."); | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| } // namespace detail |
There was a problem hiding this comment.
The detail::OneAdder is used by the current aggregation function only and is placed separately in the namespace, I think the abstraction benefits are not obvious.
Can this be changed to the private static function of AggregateFunctionDataSketchesHllUnionAgg to make the call relationship and scope more intuitive?
| class AggregateFunctionDataSketchesHllUnionAgg final | ||
| : public IAggregateFunctionDataHelper<Data, | ||
| AggregateFunctionDataSketchesHllUnionAgg<T, Data>>, | ||
| VarargsExpression, |
There was a problem hiding this comment.
| VarargsExpression, | |
| UnaryExpression, |
| public class DataSketchesHllUnionAgg extends NotNullableAggregateFunction | ||
| implements UnaryExpression, ExplicitlyCastableSignature, FunctionTrait, RollUpTrait { | ||
| public static final List<FunctionSignature> SIGNATURES = ImmutableList.of( | ||
| FunctionSignature.ret(BigIntType.INSTANCE).args(StringType.INSTANCE) |
There was a problem hiding this comment.
What is the consideration for returning to BigInt? I see that the SnowFlake and the get_estimate api return type provided by the third-party library are both double.
| struct OneAdder { | ||
| static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) { | ||
| if constexpr (is_string_type(T) || is_varbinary(T)) { | ||
| StringRef value = column.get_data_at(row_num); |
There was a problem hiding this comment.
assert_cast<PrimitiveTypeTraits<T>::ColumnType>(column) before call get_data_at to avoid the overhead of virtual function calls.
|
/review |
There was a problem hiding this comment.
I found one correctness issue that should be fixed before merge.
Critical checkpoint conclusions:
- Goal/test coverage: the PR adds a DataSketches HLL union aggregate with BE/FE registration and regression/unit tests, but coverage misses valid low-lg_k input sketches.
- Scope/focus: the change is mostly focused on the new aggregate and build dependency wiring.
- Concurrency/lifecycle: no new shared concurrency path was identified; aggregate state lifecycle is handled through create/destroy/reset, but the initial union construction has a validity bug.
- Configuration/compatibility: no new config or persisted storage format is introduced; table aggregation-type exposure from earlier revisions appears removed.
- Parallel paths: BE registers string/varchar/binary/varbinary while FE currently exposes string only; this was already raised in an existing thread, so I did not duplicate it.
- Error handling/data correctness: invalid inputs are converted to CORRUPTION, but one valid DataSketches input range is also converted to failure because of the union lg_k initialization.
- Memory/observability: allocator-aware DataSketches state is now used; no additional observability requirement found for this aggregate.
- Regression standards: result queries are deterministic; negative cases use test/exception.
- User focus: no additional user-provided review focus was specified.
|
|
||
| void merge(const Sketch& sketch_data) { | ||
| if (!hll_union_data.has_value()) { | ||
| hll_union_data.emplace(sketch_data.get_lg_config_k(), Alloc()); |
There was a problem hiding this comment.
This initializes hll_union_alloc with the input sketch's lg_config_k, but DataSketches allows HLL sketches with lg_config_k down to 4 while hll_union_alloc requires lg_max_k to be at least 7. A valid serialized sketch produced with new HllSketch(4|5|6, HLL_*) will deserialize successfully, then this constructor throws and add()/read() report CORRUPTION for valid data. Please clamp the union lg_k to the union API minimum (or use the library constant if available) and add a unit/regression case for a valid low-lg_k sketch.
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
What problem does this PR solve?
Issue Number:
Summary:
Implemented a built-in aggregate function that integrates the Datasketches HLL sketch. This aggregate function cannot rely on the Java UDF environment. Considering that in the Java UDF environment, Strings are encoded in UTF-8, which corrupts the binary data of sketches, the serialization/deserialization operations for sketches must be implemented on the BE side. (additionally, since Apache Datasketches has been added to the contrib directory via a git submodule, it will become very easy to add other sketches such as theta sketch in the future.)
see: #63142
use case: see regression test & #63142
Release note
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)