From 8b3c383952cb8f4f1bbddda6d339f7aa09f563cb Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Sun, 11 Jan 2026 15:07:48 +0530 Subject: [PATCH 1/2] fix: sqllogictest cannot convert to Substrait --- .../producer/expr/field_reference.rs | 16 +++++ .../src/logical_plan/producer/expr/mod.rs | 8 +-- .../logical_plan/producer/expr/subquery.rs | 72 ++++++++++++++++++- .../producer/substrait_producer.rs | 40 +++++++++-- 4 files changed, 124 insertions(+), 12 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs index b6af7d3bbc8e1..aa34317a6e292 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs @@ -76,6 +76,22 @@ pub(crate) fn try_to_substrait_field_reference( } } +/// Convert an outer reference column to a Substrait field reference. +/// Outer reference columns reference columns from an outer query scope in correlated subqueries. +/// We convert them the same way as regular columns since the subquery plan will be +/// reconstructed with the proper schema context during consumption. +pub fn from_outer_reference_column( + col: &Column, + schema: &DFSchemaRef, +) -> datafusion::common::Result { + // OuterReferenceColumn is converted similarly to a regular column reference. + // The schema provided should be the schema context in which the outer reference + // column appears. During Substrait round-trip, the consumer will reconstruct + // the outer reference based on the subquery context. + let index = schema.index_of_column(col)?; + substrait_field_ref(index) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs index 5057564d370cf..ecda021aa390f 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs @@ -139,16 +139,16 @@ pub fn to_substrait_rex( } Expr::WindowFunction(expr) => producer.handle_window_function(expr, schema), Expr::InList(expr) => producer.handle_in_list(expr, schema), - Expr::Exists(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), + Expr::Exists(expr) => producer.handle_exists(expr, schema), Expr::InSubquery(expr) => producer.handle_in_subquery(expr, schema), - Expr::ScalarSubquery(expr) => { - not_impl_err!("Cannot convert {expr:?} to Substrait") - } + Expr::ScalarSubquery(expr) => producer.handle_scalar_subquery(expr, schema), #[expect(deprecated)] Expr::Wildcard { .. } => not_impl_err!("Cannot convert {expr:?} to Substrait"), Expr::GroupingSet(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), Expr::Placeholder(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), Expr::OuterReferenceColumn(_, _) => { + // OuterReferenceColumn requires tracking outer query schema context for correlated + // subqueries. This is a complex feature that is not yet implemented. not_impl_err!("Cannot convert {expr:?} to Substrait") } Expr::Unnest(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), diff --git a/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs b/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs index f2e6ff551223c..7e9a2d54335ff 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs @@ -17,8 +17,9 @@ use crate::logical_plan::producer::SubstraitProducer; use datafusion::common::DFSchemaRef; -use datafusion::logical_expr::expr::InSubquery; -use substrait::proto::expression::subquery::InPredicate; +use datafusion::logical_expr::Subquery; +use datafusion::logical_expr::expr::{Exists, InSubquery}; +use substrait::proto::expression::subquery::{InPredicate, Scalar, SetPredicate}; use substrait::proto::expression::{RexType, ScalarFunction}; use substrait::proto::function_argument::ArgType; use substrait::proto::{Expression, FunctionArgument}; @@ -70,3 +71,70 @@ pub fn from_in_subquery( Ok(substrait_subquery) } } + +/// Convert DataFusion ScalarSubquery to Substrait Scalar subquery type +pub fn from_scalar_subquery( + producer: &mut impl SubstraitProducer, + subquery: &Subquery, + _schema: &DFSchemaRef, +) -> datafusion::common::Result { + let subquery_plan = producer.handle_plan(subquery.subquery.as_ref())?; + + Ok(Expression { + rex_type: Some(RexType::Subquery(Box::new( + substrait::proto::expression::Subquery { + subquery_type: Some( + substrait::proto::expression::subquery::SubqueryType::Scalar( + Box::new(Scalar { + input: Some(subquery_plan), + }), + ), + ), + }, + ))), + }) +} + +/// Convert DataFusion Exists expression to Substrait SetPredicate subquery type +pub fn from_exists( + producer: &mut impl SubstraitProducer, + exists: &Exists, + _schema: &DFSchemaRef, +) -> datafusion::common::Result { + let subquery_plan = producer.handle_plan(exists.subquery.subquery.as_ref())?; + + let substrait_exists = Expression { + rex_type: Some(RexType::Subquery(Box::new( + substrait::proto::expression::Subquery { + subquery_type: Some( + substrait::proto::expression::subquery::SubqueryType::SetPredicate( + Box::new(SetPredicate { + predicate_op: substrait::proto::expression::subquery::set_predicate::PredicateOp::Exists as i32, + tuples: Some(subquery_plan), + }), + ), + ), + }, + ))), + }; + + // Handle negated EXISTS (NOT EXISTS) + if exists.negated { + let function_anchor = producer.register_function("not".to_string()); + + #[expect(deprecated)] + Ok(Expression { + rex_type: Some(RexType::ScalarFunction(ScalarFunction { + function_reference: function_anchor, + arguments: vec![FunctionArgument { + arg_type: Some(ArgType::Value(substrait_exists)), + }], + output_type: None, + args: vec![], + options: vec![], + })), + }) + } else { + Ok(substrait_exists) + } +} diff --git a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs index ffc920ffe609e..eaf500aa5b3e5 100644 --- a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs +++ b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs @@ -18,16 +18,19 @@ use crate::extensions::Extensions; use crate::logical_plan::producer::{ from_aggregate, from_aggregate_function, from_alias, from_between, from_binary_expr, - from_case, from_cast, from_column, from_distinct, from_empty_relation, from_filter, - from_in_list, from_in_subquery, from_join, from_like, from_limit, from_literal, - from_projection, from_repartition, from_scalar_function, from_sort, - from_subquery_alias, from_table_scan, from_try_cast, from_unary_expr, from_union, - from_values, from_window, from_window_function, to_substrait_rel, to_substrait_rex, + from_case, from_cast, from_column, from_distinct, from_empty_relation, from_exists, + from_filter, from_in_list, from_in_subquery, from_join, from_like, from_limit, + from_literal, from_outer_reference_column, from_projection, from_repartition, + from_scalar_function, from_scalar_subquery, from_sort, from_subquery_alias, + from_table_scan, from_try_cast, from_unary_expr, from_union, from_values, + from_window, from_window_function, to_substrait_rel, to_substrait_rex, }; +use datafusion::arrow::datatypes::FieldRef; use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err}; use datafusion::execution::SessionState; use datafusion::execution::registry::SerializerRegistry; -use datafusion::logical_expr::expr::{Alias, InList, InSubquery, WindowFunction}; +use datafusion::logical_expr::Subquery; +use datafusion::logical_expr::expr::{Alias, Exists, InList, InSubquery, WindowFunction}; use datafusion::logical_expr::{ Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr, Extension, Filter, Join, Like, Limit, LogicalPlan, Projection, Repartition, Sort, SubqueryAlias, @@ -361,6 +364,31 @@ pub trait SubstraitProducer: Send + Sync + Sized { ) -> datafusion::common::Result { from_in_subquery(self, in_subquery, schema) } + + fn handle_scalar_subquery( + &mut self, + subquery: &Subquery, + schema: &DFSchemaRef, + ) -> datafusion::common::Result { + from_scalar_subquery(self, subquery, schema) + } + + fn handle_exists( + &mut self, + exists: &Exists, + schema: &DFSchemaRef, + ) -> datafusion::common::Result { + from_exists(self, exists, schema) + } + + fn handle_outer_reference_column( + &mut self, + _data_type: &FieldRef, + column: &Column, + schema: &DFSchemaRef, + ) -> datafusion::common::Result { + from_outer_reference_column(column, schema) + } } pub struct DefaultSubstraitProducer<'a> { From bf36574f72c2c145b3f1f58db12b5e8bc29a1647 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 22 Jan 2026 23:10:01 +0530 Subject: [PATCH 2/2] remove dead codes --- .../producer/expr/field_reference.rs | 16 ---------------- .../producer/substrait_producer.rs | 18 ++++-------------- 2 files changed, 4 insertions(+), 30 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs index aa34317a6e292..b6af7d3bbc8e1 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs @@ -76,22 +76,6 @@ pub(crate) fn try_to_substrait_field_reference( } } -/// Convert an outer reference column to a Substrait field reference. -/// Outer reference columns reference columns from an outer query scope in correlated subqueries. -/// We convert them the same way as regular columns since the subquery plan will be -/// reconstructed with the proper schema context during consumption. -pub fn from_outer_reference_column( - col: &Column, - schema: &DFSchemaRef, -) -> datafusion::common::Result { - // OuterReferenceColumn is converted similarly to a regular column reference. - // The schema provided should be the schema context in which the outer reference - // column appears. During Substrait round-trip, the consumer will reconstruct - // the outer reference based on the subquery context. - let index = schema.index_of_column(col)?; - substrait_field_ref(index) -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs index eaf500aa5b3e5..dcb14c2d8a276 100644 --- a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs +++ b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs @@ -20,12 +20,11 @@ use crate::logical_plan::producer::{ from_aggregate, from_aggregate_function, from_alias, from_between, from_binary_expr, from_case, from_cast, from_column, from_distinct, from_empty_relation, from_exists, from_filter, from_in_list, from_in_subquery, from_join, from_like, from_limit, - from_literal, from_outer_reference_column, from_projection, from_repartition, - from_scalar_function, from_scalar_subquery, from_sort, from_subquery_alias, - from_table_scan, from_try_cast, from_unary_expr, from_union, from_values, - from_window, from_window_function, to_substrait_rel, to_substrait_rex, + from_literal, from_projection, from_repartition, from_scalar_function, + from_scalar_subquery, from_sort, from_subquery_alias, from_table_scan, from_try_cast, + from_unary_expr, from_union, from_values, from_window, from_window_function, + to_substrait_rel, to_substrait_rex, }; -use datafusion::arrow::datatypes::FieldRef; use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err}; use datafusion::execution::SessionState; use datafusion::execution::registry::SerializerRegistry; @@ -380,15 +379,6 @@ pub trait SubstraitProducer: Send + Sync + Sized { ) -> datafusion::common::Result { from_exists(self, exists, schema) } - - fn handle_outer_reference_column( - &mut self, - _data_type: &FieldRef, - column: &Column, - schema: &DFSchemaRef, - ) -> datafusion::common::Result { - from_outer_reference_column(column, schema) - } } pub struct DefaultSubstraitProducer<'a> {