@@ -23,8 +23,8 @@ use std::path::{Path, PathBuf};
2323use std:: ptr:: NonNull ;
2424
2525use arrow:: array:: ArrayData ;
26- use arrow:: datatypes:: SchemaRef ;
27- use arrow:: ipc:: reader:: FileReader ;
26+ use arrow:: datatypes:: { Schema , SchemaRef } ;
27+ use arrow:: ipc:: { reader:: StreamReader , writer :: StreamWriter } ;
2828use arrow:: record_batch:: RecordBatch ;
2929use log:: debug;
3030use tokio:: sync:: mpsc:: Sender ;
@@ -34,7 +34,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
3434use datafusion_execution:: memory_pool:: human_readable_size;
3535use datafusion_execution:: SendableRecordBatchStream ;
3636
37- use crate :: common:: IPCWriter ;
3837use crate :: stream:: RecordBatchReceiverStream ;
3938
4039/// Read spilled batches from the disk
@@ -59,13 +58,13 @@ pub(crate) fn read_spill_as_stream(
5958///
6059/// Returns total number of the rows spilled to disk.
6160pub ( crate ) fn spill_record_batches (
62- batches : Vec < RecordBatch > ,
61+ batches : & [ RecordBatch ] ,
6362 path : PathBuf ,
6463 schema : SchemaRef ,
6564) -> Result < ( usize , usize ) > {
66- let mut writer = IPCWriter :: new ( path. as_ref ( ) , schema. as_ref ( ) ) ?;
65+ let mut writer = IPCStreamWriter :: new ( path. as_ref ( ) , schema. as_ref ( ) ) ?;
6766 for batch in batches {
68- writer. write ( & batch) ?;
67+ writer. write ( batch) ?;
6968 }
7069 writer. finish ( ) ?;
7170 debug ! (
@@ -79,7 +78,7 @@ pub(crate) fn spill_record_batches(
7978
8079fn read_spill ( sender : Sender < Result < RecordBatch > > , path : & Path ) -> Result < ( ) > {
8180 let file = BufReader :: new ( File :: open ( path) ?) ;
82- let reader = FileReader :: try_new ( file, None ) ?;
81+ let reader = StreamReader :: try_new ( file, None ) ?;
8382 for batch in reader {
8483 sender
8584 . blocking_send ( batch. map_err ( Into :: into) )
@@ -98,7 +97,7 @@ pub fn spill_record_batch_by_size(
9897) -> Result < ( ) > {
9998 let mut offset = 0 ;
10099 let total_rows = batch. num_rows ( ) ;
101- let mut writer = IPCWriter :: new ( & path, schema. as_ref ( ) ) ?;
100+ let mut writer = IPCStreamWriter :: new ( & path, schema. as_ref ( ) ) ?;
102101
103102 while offset < total_rows {
104103 let length = std:: cmp:: min ( total_rows - offset, batch_size_rows) ;
@@ -130,7 +129,7 @@ pub fn spill_record_batch_by_size(
130129/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
131130/// ^ ^ ^ ^
132131/// | | | |
133- /// col1->{ } | |
132+ /// col1->{ } | |
134133/// col2--------->{ }
135134///
136135/// In the above case, `get_record_batch_memory_size` will return the size of
@@ -179,6 +178,51 @@ fn count_array_data_memory_size(
179178 }
180179}
181180
181+ /// Write in Arrow IPC Stream format to a file.
182+ ///
183+ /// Stream format is used for spill because it supports dictionary replacement, and the random
184+ /// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
185+ pub ( crate ) struct IPCStreamWriter {
186+ /// Inner writer
187+ pub writer : StreamWriter < File > ,
188+ /// Batches written
189+ pub num_batches : usize ,
190+ /// Rows written
191+ pub num_rows : usize ,
192+ /// Bytes written
193+ pub num_bytes : usize ,
194+ }
195+
196+ impl IPCStreamWriter {
197+ /// Create new writer
198+ pub fn new ( path : & Path , schema : & Schema ) -> Result < Self > {
199+ let file = File :: create ( path) . map_err ( |e| {
200+ exec_datafusion_err ! ( "Failed to create partition file at {path:?}: {e:?}" )
201+ } ) ?;
202+ Ok ( Self {
203+ num_batches : 0 ,
204+ num_rows : 0 ,
205+ num_bytes : 0 ,
206+ writer : StreamWriter :: try_new ( file, schema) ?,
207+ } )
208+ }
209+
210+ /// Write one single batch
211+ pub fn write ( & mut self , batch : & RecordBatch ) -> Result < ( ) > {
212+ self . writer . write ( batch) ?;
213+ self . num_batches += 1 ;
214+ self . num_rows += batch. num_rows ( ) ;
215+ let num_bytes: usize = batch. get_array_memory_size ( ) ;
216+ self . num_bytes += num_bytes;
217+ Ok ( ( ) )
218+ }
219+
220+ /// Finish the writer
221+ pub fn finish ( & mut self ) -> Result < ( ) > {
222+ self . writer . finish ( ) . map_err ( Into :: into)
223+ }
224+ }
225+
182226#[ cfg( test) ]
183227mod tests {
184228 use super :: * ;
@@ -190,6 +234,7 @@ mod tests {
190234 use datafusion_common:: Result ;
191235 use datafusion_execution:: disk_manager:: DiskManagerConfig ;
192236 use datafusion_execution:: DiskManager ;
237+ use itertools:: Itertools ;
193238 use std:: fs:: File ;
194239 use std:: io:: BufReader ;
195240 use std:: sync:: Arc ;
@@ -214,18 +259,20 @@ mod tests {
214259 let schema = batch1. schema ( ) ;
215260 let num_rows = batch1. num_rows ( ) + batch2. num_rows ( ) ;
216261 let ( spilled_rows, _) = spill_record_batches (
217- vec ! [ batch1, batch2] ,
262+ & [ batch1, batch2] ,
218263 spill_file. path ( ) . into ( ) ,
219264 Arc :: clone ( & schema) ,
220265 ) ?;
221266 assert_eq ! ( spilled_rows, num_rows) ;
222267
223268 let file = BufReader :: new ( File :: open ( spill_file. path ( ) ) ?) ;
224- let reader = FileReader :: try_new ( file, None ) ?;
269+ let reader = StreamReader :: try_new ( file, None ) ?;
225270
226- assert_eq ! ( reader. num_batches( ) , 2 ) ;
227271 assert_eq ! ( reader. schema( ) , schema) ;
228272
273+ let batches = reader. collect_vec ( ) ;
274+ assert ! ( batches. len( ) == 2 ) ;
275+
229276 Ok ( ( ) )
230277 }
231278
@@ -249,11 +296,13 @@ mod tests {
249296 ) ?;
250297
251298 let file = BufReader :: new ( File :: open ( spill_file. path ( ) ) ?) ;
252- let reader = FileReader :: try_new ( file, None ) ?;
299+ let reader = StreamReader :: try_new ( file, None ) ?;
253300
254- assert_eq ! ( reader. num_batches( ) , 4 ) ;
255301 assert_eq ! ( reader. schema( ) , schema) ;
256302
303+ let batches = reader. collect_vec ( ) ;
304+ assert ! ( batches. len( ) == 4 ) ;
305+
257306 Ok ( ( ) )
258307 }
259308
0 commit comments