Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ rand = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }
minijinja = { workspace = true }
serde_arrow = { version = "0.14", features = ["arrow-58"] }

[package.metadata.cargo-machete]
# These dependencies are added to ensure minimal dependency version
Expand Down
235 changes: 227 additions & 8 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMa
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column in schema should be primitive type but got {field_type:?}"
"Leaf column in schema should be primitive type but got {field_type:?}"
),
));
}
Expand All @@ -1111,14 +1111,33 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMa
}

/// Build a fallback field ID map for Parquet files without embedded field IDs.
/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
///
/// Must use top-level field positions (not leaf column positions) to stay consistent
/// with `add_fallback_field_ids_to_arrow_schema`, which assigns ordinal IDs to
/// top-level Arrow fields. Using leaf positions instead would produce wrong indices
/// when nested types (struct/list/map) expand into multiple leaf columns.
///
/// Mirrors iceberg-java's ParquetSchemaUtil.addFallbackIds() which iterates
/// fileSchema.getFields() assigning ordinal IDs to top-level fields.
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
    // Maps fallback field ID -> leaf column index in the flattened Parquet schema.
    let mut column_map = HashMap::new();
    // Cursor over the flattened leaf columns, advanced as we walk top-level fields.
    let mut leaf_idx = 0;

    // 1-indexed to match iceberg-java's convention
    for (top_pos, field) in parquet_schema.root_schema().get_fields().iter().enumerate() {
        let field_id = (top_pos + 1) as i32;

        if field.is_primitive() {
            // A primitive top-level field corresponds to exactly one leaf column,
            // so it maps directly to the current leaf position.
            column_map.insert(field_id, leaf_idx);
            leaf_idx += 1;
        } else {
            // Nested field (group): it expands into one or more leaves and has no
            // single leaf to map to, so it gets no entry. Advance past all leaves
            // in this group. Count them by checking how many subsequent leaves
            // share this root index.
            while leaf_idx < parquet_schema.num_columns()
                && parquet_schema.get_column_root_idx(leaf_idx) == top_pos
            {
                leaf_idx += 1;
            }
        }
    }

    column_map
