Fix: External sort failing on StringView due to shared buffers #14823
Changes from 4 commits (55bda70, cea8b2b, 0e4c164, 6c3f4d8, 92ca3b0, d5e703a)
```diff
@@ -44,7 +44,9 @@ use crate::{
     Statistics,
 };

-use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array};
+use arrow::array::{
+    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
+};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::row::{RowConverter, SortField};
```
```diff
@@ -300,6 +302,7 @@ impl ExternalSorter {
         if input.num_rows() == 0 {
             return Ok(());
         }

+        self.reserve_memory_for_merge()?;

         let size = get_reserved_byte_for_record_batch(&input);
```
```diff
@@ -397,6 +400,8 @@ impl ExternalSorter {
             return Ok(0);
         }

+        self.organize_stringview_arrays()?;
+
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
```
```diff
@@ -414,6 +419,66 @@ impl ExternalSorter {
         Ok(used)
     }

+    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
+    /// `StringViewArray` in sequential order by calling `gc()` on them.
+    ///
+    /// # Rationale
+    /// After (merge-based) sorting, all batches will be sorted into a single run,
+    /// but physically this sorted run is chunked into many small batches. For
+    /// `StringViewArray`s inside each sorted run, their inner buffers are not
+    /// re-constructed by default, leading to non-sequential payload locations
+    /// (permuted by the `interleave()` Arrow kernel). A single payload buffer might
+    /// be shared by multiple `RecordBatch`es.
+    /// When writing each batch to disk, the writer has to write all referenced buffers,
+    /// because they have to be read back one by one to reduce memory usage. This
+    /// causes extra disk reads and writes, and potentially execution failure.
+    ///
+    /// # Example
+    /// Before sorting:
+    /// batch1 -> buffer1
+    /// batch2 -> buffer2
+    ///
+    /// After sorting:
+    /// sorted_batch1 -> buffer1
+    ///               -> buffer2
+    /// sorted_batch2 -> buffer1
+    ///               -> buffer2
+    ///
+    /// Then when spilling each batch, the writer has to write all referenced buffers
+    /// repeatedly.
+    fn organize_stringview_arrays(&mut self) -> Result<()> {
+        let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
+
+        for batch in self.in_mem_batches.drain(..) {
+            let mut new_columns: Vec<Arc<dyn Array>> =
+                Vec::with_capacity(batch.num_columns());
+
+            let mut arr_mutated = false;
+            for array in batch.columns() {
+                if let Some(string_view_array) =
+                    array.as_any().downcast_ref::<StringViewArray>()
+                {
+                    let new_array = string_view_array.gc();
+                    new_columns.push(Arc::new(new_array));
+                    arr_mutated = true;
+                } else {
+                    new_columns.push(Arc::clone(array));
+                }
+            }
+
+            let organized_batch = if arr_mutated {
+                RecordBatch::try_new(batch.schema(), new_columns)?
+            } else {
+                batch
+            };
+
+            organized_batches.push(organized_batch);
+        }
+
+        self.in_mem_batches = organized_batches;
+
+        Ok(())
+    }
+
     /// Sorts the in_mem_batches in place
     ///
     /// Sorting may have freed memory, especially if fetch is `Some`. If
```

Review thread on `string_view_array.gc()`:

Contributor: Will `string_view_array.gc()` affect performance when it is called many times?

Contributor: Updated: if I understand it right, it should not affect performance too much, because only the buffer data that is still in use is kept after `gc()`.

Contributor (author): Good point, there is some inefficiency here; I filed apache/arrow-rs#7184. Once done on …
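The buffer-sharing problem described in the doc comment above can be illustrated with a self-contained sketch. This is plain Rust using `Rc`, not Arrow's actual `StringViewArray` layout; the `View` struct and `compact` function are hypothetical stand-ins for a view array and its `gc()`:

```rust
use std::rc::Rc;

// Hypothetical miniature of a string-view layout: each view points into a
// shared payload buffer via (buffer, offset, len). After an interleaving
// sort, one batch's views may reference buffers owned by many other batches,
// so spilling that batch forces writing every referenced buffer.
struct View {
    buffer: Rc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl View {
    fn bytes(&self) -> &[u8] {
        &self.buffer[self.offset..self.offset + self.len]
    }
}

// A gc-like compaction: copy only the referenced bytes into one fresh,
// sequential buffer, so the batch no longer shares payloads with others.
fn compact(views: &[View]) -> Vec<View> {
    let mut payload = Vec::new();
    let mut offsets = Vec::with_capacity(views.len());
    for v in views {
        offsets.push((payload.len(), v.len));
        payload.extend_from_slice(v.bytes());
    }
    let buffer = Rc::new(payload);
    offsets
        .into_iter()
        .map(|(offset, len)| View { buffer: Rc::clone(&buffer), offset, len })
        .collect()
}

fn main() {
    let buf1 = Rc::new(b"hello world".to_vec());
    let buf2 = Rc::new(b"foo bar".to_vec());
    // A "sorted batch" whose views straddle two shared buffers.
    let batch = vec![
        View { buffer: Rc::clone(&buf1), offset: 0, len: 5 }, // "hello"
        View { buffer: Rc::clone(&buf2), offset: 4, len: 3 }, // "bar"
    ];
    let compacted = compact(&batch);
    // Values are preserved...
    assert_eq!(compacted[0].bytes(), b"hello");
    assert_eq!(compacted[1].bytes(), b"bar");
    // ...but all views now share a single 8-byte sequential buffer, so the
    // batch owns exactly the bytes it references.
    assert!(Rc::ptr_eq(&compacted[0].buffer, &compacted[1].buffer));
    assert_eq!(compacted[0].buffer.len(), 8);
    println!("compacted payload: {} bytes", compacted[0].buffer.len());
}
```

Arrow's `StringViewArray::gc()` plays the role of `compact` here: after it runs, spilling one batch no longer drags along payload buffers shared with other batches.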
```diff
@@ -446,21 +511,12 @@ impl ExternalSorter {
             None => {
                 let sorted_size = get_reserved_byte_for_record_batch(&batch);
                 if self.reservation.try_grow(sorted_size).is_err() {
-                    // Directly write in_mem_batches as well as all the remaining batches in
-                    // sorted_stream to disk. Further batches fetched from `sorted_stream` will
-                    // be handled by the `Some(writer)` matching arm.
-                    let spill_file =
-                        self.runtime.disk_manager.create_tmp_file("Sorting")?;
-                    let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
-                    // Flush everything in memory to the spill file
-                    for batch in self.in_mem_batches.drain(..) {
-                        writer.write(&batch)?;
-                    }
-                    // as well as the newly sorted batch
-                    writer.write(&batch)?;
-                    spill_writer = Some(writer);
-                    self.reservation.free();
-                    self.spills.push(spill_file);
+                    // Although the reservation is not enough, the batch is
+                    // already in memory, so it's okay to combine it with previously
+                    // sorted batches, and spill together.
+                    self.in_mem_batches.push(batch);
+                    self.spill().await?;
                 } else {
                     self.in_mem_batches.push(batch);
                     self.in_mem_batches_sorted = true;
```

Review thread on this refactor:

Contributor (author): This refactor is not related to the PR; I did it along the way.

Contributor: Do we need to keep the original logic? The `self.in_mem_batches.push(batch)` may cause an OOM.

Contributor (author): …

Contributor: Thank you @2010YOUY01 for explaining, got it, I was wrong: we do not have an internal mechanism to update the reservation. If we find a RAII way to improve it in the future, that would be great; maybe we can file a ticket for it!
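The refactored control flow in the hunk above can be modeled with a toy sketch. These are hypothetical types, not DataFusion's actual `MemoryReservation` or `ExternalSorter` API: when the reservation cannot grow to hold a newly sorted batch, the batch (already resident in memory) is pushed and spilled together with the earlier batches, and the spill statistics are updated in one place:

```rust
// Toy stand-in for a memory reservation with a fixed limit.
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.used + bytes > self.limit {
            Err(())
        } else {
            self.used += bytes;
            Ok(())
        }
    }
    fn free(&mut self) -> usize {
        std::mem::take(&mut self.used)
    }
}

// Toy sorter: batch sizes in bytes stand in for RecordBatches.
struct Sorter {
    in_mem_batches: Vec<usize>,
    reservation: Reservation,
    spill_count: usize,
    spilled_bytes: usize,
}

impl Sorter {
    // Spill everything in memory to "disk", freeing the reservation and
    // recording statistics (the bookkeeping the old inline writer path missed).
    fn spill(&mut self) {
        self.spilled_bytes += self.in_mem_batches.drain(..).sum::<usize>();
        self.spill_count += 1;
        self.reservation.free();
    }

    fn push_sorted(&mut self, batch: usize) {
        if self.reservation.try_grow(batch).is_err() {
            // The batch is already in memory, so combine it with previously
            // sorted batches and spill them together.
            self.in_mem_batches.push(batch);
            self.spill();
        } else {
            self.in_mem_batches.push(batch);
        }
    }
}

fn main() {
    let mut sorter = Sorter {
        in_mem_batches: Vec::new(),
        reservation: Reservation { used: 0, limit: 100 },
        spill_count: 0,
        spilled_bytes: 0,
    };
    sorter.push_sorted(60); // fits: reserved and kept in memory
    sorter.push_sorted(60); // does not fit: both batches spill together
    assert_eq!(sorter.spill_count, 1);
    assert_eq!(sorter.spilled_bytes, 120);
    assert_eq!(sorter.reservation.used, 0);
    assert!(sorter.in_mem_batches.is_empty());
    println!("spills: {}, bytes: {}", sorter.spill_count, sorter.spilled_bytes);
}
```

Because every spill now goes through the single `spill()` path, the statistics are updated consistently; this is why the expected `spill_count` range in the test changed.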
```diff
@@ -1425,7 +1481,7 @@ mod tests {
         // Processing 840 KB of data using 400 KB of memory requires at least 2 spills
         // It will spill roughly 18000 rows and 800 KBytes.
         // We leave a little wiggle room for the actual numbers.
-        assert!((2..=10).contains(&spill_count));
+        assert!((12..=18).contains(&spill_count));
         assert!((15000..=20000).contains(&spilled_rows));
         assert!((700000..=900000).contains(&spilled_bytes));
```

Review thread on the test change:

Contributor (author): This is caused by the above refactor; the old implementation forgot to update the statistics, so we missed several counts.

Contributor: Should we also update the comments?

Contributor (author): Updated in 92ca3b0.