diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index b3947170d9e4..ea1a87d09148 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -134,6 +134,10 @@ impl LimitedBatchCoalescer { Ok(()) } + pub(crate) fn is_finished(&self) -> bool { + self.finished + } + /// Return the next completed batch, if any pub fn next_completed_batch(&mut self) -> Option { self.inner.next_completed_batch() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 674fe6692adf..e724cdad6484 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -26,8 +26,7 @@ use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::coalesce::LimitedBatchCoalescer; -use crate::coalesce::PushBatchStatus::LimitReached; +use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ @@ -711,23 +710,6 @@ impl FilterExecMetrics { } } -impl FilterExecStream { - fn flush_remaining_batches( - &mut self, - ) -> Poll>> { - // Flush any remaining buffered batch - match self.batch_coalescer.finish() { - Ok(()) => { - Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| { - self.metrics.selectivity.add_part(batch.num_rows()); - Ok(batch) - })) - } - Err(e) => Poll::Ready(Some(Err(e))), - } - } -} - pub fn batch_filter( batch: &RecordBatch, predicate: &Arc, @@ -767,10 +749,26 @@ impl Stream for FilterExecStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let poll; let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone(); loop { + // If there is a completed batch ready, return it + if let Some(batch) = self.batch_coalescer.next_completed_batch() { + self.metrics.selectivity.add_part(batch.num_rows()); + let poll = Poll::Ready(Some(Ok(batch))); + return self.metrics.baseline_metrics.record_poll(poll); + } + + if self.batch_coalescer.is_finished() { + // If input is done and no batches are ready, return None to signal end of stream. + return Poll::Ready(None); + } + + // Attempt to pull the next batch from the input stream. match ready!(self.input.poll_next_unpin(cx)) { + None => { + self.batch_coalescer.finish()?; + // continue draining the coalescer + } Some(Ok(batch)) => { let timer = elapsed_compute.timer(); let status = self.predicate.as_ref() @@ -802,37 +800,22 @@ impl Stream for FilterExecStream { })?; timer.done(); - if let LimitReached = status { - poll = self.flush_remaining_batches(); - break; - } - - if let Some(batch) = self.batch_coalescer.next_completed_batch() { - self.metrics.selectivity.add_part(batch.num_rows()); - poll = Poll::Ready(Some(Ok(batch))); - break; - } - continue; - } - None => { - // Flush any remaining buffered batch - match self.batch_coalescer.finish() { - Ok(()) => { - poll = self.flush_remaining_batches(); + match status { + PushBatchStatus::Continue => { + // Keep pushing more batches } - Err(e) => { - poll = Poll::Ready(Some(Err(e))); + PushBatchStatus::LimitReached => { + // limit was reached, so stop early + self.batch_coalescer.finish()?; + // continue draining the coalescer } } - break; - } - value => { - poll = Poll::Ready(value); - break; } + + // Error case + other => return Poll::Ready(other), } } - self.metrics.baseline_metrics.record_poll(poll) } fn size_hint(&self) -> (usize, Option) { diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 524304546d56..96471411e0f9 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -871,4 +871,4 @@ DROP TABLE test_limit_with_partitions; # Tear down src_table table: statement ok -DROP TABLE src_table; +DROP TABLE src_table; \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/limit_single_row_batches.slt b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt new file mode 100644 index 000000000000..fbdb0140e047 --- /dev/null +++ b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt @@ -0,0 +1,22 @@ + +# minimize batch size to 1 in order to trigger different code paths +statement ok +set datafusion.execution.batch_size = '1'; + +# ---- +# tests with target partition set to 1 +# ---- +statement ok +set datafusion.execution.target_partitions = '1'; + + +statement ok +CREATE TABLE filter_limit (i INT) as values (1), (2); + +query I +SELECT COUNT(*) FROM (SELECT i FROM filter_limit WHERE i <> 0 LIMIT 1); +---- +1 + +statement ok +DROP TABLE filter_limit; \ No newline at end of file