299 changes: 289 additions & 10 deletions crates/iceberg/src/arrow/reader.rs
@@ -1100,7 +1100,7 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMa
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column in schema should be primitive type but got {field_type:?}"
"Leaf column in schema should be primitive type but got {field_type:?}"
),
));
}
@@ -1111,14 +1111,24 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMa
}

/// Build a fallback field ID map for Parquet files without embedded field IDs.
/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
///
/// Must use top-level field positions (not leaf column positions) to stay consistent
/// with `add_fallback_field_ids_to_arrow_schema`, which assigns ordinal IDs to
/// top-level Arrow fields. Using leaf positions instead would produce wrong indices
/// when nested types (struct/list/map) expand into multiple leaf columns.
///
/// Matches iceberg-java's ParquetSchemaUtil.addFallbackIds().
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
let mut column_map = HashMap::new();

// 1-indexed to match iceberg-java's convention
for (idx, _field) in parquet_schema.columns().iter().enumerate() {
let field_id = (idx + 1) as i32;
column_map.insert(field_id, idx);
for leaf_idx in 0..parquet_schema.num_columns() {
let root_idx = parquet_schema.get_column_root_idx(leaf_idx);
// Only map primitive root columns; leaves inside groups (struct/list/map)
// don't get fallback IDs since predicates only target root-level primitives.
if !parquet_schema.get_column_root(leaf_idx).is_group() {
let field_id = (root_idx + 1) as i32;
column_map.insert(field_id, leaf_idx);
}
}

column_map
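
To make the top-level-position convention concrete, here is a sketch of what the fallback map contains for a struct-then-primitive schema like the one the tests below write. This is illustrative, not part of the diff: the message-type literal and assertions are assumptions, though `parse_message_type` and `SchemaDescriptor` are real parquet-rs APIs and `build_fallback_field_id_map` is the function above.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

// Schema shape mirrors the struct test below: a group with two leaves,
// then a top-level primitive `id`.
let descr = SchemaDescriptor::new(Arc::new(
    parse_message_type(
        "message schema {
            required group person {
                required binary name (UTF8);
                required int32 age;
            }
            required int32 id;
        }",
    )
    .unwrap(),
));

// Leaves are [person.name, person.age, id]; their roots are [person, person, id].
// Only `id` is a primitive root, so it is the only entry: field ID 2
// (top-level position 1, plus 1) -> leaf index 2.
let map: HashMap<i32, usize> = build_fallback_field_id_map(&descr);
assert_eq!(map, HashMap::from([(2, 2)]));
// The old leaf-based numbering would have produced {1->0, 2->1, 3->2},
// sending a predicate on `id` (field ID 2) to the `person.age` leaf.
```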
@@ -1409,7 +1419,7 @@ impl PredicateConverter<'_> {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates isn't a root column in Parquet schema.",
"Leaf column `{}` in predicates isn't a root column in Parquet schema.",
reference.field().name
),
));
@@ -1423,7 +1433,7 @@ impl PredicateConverter<'_> {
.ok_or(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates cannot be found in the required column indices.",
"Leaf column `{}` in predicates cannot be found in the required column indices.",
reference.field().name
),
))?;
@@ -1957,9 +1967,14 @@ mod tests {
use std::ops::Range;
use std::sync::Arc;

use arrow_array::builder::StringBuilder;
use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use arrow_array::{
Array, ArrayRef, Int32Array, LargeStringArray, ListArray, RecordBatch, StringArray,
StructArray,
};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ProjectionMask};
@@ -4667,4 +4682,268 @@ message schema {
assert_eq!(result[1], expected_1);
assert_eq!(result[2], expected_2);
}

/// Helper: write a Parquet file without field IDs, containing a nested-type column
/// followed by an `id` column, then read it back with a predicate on `id`.
/// This exercises the fallback field ID mapping path that Comet uses.
async fn read_migrated_file_with_nested_type_and_predicate(
iceberg_schema: SchemaRef,
arrow_schema: Arc<ArrowSchema>,
batch: RecordBatch,
) {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_path = format!("{table_location}/1.parquet");

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema, Some(props)).unwrap();
writer.write(&batch).expect("Writing batch");
writer.close().unwrap();

// Fallback field IDs: nested_col=1, id=2
let predicate = Reference::new("id").greater_than(Datum::int(1));

let reader = ArrowReaderBuilder::new(FileIO::new_with_fs())
.with_row_group_filtering_enabled(true)
.with_row_selection_enabled(true)
.build();

let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: 0,
length: 0,
record_count: None,
data_file_path: file_path,
data_file_format: DataFileFormat::Parquet,
schema: iceberg_schema.clone(),
project_field_ids: vec![2],
predicate: Some(predicate.bind(iceberg_schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

let ids: Vec<i32> = result
.iter()
.flat_map(|b| {
b.column(0)
.as_primitive::<arrow_array::types::Int32Type>()
.values()
.iter()
.copied()
})
.collect();
assert_eq!(ids, vec![2, 3]);
}

/// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
/// predicate on a column after a struct in a migrated file (no field IDs).
#[tokio::test]
async fn test_predicate_on_migrated_file_with_struct() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(
1,
"person",
Type::Struct(crate::spec::StructType::new(vec![
NestedField::required(
3,
"name",
Type::Primitive(PrimitiveType::String),
)
.into(),
NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
.into(),
])),
)
.into(),
NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new(
"person",
DataType::Struct(Fields::from(vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
])),
false,
),
Field::new("id", DataType::Int32, false),
]));

let person_data = Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("name", DataType::Utf8, false)),
Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])) as ArrayRef,
),
(
Arc::new(Field::new("age", DataType::Int32, false)),
Arc::new(Int32Array::from(vec![30, 25, 40])) as ArrayRef,
),
])) as ArrayRef;
let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;

let batch = RecordBatch::try_new(arrow_schema.clone(), vec![person_data, id_data]).unwrap();

read_migrated_file_with_nested_type_and_predicate(schema, arrow_schema, batch).await;
}

/// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
/// predicate on a column after a list in a migrated file (no field IDs).
/// Uses list-of-struct (2+ leaves) because a list of primitives has only 1 leaf,
/// which coincidentally produces the same mapping as a top-level primitive.
#[tokio::test]
async fn test_predicate_on_migrated_file_with_list() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(
1,
"people",
Type::List(crate::spec::ListType {
element_field: NestedField::required(
3,
"element",
Type::Struct(crate::spec::StructType::new(vec![
NestedField::required(
4,
"name",
Type::Primitive(PrimitiveType::String),
)
.into(),
NestedField::required(
5,
"age",
Type::Primitive(PrimitiveType::Int),
)
.into(),
])),
)
.into(),
}),
)
.into(),
NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let name_field = Arc::new(Field::new("name", DataType::Utf8, false));
let age_field = Arc::new(Field::new("age", DataType::Int32, false));
let struct_fields = Fields::from(vec![name_field.clone(), age_field.clone()]);
let element_field = Field::new("element", DataType::Struct(struct_fields), false);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new_list("people", element_field, false),
Field::new("id", DataType::Int32, false),
]));

// 3 rows, each with a single-element list of struct
let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])) as ArrayRef;
let ages = Arc::new(Int32Array::from(vec![30, 25, 40])) as ArrayRef;
let structs = StructArray::from(vec![(name_field, names), (age_field, ages)]);
// One list element per row
let offsets = OffsetBuffer::from_lengths([1, 1, 1]);
let people_data = Arc::new(ListArray::new(
Arc::new(Field::new("element", structs.data_type().clone(), false)),
offsets,
Arc::new(structs),
None,
)) as ArrayRef;
let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;

let batch = RecordBatch::try_new(arrow_schema.clone(), vec![people_data, id_data]).unwrap();

read_migrated_file_with_nested_type_and_predicate(schema, arrow_schema, batch).await;
}
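
A minimal sketch of the leaf-count argument from the doc comment on this test, assuming the same parquet-rs schema-parsing APIs as above; the message-type literals are illustrative. With a list of primitives, `people` contributes one leaf, so `id` lands on leaf index 1 under both numbering schemes and the bug stays hidden; a list of structs contributes two leaves, shifting `id` to leaf index 2, where leaf-based numbering would assign it field ID 3 instead of 2.

```rust
use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

// list<int> expands to a single leaf (`people.list.element`), so leaf-based
// and top-level numbering coincide: `id` gets field ID 2 either way.
let masked = SchemaDescriptor::new(Arc::new(
    parse_message_type(
        "message schema {
            required group people (LIST) {
                repeated group list { required int32 element; }
            }
            required int32 id;
        }",
    )
    .unwrap(),
));
assert_eq!(masked.num_columns(), 2); // leaves: people.list.element, id

// list<struct<name, age>> expands to two leaves, pushing `id` to leaf
// index 2 and exposing the divergence between the two numberings.
let exposed = SchemaDescriptor::new(Arc::new(
    parse_message_type(
        "message schema {
            required group people (LIST) {
                repeated group list {
                    required group element {
                        required binary name (UTF8);
                        required int32 age;
                    }
                }
            }
            required int32 id;
        }",
    )
    .unwrap(),
));
assert_eq!(exposed.num_columns(), 3); // leaves: name, age, id
```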

/// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
/// predicate on a column after a map in a migrated file (no field IDs).
#[tokio::test]
async fn test_predicate_on_migrated_file_with_map() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(
1,
"props",
Type::Map(crate::spec::MapType {
key_field: NestedField::required(
3,
"key",
Type::Primitive(PrimitiveType::String),
)
.into(),
value_field: NestedField::required(
4,
"value",
Type::Primitive(PrimitiveType::String),
)
.into(),
}),
)
.into(),
NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new_map(
"props",
"entries",
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, false),
false,
false,
),
Field::new("id", DataType::Int32, false),
]));

let key_field = Arc::new(Field::new("key", DataType::Utf8, false));
let value_field = Arc::new(Field::new("value", DataType::Utf8, false));
let mut map_builder =
arrow_array::builder::MapBuilder::new(None, StringBuilder::new(), StringBuilder::new())
.with_keys_field(key_field)
.with_values_field(value_field);
map_builder.keys().append_value("k1");
map_builder.values().append_value("v1");
map_builder.append(true).unwrap();
map_builder.keys().append_value("k2");
map_builder.values().append_value("v2");
map_builder.append(true).unwrap();
map_builder.keys().append_value("k3");
map_builder.values().append_value("v3");
map_builder.append(true).unwrap();
let props_data = Arc::new(map_builder.finish()) as ArrayRef;
let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;

let batch = RecordBatch::try_new(arrow_schema.clone(), vec![props_data, id_data]).unwrap();

read_migrated_file_with_nested_type_and_predicate(schema, arrow_schema, batch).await;
}
}