Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/graph_catalog/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ impl Identifier {
/// Generate a pipe-joined SQL expression for use as a string ID.
/// If alias is non-empty: `toString(alias.col)` / `concat(toString(alias.c1), '|', toString(alias.c2))`
/// If alias is empty: `toString(col)` / `concat(toString(c1), '|', toString(c2))`
/// For single non-composite IDs without alias, just returns the column name (no toString).
/// For single non-composite IDs without alias, just returns the column name (no cast).
pub fn to_pipe_joined_sql(&self, alias: &str) -> String {
let to_str = crate::sql_generator::function_mapper::current_function_mapper().cast_string();
let qualify = |col: &str| -> String {
let quoted = crate::clickhouse_query_generator::quote_identifier(col);
if alias.is_empty() {
Expand All @@ -177,20 +178,20 @@ impl Identifier {
if alias.is_empty() {
col.clone()
} else {
format!("toString({})", qualify(col))
format!("{}({})", to_str, qualify(col))
}
}
Identifier::Composite(cols) => {
let parts: Vec<String> = cols
.iter()
.map(|c| format!("toString({})", qualify(c)))
.map(|c| format!("{}({})", to_str, qualify(c)))
.collect();
format!("concat({})", parts.join(", '|', "))
}
}
}

/// Get the ID column as SQL without toString() wrapper
/// Get the ID column as SQL without string-cast wrapper.
/// Used when we need native type comparison (e.g., for WHERE clauses)
pub fn to_sql_native(&self, alias: &str) -> String {
match self {
Expand All @@ -203,14 +204,16 @@ impl Identifier {
}
}
Identifier::Composite(cols) => {
// For composite IDs, we still need toString for concatenation
// For composite IDs, we still need string cast for concatenation
let to_str =
crate::sql_generator::function_mapper::current_function_mapper().cast_string();
let qualify = |col: &str| {
let quoted = crate::clickhouse_query_generator::quote_identifier(col);
format!("{}.{}", alias, quoted)
};
let parts: Vec<String> = cols
.iter()
.map(|c| format!("toString({})", qualify(c)))
.map(|c| format!("{}({})", to_str, qualify(c)))
.collect();
format!("concat({})", parts.join(", '|', "))
}
Expand All @@ -236,12 +239,14 @@ impl Identifier {
crate::graph_catalog::expression_parser::PropertyValue::Column(col.clone())
}
Identifier::Composite(cols) => {
let to_str =
crate::sql_generator::function_mapper::current_function_mapper().cast_string();
let parts: Vec<String> = cols
.iter()
.map(|c| {
let quoted =
crate::clickhouse_query_generator::quote_identifier(c.as_str());
format!("toString({})", quoted)
format!("{}({})", to_str, quoted)
})
.collect();
crate::graph_catalog::expression_parser::PropertyValue::Expression(format!(
Expand Down
13 changes: 9 additions & 4 deletions src/render_plan/cte_extraction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,17 +1107,22 @@ pub fn render_expr_to_sql_string(expr: &RenderExpr, alias_mapping: &[(String, St
Operator::ModuloDivision => format!("{} % {}", operands[0], operands[1]),
Operator::Exponentiation => format!("POWER({}, {})", operands[0], operands[1]),
Operator::In => {
// Check if right operand is a property access (array column)
// Cypher: x IN array_property → ClickHouse: has(array, x)
// Cypher: x IN array_property → CH: has(arr, x), Spark: array_contains(arr, x)
if matches!(&op.operands[1], RenderExpr::PropertyAccessExp(_)) {
format!("has({}, {})", operands[1], operands[0])
let contains =
crate::sql_generator::function_mapper::current_function_mapper()
.array_contains();
format!("{}({}, {})", contains, operands[1], operands[0])
} else {
format!("{} IN {}", operands[0], operands[1])
}
}
Operator::NotIn => {
if matches!(&op.operands[1], RenderExpr::PropertyAccessExp(_)) {
format!("NOT has({}, {})", operands[1], operands[0])
let contains =
crate::sql_generator::function_mapper::current_function_mapper()
.array_contains();
format!("NOT {}({}, {})", contains, operands[1], operands[0])
} else {
format!("{} NOT IN {}", operands[0], operands[1])
}
Expand Down
134 changes: 94 additions & 40 deletions src/sql_generator/emitters/clickhouse/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,47 @@
/// Maps Neo4j function names to ClickHouse equivalents with optional argument transformations.
use std::collections::HashMap;

/// Wrap a temporal extraction argument with fromUnixTimestamp64Milli().
/// Wrap a temporal extraction argument so a downstream `year()`/`month()`/etc.
/// sees a real DateTime / TIMESTAMP rather than a raw epoch-millis BIGINT.
///
/// Schemas that store timestamps as Int64 epoch milliseconds (e.g., LDBC SNB)
/// need conversion to DateTime64 before temporal extraction functions (toYear, etc.)
/// can be applied. This function wraps the first argument unless it already
/// contains a datetime conversion function (to avoid double-wrapping).
/// need conversion before extraction. Dialect-aware:
/// - ClickHouse: `fromUnixTimestamp64Milli(arg)` -> DateTime64
/// - Databricks: `timestamp_millis(arg)` -> TIMESTAMP
///
/// Skips wrapping when the argument is already a datetime expression to
/// avoid double-conversion.
fn wrap_epoch_millis_arg(args: &[String]) -> Vec<String> {
use crate::server::query_context::get_current_dialect;
use crate::sql_generator::SqlDialect;
if args.is_empty() {
return args.to_vec();
}
let arg = &args[0];
// Skip wrapping if argument is already a datetime expression
let already_datetime = arg.contains("parseDateTime64BestEffort")
|| arg.contains("fromUnixTimestamp64Milli")
|| arg.contains("now64")
|| arg.contains("now()")
|| arg.contains("toDateTime");
let dialect = get_current_dialect();
let already_datetime = match dialect {
SqlDialect::Databricks => {
arg.contains("timestamp_millis")
|| arg.contains("to_timestamp")
|| arg.contains("from_unixtime")
|| arg.contains("current_timestamp")
}
_ => {
arg.contains("parseDateTime64BestEffort")
|| arg.contains("fromUnixTimestamp64Milli")
|| arg.contains("now64")
|| arg.contains("now()")
|| arg.contains("toDateTime")
}
};
if already_datetime {
args.to_vec()
} else {
vec![format!("fromUnixTimestamp64Milli({})", arg)]
let wrapped = match dialect {
SqlDialect::Databricks => format!("timestamp_millis({})", arg),
_ => format!("fromUnixTimestamp64Milli({})", arg),
};
vec![wrapped]
}
}

Expand Down Expand Up @@ -94,21 +114,27 @@ lazy_static::lazy_static! {
}),
});

// toUnixTimestampMillis() -> toUnixTimestamp64Milli(parseDateTime64BestEffort(arg))
// Converts datetime string to Unix timestamp in milliseconds (Int64)
// For schemas that store dates as Int64 milliseconds (e.g., LDBC)
// toUnixTimestampMillis() — datetime string -> epoch millis BIGINT.
// CH: toUnixTimestamp64Milli(parseDateTime64BestEffort(arg, 3))
// Spark: unix_millis(to_timestamp(arg))
m.insert("tounixtimestampmillis", FunctionMapping {
neo4j_name: "toUnixTimestampMillis",
clickhouse_name: "toUnixTimestamp64Milli",
databricks_name: None,
databricks_name: Some("unix_millis"),
arg_transform: Some(|args| {
use crate::server::query_context::get_current_dialect;
use crate::sql_generator::SqlDialect;
let databricks = matches!(get_current_dialect(), SqlDialect::Databricks);
if args.is_empty() {
// No args: return current time in milliseconds
vec!["now64(3)".to_string()]
} else {
// Parse string and convert to milliseconds since epoch
vec![format!("parseDateTime64BestEffort({}, 3)", args[0])]
let now = if databricks { "current_timestamp()" } else { "now64(3)" };
return vec![now.to_string()];
}
let wrapped = if databricks {
format!("to_timestamp({})", args[0])
} else {
format!("parseDateTime64BestEffort({}, 3)", args[0])
};
vec![wrapped]
}),
});

Expand Down Expand Up @@ -793,84 +819,94 @@ lazy_static::lazy_static! {
// date().year, datetime().month, etc. are property accesses
// But Neo4j also has explicit functions:

// year(datetime) -> toYear(fromUnixTimestamp64Milli(datetime))
// Wraps argument with fromUnixTimestamp64Milli for epoch-millis Int64 columns
// Temporal extraction. Spark's year/month/.../quarter accept a
// TIMESTAMP directly; `wrap_epoch_millis_arg` chooses the dialect's
// BIGINT→TIMESTAMP wrapper (CH: fromUnixTimestamp64Milli,
// Spark: timestamp_millis). Per-component name mapping below; same
// name in both dialects when names match.

// year(datetime) -> CH: toYear, Spark: year
m.insert("year", FunctionMapping {
neo4j_name: "year",
clickhouse_name: "toYear",
databricks_name: None,
databricks_name: Some("year"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// month(datetime) -> toMonth(fromUnixTimestamp64Milli(datetime))
// month(datetime) -> CH: toMonth, Spark: month
m.insert("month", FunctionMapping {
neo4j_name: "month",
clickhouse_name: "toMonth",
databricks_name: None,
databricks_name: Some("month"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// day(datetime) -> toDayOfMonth(fromUnixTimestamp64Milli(datetime))
// day(datetime) -> CH: toDayOfMonth, Spark: dayofmonth
m.insert("day", FunctionMapping {
neo4j_name: "day",
clickhouse_name: "toDayOfMonth",
databricks_name: None,
databricks_name: Some("dayofmonth"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// hour(datetime) -> toHour(fromUnixTimestamp64Milli(datetime))
// hour(datetime) -> CH: toHour, Spark: hour
m.insert("hour", FunctionMapping {
neo4j_name: "hour",
clickhouse_name: "toHour",
databricks_name: None,
databricks_name: Some("hour"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// minute(datetime) -> toMinute(fromUnixTimestamp64Milli(datetime))
// minute(datetime) -> CH: toMinute, Spark: minute
m.insert("minute", FunctionMapping {
neo4j_name: "minute",
clickhouse_name: "toMinute",
databricks_name: None,
databricks_name: Some("minute"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// second(datetime) -> toSecond(fromUnixTimestamp64Milli(datetime))
// second(datetime) -> CH: toSecond, Spark: second
m.insert("second", FunctionMapping {
neo4j_name: "second",
clickhouse_name: "toSecond",
databricks_name: None,
databricks_name: Some("second"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// dayOfWeek(datetime) -> toDayOfWeek(fromUnixTimestamp64Milli(datetime))
// dayOfWeek(datetime) -> CH: toDayOfWeek (1=Monday..7=Sunday, ISO)
// Spark: dayofweek (1=Sunday..7=Saturday) — different!
// Direct name swap would silently shift the result by one day; needs a
// structural rewrite like `weekday(x) + 1` to preserve ISO semantics.
// Until that lands, fall through to `toDayOfWeek` on Spark so the gap
// surfaces as UNRESOLVED_ROUTINE rather than silently-wrong data.
m.insert("dayofweek", FunctionMapping {
neo4j_name: "dayOfWeek",
clickhouse_name: "toDayOfWeek",
databricks_name: None,
arg_transform: Some(wrap_epoch_millis_arg),
});

// dayOfYear(datetime) -> toDayOfYear(fromUnixTimestamp64Milli(datetime))
// dayOfYear(datetime) -> CH: toDayOfYear, Spark: dayofyear
m.insert("dayofyear", FunctionMapping {
neo4j_name: "dayOfYear",
clickhouse_name: "toDayOfYear",
databricks_name: None,
databricks_name: Some("dayofyear"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// quarter(datetime) -> toQuarter(fromUnixTimestamp64Milli(datetime))
// quarter(datetime) -> CH: toQuarter, Spark: quarter
m.insert("quarter", FunctionMapping {
neo4j_name: "quarter",
clickhouse_name: "toQuarter",
databricks_name: None,
databricks_name: Some("quarter"),
arg_transform: Some(wrap_epoch_millis_arg),
});

// week(datetime) -> toISOWeek(fromUnixTimestamp64Milli(datetime))
// week(datetime) -> CH: toISOWeek, Spark: weekofyear
m.insert("week", FunctionMapping {
neo4j_name: "week",
clickhouse_name: "toISOWeek",
databricks_name: None,
databricks_name: Some("weekofyear"),
arg_transform: Some(wrap_epoch_millis_arg),
});

Expand Down Expand Up @@ -964,6 +1000,24 @@ lazy_static::lazy_static! {
arg_transform: None,
});

// anyLast() — internal IR-level aggregate used by property_expansion
// to pick a non-deterministic value of a non-grouped column. CH ships
// `anyLast`; Spark's `any_value()` (3.4+) has matching semantics.
m.insert("anylast", FunctionMapping {
neo4j_name: "anyLast",
clickhouse_name: "anyLast",
databricks_name: Some("any_value"),
arg_transform: None,
});

// countIf(predicate) — conditional count. CH: countIf, Spark: count_if (DBR 13.1+).
m.insert("countif", FunctionMapping {
neo4j_name: "countIf",
clickhouse_name: "countIf",
databricks_name: Some("count_if"),
arg_transform: None,
});

// ===== SPATIAL FUNCTIONS (basic) =====
// Note: Full spatial support would require more extensive work

Expand Down
8 changes: 5 additions & 3 deletions src/sql_generator/emitters/clickhouse/json_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ pub fn generate_multi_type_union_sql(
continue;
}

let to_str = crate::sql_generator::function_mapper::current_function_mapper().cast_string();

// Get node ID column
let node_id_col = match &node_schema.node_id.id {
crate::graph_catalog::config::Identifier::Single(column) => column.clone(),
Expand All @@ -286,7 +288,7 @@ pub fn generate_multi_type_union_sql(
"concat({})",
columns
.iter()
.map(|c| format!("toString({})", c))
.map(|c| format!("{}({})", to_str, c))
.collect::<Vec<_>>()
.join(", '|', ")
)
Expand All @@ -298,8 +300,8 @@ pub fn generate_multi_type_union_sql(
let json_props = generate_json_properties_from_schema(node_schema, &node_schema.table_name);

branches.push(format!(
"SELECT '{}' as _label, toString({}) as _id, {} as _properties FROM {}",
base_label, node_id_col, json_props, table_ref
"SELECT '{}' as _label, {}({}) as _id, {} as _properties FROM {}",
base_label, to_str, node_id_col, json_props, table_ref
));
}

Expand Down
Loading
Loading