Skip to content

Commit 8632bdd

Browse files
committed
Update for API changes
1 parent 65f92e4 commit 8632bdd

14 files changed

Lines changed: 233 additions & 132 deletions

File tree

datafusion/common/src/column.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ impl Column {
7171
}
7272
}
7373

74-
fn from_idents(idents: &mut Vec<String>) -> Option<Self> {
74+
/// Create a Column from multiple normalized identifiers
75+
///
76+
/// For example, `foo.bar` would be represented as a two element vector
77+
/// `["foo", "bar"]`
78+
pub fn from_idents(mut idents: Vec<String>) -> Option<Self> {
7579
let (relation, name) = match idents.len() {
7680
1 => (None, idents.remove(0)),
7781
2 => (
@@ -109,7 +113,7 @@ impl Column {
109113
/// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
110114
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
111115
let flat_name = flat_name.into();
112-
Self::from_idents(&mut parse_identifiers_normalized(&flat_name, false)).unwrap_or(
116+
Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or(
113117
Self {
114118
relation: None,
115119
name: flat_name,
@@ -120,7 +124,7 @@ impl Column {
120124
/// Deserialize a fully qualified name string into a column preserving column text case
121125
pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
122126
let flat_name = flat_name.into();
123-
Self::from_idents(&mut parse_identifiers_normalized(&flat_name, true)).unwrap_or(
127+
Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or(
124128
Self {
125129
relation: None,
126130
name: flat_name,

datafusion/expr/src/logical_plan/statement.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ pub enum TransactionIsolationLevel {
153153
ReadCommitted,
154154
RepeatableRead,
155155
Serializable,
156+
Snapshot,
156157
}
157158

158159
/// Indicator that the following statements should be committed or rolled back atomically

datafusion/sql/src/expr/identifier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
8383
}
8484
}
8585

86-
pub(super) fn sql_compound_identifier_to_expr(
86+
pub(crate) fn sql_compound_identifier_to_expr(
8787
&self,
8888
ids: Vec<Ident>,
8989
schema: &DFSchema,

datafusion/sql/src/expr/mod.rs

Lines changed: 138 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ use datafusion_expr::planner::{
2121
PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
2222
};
2323
use sqlparser::ast::{
24-
BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, DictionaryField,
25-
Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry, StructField, Subscript,
26-
TrimWhereField, Value,
24+
AccessExpr, BinaryOperator, CastFormat, CastKind, DataType as SQLDataType,
25+
DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry,
26+
StructField, Subscript, TrimWhereField, Value,
2727
};
2828

2929
use datafusion_common::{
30-
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
31-
ScalarValue,
30+
internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
31+
Result, ScalarValue,
3232
};
3333
use datafusion_expr::expr::ScalarFunction;
3434
use datafusion_expr::expr::{InList, WildcardOptions};
@@ -236,14 +236,14 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
236236
self.sql_identifier_to_expr(id, schema, planner_context)
237237
}
238238

239-
SQLExpr::MapAccess { .. } => {
240-
not_impl_err!("Map Access")
241-
}
242-
243239
// <expr>["foo"], <expr>[4] or <expr>[4:5]
244-
SQLExpr::Subscript { expr, subscript } => {
245-
self.sql_subscript_to_expr(*expr, subscript, schema, planner_context)
246-
}
240+
SQLExpr::CompoundFieldAccess { root, access_chain } => self
241+
.sql_compound_field_access_to_expr(
242+
*root,
243+
access_chain,
244+
schema,
245+
planner_context,
246+
),
247247

248248
SQLExpr::CompoundIdentifier(ids) => {
249249
self.sql_compound_identifier_to_expr(ids, schema, planner_context)
@@ -984,84 +984,141 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
984984
Ok(Expr::Cast(Cast::new(Box::new(expr), dt)))
985985
}
986986

987-
fn sql_subscript_to_expr(
987+
fn sql_compound_field_access_to_expr(
988988
&self,
989-
expr: SQLExpr,
990-
subscript: Box<Subscript>,
989+
root: SQLExpr,
990+
access_chain: Vec<AccessExpr>,
991991
schema: &DFSchema,
992992
planner_context: &mut PlannerContext,
993993
) -> Result<Expr> {
994-
let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
995-
996-
let field_access = match *subscript {
997-
Subscript::Index { index } => {
998-
// index can be a name, in which case it is a named field access
999-
match index {
1000-
SQLExpr::Value(
1001-
Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
1002-
) => GetFieldAccess::NamedStructField {
1003-
name: ScalarValue::from(s),
1004-
},
1005-
SQLExpr::JsonAccess { .. } => {
1006-
return not_impl_err!("JsonAccess");
994+
let mut root = self.sql_expr_to_logical_expr(root, schema, planner_context)?;
995+
let fields = access_chain
996+
.into_iter()
997+
.map(|field| match field {
998+
AccessExpr::Subscript(subscript) => {
999+
match subscript {
1000+
Subscript::Index { index } => {
1001+
// index can be a name, in which case it is a named field access
1002+
match index {
1003+
SQLExpr::Value(
1004+
Value::SingleQuotedString(s)
1005+
| Value::DoubleQuotedString(s),
1006+
) => Ok(Some(GetFieldAccess::NamedStructField {
1007+
name: ScalarValue::from(s),
1008+
})),
1009+
SQLExpr::JsonAccess { .. } => {
1010+
not_impl_err!("JsonAccess")
1011+
}
1012+
// otherwise treat like a list index
1013+
_ => Ok(Some(GetFieldAccess::ListIndex {
1014+
key: Box::new(self.sql_expr_to_logical_expr(
1015+
index,
1016+
schema,
1017+
planner_context,
1018+
)?),
1019+
})),
1020+
}
1021+
}
1022+
Subscript::Slice {
1023+
lower_bound,
1024+
upper_bound,
1025+
stride,
1026+
} => {
1027+
// Means access like [:2]
1028+
let lower_bound = if let Some(lower_bound) = lower_bound {
1029+
self.sql_expr_to_logical_expr(
1030+
lower_bound,
1031+
schema,
1032+
planner_context,
1033+
)
1034+
} else {
1035+
not_impl_err!("Slice subscript requires a lower bound")
1036+
}?;
1037+
1038+
// means access like [2:]
1039+
let upper_bound = if let Some(upper_bound) = upper_bound {
1040+
self.sql_expr_to_logical_expr(
1041+
upper_bound,
1042+
schema,
1043+
planner_context,
1044+
)
1045+
} else {
1046+
not_impl_err!("Slice subscript requires an upper bound")
1047+
}?;
1048+
1049+
// stride, default to 1
1050+
let stride = if let Some(stride) = stride {
1051+
self.sql_expr_to_logical_expr(
1052+
stride,
1053+
schema,
1054+
planner_context,
1055+
)?
1056+
} else {
1057+
lit(1i64)
1058+
};
1059+
1060+
Ok(Some(GetFieldAccess::ListRange {
1061+
start: Box::new(lower_bound),
1062+
stop: Box::new(upper_bound),
1063+
stride: Box::new(stride),
1064+
}))
1065+
}
10071066
}
1008-
// otherwise treat like a list index
1009-
_ => GetFieldAccess::ListIndex {
1010-
key: Box::new(self.sql_expr_to_logical_expr(
1011-
index,
1012-
schema,
1013-
planner_context,
1014-
)?),
1015-
},
10161067
}
1017-
}
1018-
Subscript::Slice {
1019-
lower_bound,
1020-
upper_bound,
1021-
stride,
1022-
} => {
1023-
// Means access like [:2]
1024-
let lower_bound = if let Some(lower_bound) = lower_bound {
1025-
self.sql_expr_to_logical_expr(lower_bound, schema, planner_context)
1026-
} else {
1027-
not_impl_err!("Slice subscript requires a lower bound")
1028-
}?;
1029-
1030-
// means access like [2:]
1031-
let upper_bound = if let Some(upper_bound) = upper_bound {
1032-
self.sql_expr_to_logical_expr(upper_bound, schema, planner_context)
1033-
} else {
1034-
not_impl_err!("Slice subscript requires an upper bound")
1035-
}?;
1036-
1037-
// stride, default to 1
1038-
let stride = if let Some(stride) = stride {
1039-
self.sql_expr_to_logical_expr(stride, schema, planner_context)?
1040-
} else {
1041-
lit(1i64)
1042-
};
1043-
1044-
GetFieldAccess::ListRange {
1045-
start: Box::new(lower_bound),
1046-
stop: Box::new(upper_bound),
1047-
stride: Box::new(stride),
1068+
AccessExpr::Dot(expr) => {
1069+
let expr =
1070+
self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1071+
match expr {
1072+
Expr::Column(Column { name, relation }) => {
1073+
if let Some(relation) = &relation {
1074+
// If the first part of the dot access is a column reference, we should
1075+
// check if the column is from the same table as the root expression.
1076+
// If it is, we should replace the root expression with the column reference.
1077+
// Otherwise, we should treat the dot access as a named field access.
1078+
if relation.table() == root.schema_name().to_string() {
1079+
root = Expr::Column(Column {
1080+
name,
1081+
relation: Some(relation.clone()),
1082+
});
1083+
Ok(None)
1084+
} else {
1085+
plan_err!(
1086+
"table name mismatch: {} != {}",
1087+
relation.table(),
1088+
root.schema_name()
1089+
)
1090+
}
1091+
} else {
1092+
Ok(Some(GetFieldAccess::NamedStructField {
1093+
name: ScalarValue::from(name),
1094+
}))
1095+
}
1096+
}
1097+
_ => not_impl_err!(
1098+
"Dot access not supported for non-column expr: {expr:?}"
1099+
),
1100+
}
10481101
}
1049-
}
1050-
};
1102+
})
1103+
.collect::<Result<Vec<_>>>()?;
10511104

1052-
let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1053-
for planner in self.context_provider.get_expr_planners() {
1054-
match planner.plan_field_access(field_access_expr, schema)? {
1055-
PlannerResult::Planned(expr) => return Ok(expr),
1056-
PlannerResult::Original(expr) => {
1057-
field_access_expr = expr;
1105+
fields
1106+
.into_iter()
1107+
.flatten()
1108+
.try_fold(root, |expr, field_access| {
1109+
let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1110+
for planner in self.context_provider.get_expr_planners() {
1111+
match planner.plan_field_access(field_access_expr, schema)? {
1112+
PlannerResult::Planned(expr) => return Ok(expr),
1113+
PlannerResult::Original(expr) => {
1114+
field_access_expr = expr;
1115+
}
1116+
}
10581117
}
1059-
}
1060-
}
1061-
1062-
not_impl_err!(
1063-
"GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1064-
)
1118+
not_impl_err!(
1119+
"GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1120+
)
1121+
})
10651122
}
10661123
}
10671124

datafusion/sql/src/parser.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ impl<'a> DFParser<'a> {
563563

564564
loop {
565565
if let Token::Word(_) = self.parser.peek_token().token {
566-
let identifier = self.parser.parse_identifier(false)?;
566+
let identifier = self.parser.parse_identifier()?;
567567
partitions.push(identifier.to_string());
568568
} else {
569569
return self.expected("partition name", self.parser.peek_token());
@@ -666,7 +666,7 @@ impl<'a> DFParser<'a> {
666666
}
667667

668668
fn parse_column_def(&mut self) -> Result<ColumnDef, ParserError> {
669-
let name = self.parser.parse_identifier(false)?;
669+
let name = self.parser.parse_identifier()?;
670670
let data_type = self.parser.parse_data_type()?;
671671
let collation = if self.parser.parse_keyword(Keyword::COLLATE) {
672672
Some(self.parser.parse_object_name(false)?)
@@ -676,7 +676,7 @@ impl<'a> DFParser<'a> {
676676
let mut options = vec![];
677677
loop {
678678
if self.parser.parse_keyword(Keyword::CONSTRAINT) {
679-
let name = Some(self.parser.parse_identifier(false)?);
679+
let name = Some(self.parser.parse_identifier()?);
680680
if let Some(option) = self.parser.parse_optional_column_option()? {
681681
options.push(ColumnOptionDef { name, option });
682682
} else {

datafusion/sql/src/planner.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
430430
SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => Ok(DataType::UInt64),
431431
SQLDataType::Float(_) => Ok(DataType::Float32),
432432
SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
433-
SQLDataType::Double | SQLDataType::DoublePrecision | SQLDataType::Float8 => Ok(DataType::Float64),
433+
SQLDataType::Double(ExactNumberInfo::None) | SQLDataType::DoublePrecision | SQLDataType::Float8 => Ok(DataType::Float64),
434+
SQLDataType::Double(ExactNumberInfo::Precision(_)|ExactNumberInfo::PrecisionAndScale(_, _)) => {
435+
not_impl_err!("Unsupported SQL type (precision/scale not supported) {sql_type}")
436+
}
434437
SQLDataType::Char(_)
435438
| SQLDataType::Text
436439
| SQLDataType::String(_) => Ok(DataType::Utf8),
@@ -566,7 +569,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
566569
| SQLDataType::MediumText
567570
| SQLDataType::LongText
568571
| SQLDataType::Bit(_)
569-
|SQLDataType::BitVarying(_)
572+
| SQLDataType::BitVarying(_)
573+
// BIG Query UDFs
574+
| SQLDataType::AnyType
570575
=> not_impl_err!(
571576
"Unsupported SQL type {sql_type:?}"
572577
),

datafusion/sql/src/relation/join.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19-
use datafusion_common::{not_impl_err, Column, Result};
19+
use datafusion_common::{not_impl_datafusion_err, not_impl_err, Column, Result};
2020
use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
2121
use sqlparser::ast::{Join, JoinConstraint, JoinOperator, TableFactor, TableWithJoins};
2222
use std::collections::HashSet;
@@ -123,11 +123,21 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
123123
.join_on(right, join_type, Some(expr))?
124124
.build()
125125
}
126-
JoinConstraint::Using(idents) => {
127-
let keys: Vec<Column> = idents
126+
JoinConstraint::Using(object_names) => {
127+
let keys = object_names
128128
.into_iter()
129-
.map(|x| Column::from_name(self.ident_normalizer.normalize(x)))
130-
.collect();
129+
.map(|object_name| {
130+
let idents = object_name
131+
.0
132+
.into_iter()
133+
.map(|id| self.ident_normalizer.normalize(id))
134+
.collect::<Vec<_>>();
135+
Column::from_idents(idents).ok_or_else(|| {
136+
not_impl_datafusion_err!("Invalid identifier in USING clause")
137+
})
138+
})
139+
.collect::<Result<Vec<_>>>()?;
140+
131141
LogicalPlanBuilder::from(left)
132142
.join_using(right, join_type, keys)?
133143
.build()

datafusion/sql/src/set_expr.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
8888
(SetOperator::Except, false) => {
8989
LogicalPlanBuilder::except(left_plan, right_plan, false)
9090
}
91+
(SetOperator::Minus, _) => {
92+
not_impl_err!("MINUS Set Operator not implemented")
93+
}
9194
}
9295
}
9396
}

0 commit comments

Comments
 (0)