Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ recursive = "0.1.1"
regex = "1.8"
rstest = "0.25.0"
serde_json = "1"
sqlparser = { version = "0.55.0", default-features = false, features = ["std", "visitor"] }
sqlparser = { version = "0.56.0", default-features = false, features = ["std", "visitor"] }
tempfile = "3"
tokio = { version = "1.45", features = ["macros", "rt", "sync"] }
url = "2.5.4"
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
substring_from,
substring_for,
special: _,
shorthand: _,
} => self.sql_substring_to_expr(
expr,
substring_from,
Expand Down
17 changes: 14 additions & 3 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
pub(super) fn parse_in_subquery(
&self,
expr: SQLExpr,
subquery: Query,
subquery: SetExpr,
negated: bool,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
Expand All @@ -58,7 +58,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
if let SetExpr::Select(select) = &subquery {
for item in &select.projection {
if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item {
if let Some(span) = Span::try_from_sqlparser_span(ident.span) {
Expand All @@ -68,7 +68,18 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let query = Query {
with: None,
body: Box::new(subquery),
order_by: None,
limit_clause: None,
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
};
let sub_plan = self.query_to_plan(query, planner_context)?;
Comment thread
Dimchikkk marked this conversation as resolved.
Outdated
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);

Expand Down
21 changes: 15 additions & 6 deletions datafusion/sql/src/expr/substring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{not_impl_err, plan_err};
use datafusion_common::{internal_datafusion_err, plan_err};
use datafusion_common::{DFSchema, Result, ScalarValue};
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::Expr;
use datafusion_expr::{expr::ScalarFunction, planner::PlannerResult, Expr};

use sqlparser::ast::Expr as SQLExpr;

impl<S: ContextProvider> SqlToRel<'_, S> {
Expand Down Expand Up @@ -62,6 +62,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
substring_from: None,
substring_for: None,
special: false,
shorthand: false,
};

return plan_err!("Substring without for/from is not valid {orig_sql:?}");
Expand All @@ -77,8 +78,16 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}

not_impl_err!(
"Substring not supported by UserDefinedExtensionPlanners: {substring_args:?}"
)
let fun = self
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This basically hard codes the use of a function named "substr" into the planner

I think the preferred way is to use the ExprPlanner API, similarly to how plan field access is done here:

fn plan_field_access(

Copy link
Copy Markdown
Contributor

@Jefffrey Jefffrey Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with that API, though it seems to be used above already?

for planner in self.context_provider.get_expr_planners() {
match planner.plan_substring(substring_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
substring_args = args;
}
}
}

So this code here (that hardcodes the "substr" string) is more of a fallback I guess? Might need some help understanding this part

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time debugging and I think the solution is that we needed to register the appropriate planner with the tests. I pushed a commit here: e46e532

Note I also found the name of the extension planner very confusing, and I will make a separate PR to fix it

Copy link
Copy Markdown
Contributor

@Jefffrey Jefffrey Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me, yeah. Was confusing to have that fallback behaviour 😅

.context_provider
.get_function_meta("substr")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'substr' function")
})?;

Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
fun,
substring_args,
)))
Comment thread
Jefffrey marked this conversation as resolved.
Outdated
}
}
6 changes: 6 additions & 0 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
| SQLDataType::AnyType
| SQLDataType::Table(_)
| SQLDataType::VarBit(_)
| SQLDataType::UTinyInt
| SQLDataType::USmallInt
Comment thread
Jefffrey marked this conversation as resolved.
| SQLDataType::HugeInt
| SQLDataType::UHugeInt
| SQLDataType::UBigInt
| SQLDataType::TimestampNtz
| SQLDataType::GeometricType(_) => {
not_impl_err!("Unsupported SQL type {sql_type:?}")
}
Expand Down
61 changes: 45 additions & 16 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Ident, Offset as SQLOffset, OrderBy, OrderByExpr, OrderByKind,
Query, SelectInto, SetExpr,
Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, Query,
SelectInto, SetExpr,
};
use sqlparser::tokenizer::Span;

Expand All @@ -54,8 +54,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let select_into = select.into.take();
let plan =
self.select_to_plan(*select, query.order_by, planner_context)?;
let plan =
self.limit(plan, query.offset, query.limit, planner_context)?;
let plan = self.limit(plan, query.limit_clause, planner_context)?;
// Process the `SELECT INTO` after `LIMIT`.
self.select_into(plan, select_into)
}
Expand All @@ -77,7 +76,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
None,
)?;
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.offset, query.limit, planner_context)
self.limit(plan, query.limit_clause, planner_context)
}
}
}
Expand All @@ -86,23 +85,53 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
fn limit(
&self,
input: LogicalPlan,
skip: Option<SQLOffset>,
fetch: Option<SQLExpr>,
limit_clause: Option<LimitClause>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
if skip.is_none() && fetch.is_none() {
let Some(limit_clause) = limit_clause else {
return Ok(input);
}
};

// skip and fetch expressions are not allowed to reference columns from the input plan
let empty_schema = DFSchema::empty();

let skip = skip
.map(|o| self.sql_to_expr(o.value, &empty_schema, planner_context))
.transpose()?;
let fetch = fetch
.map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
.transpose()?;
let (skip, fetch, limit_by_exprs) = match limit_clause {
LimitClause::LimitOffset {
limit,
offset,
limit_by,
} => {
let skip = offset
.map(|o| self.sql_to_expr(o.value, &empty_schema, planner_context))
.transpose()?;

let fetch = limit
.map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
.transpose()?;

let limit_by_exprs = limit_by
.into_iter()
.map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
.collect::<Result<Vec<_>>>()?;

(skip, fetch, limit_by_exprs)
}
LimitClause::OffsetCommaLimit { offset, limit } => {
let skip =
Some(self.sql_to_expr(offset, &empty_schema, planner_context)?);
let fetch =
Some(self.sql_to_expr(limit, &empty_schema, planner_context)?);
(skip, fetch, vec![])
}
};

if !limit_by_exprs.is_empty() {
return not_impl_err!("LIMIT BY clause is not supported yet");
}

if skip.is_none() && fetch.is_none() {
return Ok(input);
}

LogicalPlanBuilder::from(input)
.limit_by_expr(skip, fetch)?
.build()
Expand Down
Loading
Loading