@@ -723,6 +723,8 @@ fn build_filter_schema(
723723 regular_indices : & [ usize ] ,
724724 struct_field_accesses : & [ StructFieldAccess ] ,
725725) -> SchemaRef {
726+ let regular_set: BTreeSet < usize > = regular_indices. iter ( ) . copied ( ) . collect ( ) ;
727+
726728 let all_indices = regular_indices
727729 . iter ( )
728730 . copied ( )
@@ -738,6 +740,15 @@ fn build_filter_schema(
738740 . map ( |& idx| {
739741 let field = file_schema. field ( idx) ;
740742
743+ // if this column appears as a regular (whole-column) reference,
744+ // keep the full type
745+ //
746+ // Pruning is only valid when the column is accessed exclusively
747+ // through struct field accesses
748+ if regular_set. contains ( & idx) {
749+ return Arc :: new ( field. clone ( ) ) ;
750+ }
751+
741752 // collect all field paths that access this root struct column
742753 let field_paths = struct_field_accesses
743754 . iter ( )
@@ -752,7 +763,6 @@ fn build_filter_schema(
752763 . collect :: < Vec < _ > > ( ) ;
753764
754765 if field_paths. is_empty ( ) {
755- // its a regular column - use the full type
756766 return Arc :: new ( field. clone ( ) ) ;
757767 }
758768
@@ -1027,6 +1037,8 @@ mod test {
10271037 use parquet:: file:: reader:: { FileReader , SerializedFileReader } ;
10281038 use tempfile:: NamedTempFile ;
10291039
1040+ use datafusion_physical_expr:: expressions:: Column as PhysicalColumn ;
1041+
10301042 // List predicates used by the decoder should be accepted for pushdown
10311043 #[ test]
10321044 fn test_filter_candidate_builder_supports_list_types ( ) {
@@ -1883,6 +1895,86 @@ mod test {
18831895 assert_eq ! ( file_metrics. pushdown_rows_matched. value( ) , 2 ) ;
18841896 }
18851897
1898+ #[ test]
1899+ fn projection_read_plan_preserves_full_struct ( ) {
1900+ // Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
1901+ // Parquet leaves: id=0, s.value=1, s.label=2
1902+ let struct_fields: Fields = vec ! [
1903+ Arc :: new( Field :: new( "value" , DataType :: Int32 , false ) ) ,
1904+ Arc :: new( Field :: new( "label" , DataType :: Utf8 , false ) ) ,
1905+ ]
1906+ . into ( ) ;
1907+
1908+ let schema = Arc :: new ( Schema :: new ( vec ! [
1909+ Field :: new( "id" , DataType :: Int32 , false ) ,
1910+ Field :: new( "s" , DataType :: Struct ( struct_fields. clone( ) ) , false ) ,
1911+ ] ) ) ;
1912+
1913+ let batch = RecordBatch :: try_new (
1914+ Arc :: clone ( & schema) ,
1915+ vec ! [
1916+ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
1917+ Arc :: new( StructArray :: new(
1918+ struct_fields,
1919+ vec![
1920+ Arc :: new( Int32Array :: from( vec![ 10 , 20 , 30 ] ) ) as _,
1921+ Arc :: new( StringArray :: from( vec![ "a" , "b" , "c" ] ) ) as _,
1922+ ] ,
1923+ None ,
1924+ ) ) ,
1925+ ] ,
1926+ )
1927+ . unwrap ( ) ;
1928+
1929+ let file = NamedTempFile :: new ( ) . expect ( "temp file" ) ;
1930+ let mut writer =
1931+ ArrowWriter :: try_new ( file. reopen ( ) . unwrap ( ) , Arc :: clone ( & schema) , None )
1932+ . expect ( "writer" ) ;
1933+ writer. write ( & batch) . expect ( "write batch" ) ;
1934+ writer. close ( ) . expect ( "close writer" ) ;
1935+
1936+ let reader_file = file. reopen ( ) . expect ( "reopen file" ) ;
1937+ let builder = ParquetRecordBatchReaderBuilder :: try_new ( reader_file)
1938+ . expect ( "reader builder" ) ;
1939+ let metadata = builder. metadata ( ) . clone ( ) ;
1940+ let file_schema = builder. schema ( ) . clone ( ) ;
1941+ let schema_descr = metadata. file_metadata ( ) . schema_descr ( ) ;
1942+
1943+ // Simulate SELECT * output projection: Column("id") and Column("s")
1944+ // Plus a get_field(s, 'value') expression from the pushed-down filter
1945+ let exprs: Vec < Arc < dyn PhysicalExpr > > = vec ! [
1946+ Arc :: new( PhysicalColumn :: new( "id" , 0 ) ) ,
1947+ Arc :: new( PhysicalColumn :: new( "s" , 1 ) ) ,
1948+ logical2physical(
1949+ & get_field( ) . call( vec![
1950+ col( "s" ) ,
1951+ Expr :: Literal ( ScalarValue :: Utf8 ( Some ( "value" . to_string( ) ) ) , None ) ,
1952+ ] ) ,
1953+ & file_schema,
1954+ ) ,
1955+ ] ;
1956+
1957+ let read_plan = build_projection_read_plan ( exprs, & file_schema, schema_descr) ;
1958+
1959+ // The projected schema must have the FULL struct type because Column("s")
1960+ // is in the projection. It should NOT be narrowed to Struct{value: Int32}.
1961+ let s_field = read_plan. projected_schema . field_with_name ( "s" ) . unwrap ( ) ;
1962+ assert_eq ! (
1963+ s_field. data_type( ) ,
1964+ & DataType :: Struct (
1965+ vec![
1966+ Arc :: new( Field :: new( "value" , DataType :: Int32 , false ) ) ,
1967+ Arc :: new( Field :: new( "label" , DataType :: Utf8 , false ) ) ,
1968+ ]
1969+ . into( )
1970+ ) ,
1971+ ) ;
1972+
1973+ // all3 Parquet leaves should be in the projection mask
1974+ let expected_mask = ProjectionMask :: leaves ( schema_descr, [ 0 , 1 , 2 ] ) ;
1975+ assert_eq ! ( read_plan. projection_mask, expected_mask, ) ;
1976+ }
1977+
18861978 /// Sanity check that the given expression could be evaluated against the given schema without any errors.
18871979 /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
18881980 fn check_expression_can_evaluate_against_schema (
0 commit comments