Expand Down Expand Up @@ -1409,7 +1428,7 @@ impl PredicateConverter<'_> {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates isn't a root column in Parquet schema.",
"Leaf column `{}` in predicates isn't a root column in Parquet schema.",
reference.field().name
),
));
Expand All @@ -1423,7 +1442,7 @@ impl PredicateConverter<'_> {
.ok_or(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates cannot be found in the required column indices.",
"Leaf column `{}` in predicates cannot be found in the required column indices.",
reference.field().name
),
))?;
Expand Down Expand Up @@ -4667,4 +4686,204 @@ message schema {
assert_eq!(result[1], expected_1);
assert_eq!(result[2], expected_2);
}

    /// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
    /// predicate on a column after nested types in a migrated file (no field IDs).
    /// Schema has struct, list, and map columns before the predicate target (`id`),
    /// exercising the fallback field ID mapping across all nested type variants.
    #[tokio::test]
    async fn test_predicate_on_migrated_file_with_nested_types() {
        use serde::{Deserialize, Serialize};
        use serde_arrow::schema::{SchemaLike, TracingOptions};

        // Nested struct payload used both directly and inside a list column.
        #[derive(Serialize, Deserialize)]
        struct Person {
            name: String,
            age: i32,
        }

        // `id` deliberately comes AFTER the nested columns, so resolving it
        // depends on the fallback field ID map correctly skipping the multiple
        // leaf columns produced by the struct/list/map fields before it.
        #[derive(Serialize, Deserialize)]
        struct Row {
            person: Person,
            people: Vec<Person>,
            props: std::collections::BTreeMap<String, String>,
            id: i32,
        }

        let rows = vec![
            Row {
                person: Person {
                    name: "Alice".into(),
                    age: 30,
                },
                people: vec![Person {
                    name: "Alice".into(),
                    age: 30,
                }],
                props: [("k1".into(), "v1".into())].into(),
                id: 1,
            },
            Row {
                person: Person {
                    name: "Bob".into(),
                    age: 25,
                },
                people: vec![Person {
                    name: "Bob".into(),
                    age: 25,
                }],
                props: [("k2".into(), "v2".into())].into(),
                id: 2,
            },
            Row {
                person: Person {
                    name: "Carol".into(),
                    age: 40,
                },
                people: vec![Person {
                    name: "Carol".into(),
                    age: 40,
                }],
                props: [("k3".into(), "v3".into())].into(),
                id: 3,
            },
        ];

        // Trace an Arrow schema from the Rust types. Note the Arrow schema
        // carries NO Iceberg field-ID metadata, simulating a file written by a
        // non-Iceberg writer and later migrated (the scenario in issue #2306).
        let tracing_options = TracingOptions::default()
            .map_as_struct(false)
            .strings_as_large_utf8(false)
            .sequence_as_large_list(false);
        let fields = Vec::<arrow_schema::FieldRef>::from_type::<Row>(tracing_options).unwrap();
        let arrow_schema = Arc::new(ArrowSchema::new(fields.clone()));
        let batch = serde_arrow::to_record_batch(&fields, &rows).unwrap();

        // Fallback field IDs: person=1, people=2, props=3, id=4
        // (top-level ordinal positions, 1-indexed, matching iceberg-java).
        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(
                        1,
                        "person",
                        Type::Struct(crate::spec::StructType::new(vec![
                            NestedField::required(
                                5,
                                "name",
                                Type::Primitive(PrimitiveType::String),
                            )
                            .into(),
                            NestedField::required(6, "age", Type::Primitive(PrimitiveType::Int))
                                .into(),
                        ])),
                    )
                    .into(),
                    NestedField::required(
                        2,
                        "people",
                        Type::List(crate::spec::ListType {
                            element_field: NestedField::required(
                                7,
                                "element",
                                Type::Struct(crate::spec::StructType::new(vec![
                                    NestedField::required(
                                        8,
                                        "name",
                                        Type::Primitive(PrimitiveType::String),
                                    )
                                    .into(),
                                    NestedField::required(
                                        9,
                                        "age",
                                        Type::Primitive(PrimitiveType::Int),
                                    )
                                    .into(),
                                ])),
                            )
                            .into(),
                        }),
                    )
                    .into(),
                    NestedField::required(
                        3,
                        "props",
                        Type::Map(crate::spec::MapType {
                            key_field: NestedField::required(
                                10,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )
                            .into(),
                            value_field: NestedField::required(
                                11,
                                "value",
                                Type::Primitive(PrimitiveType::String),
                            )
                            .into(),
                        }),
                    )
                    .into(),
                    NestedField::required(4, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Write the batch to a local Parquet file (no Iceberg field IDs embedded).
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/1.parquet");

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
        let file = File::create(&file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema, Some(props)).unwrap();
        writer.write(&batch).expect("Writing batch");
        writer.close().unwrap();

        // Predicate targets the column positioned after all nested columns.
        let predicate = Reference::new("id").greater_than(Datum::int(1));

        // Enable both row-group filtering and row selection so the predicate
        // path that consults the (fallback) field ID map is exercised.
        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs())
            .with_row_group_filtering_enabled(true)
            .with_row_selection_enabled(true)
            .build();

        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: file_path,
                data_file_format: DataFileFormat::Parquet,
                schema: iceberg_schema.clone(),
                project_field_ids: vec![4],
                predicate: Some(predicate.bind(iceberg_schema, true).unwrap()),
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Flatten the projected `id` column across all returned batches.
        let ids: Vec<i32> = result
            .iter()
            .flat_map(|b| {
                b.column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();
        // `id > 1` must keep exactly rows 2 and 3.
        assert_eq!(ids, vec![2, 3]);
    }
}
Loading