diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index 57aabca361553..8c0d0ccbea430 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -377,13 +377,44 @@ cargo test --features memory-accounting --test sqllogictests -- \ `--default-pool-size-mb` seeds each per-file SLT context's MemoryPool with the given size in MB and arms the bank as a no-op until a test opts in. -**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'` at the top of the `.slt`. The wrapping `AccountingMemoryPool` then -tightens its allocator-level bank to `N * 1.10` (10% headroom). If the test -allocates more than that — including bytes DataFusion's tracker didn't see -— the test panics with an `OverdraftPanic` reporting the actual balance at -panic time. SLTs without a `SET` of `memory_limit` see no change in -behavior; the bank stays loose and `SHOW ALL` continues to render the limit -as `unlimited`. +**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'` +at the top of the `.slt`. The wrapping `AccountingMemoryPool` then tightens +its allocator-level bank to `N * DEFAULT_MEMORY_OVERDRAFT_FACTOR` (currently +`8.0`, i.e. 800% — yes, that's loose). If the test allocates more than that +— including bytes DataFusion's tracker didn't see — the test panics with an +`OverdraftPanic` reporting the actual balance at panic time. SLTs without a +`SET` of `memory_limit` see no change in behavior; the bank stays loose and +`SHOW ALL` continues to render the limit as `unlimited`. + +The default factor is deliberately loose so the existing suite passes today; +the long-term goal is to drive it down toward ~`1.1` as operators that +allocate outside the pool's tracking are fixed. Progress is tracked in +[epic #22758](https://github.com/apache/datafusion/issues/22758). + +**When a test trips the accounting check**, the runner emits a message like: + +> Memory accounting mismatch: this query allocated N bytes more than +> DataFusion's MemoryPool accounted for. Some operator is allocating outside +> the pool's tracking — this is a real accounting bug worth fixing. + +That bytes-over-budget number is the gap we're hunting. The right fix is to +account for the offending allocation inside the operator (file a sub-issue +under the epic above, with the query + observed overshoot). For an SLT you +didn't author that just started failing, the unblocker is to bump the +overdraft factor for that one test: + +```sql +SET datafusion.sqllogictest.memory_overdraft_factor = 5.0; +SET datafusion.runtime.memory_limit = '150K'; +-- ... rest of test ... +``` + +The override is consumed by the *next* `SET datafusion.runtime.memory_limit` +(which is what arms the bank), then auto-resets to the default — so each +test opts in independently and the per-file isolation in the runner still +holds. The setting only exists in the SLT runner; DataFusion's parser +doesn't know about it, and nothing changes in `datafusion-cli` or anywhere +else. Inside the runner each file gets its own multi-thread Tokio runtime so context-ids stamped onto worker threads stay stable for the allocator diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 2b08769bf5208..f046c9e32683e 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -144,16 +144,28 @@ async fn run_tests() -> Result<()> { options.warn_on_ignored(); #[cfg(feature = "memory-accounting")] - if let Some(pool_mb) = options.default_pool_size_mb { - let pool_bytes = pool_mb.saturating_mul(1024 * 1024); - // Same value drives the inner MemoryPool's size and the bank's - // default budget. The wrapper renders this value as `unlimited` in - // `SHOW ALL` (sentinel for "no SET has happened"); once a test - // calls `SET datafusion.runtime.memory_limit`, the wrapper retunes - // the bank to that limit + 10% headroom. - datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes); - datafusion_sqllogictest::set_default_budget(pool_bytes as isize); - log::info!("memory-accounting on: default pool size = {pool_mb} MB"); + { + // OverdraftPanic frequently fires on tokio worker threads spawned by + // operators like RepartitionExec; the joiner converts that JoinError + // into a generic DataFusionError::External("task NN panicked") that + // discards the payload. The hook prints the real message + remediation + // on stderr before the harness swallows the unwind. Install + // unconditionally — the bank is also armed by the resize path that + // runs for every `SET datafusion.runtime.memory_limit`, independent + // of whether `--default-pool-size-mb` was passed. + datafusion_sqllogictest::install_overdraft_panic_hook(); + + if let Some(pool_mb) = options.default_pool_size_mb { + let pool_bytes = pool_mb.saturating_mul(1024 * 1024); + // Same value drives the inner MemoryPool's size and the bank's + // default budget. The wrapper renders this value as `unlimited` in + // `SHOW ALL` (sentinel for "no SET has happened"); once a test + // calls `SET datafusion.runtime.memory_limit`, the wrapper retunes + // the bank to that limit + 10% headroom. + datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes); + datafusion_sqllogictest::set_default_budget(pool_bytes as isize); + log::info!("memory-accounting on: default pool size = {pool_mb} MB"); + } } // Print parallelism info for debugging CI performance diff --git a/datafusion/sqllogictest/src/accounting.rs b/datafusion/sqllogictest/src/accounting.rs index 46b6120c24d28..498ab199f55f4 100644 --- a/datafusion/sqllogictest/src/accounting.rs +++ b/datafusion/sqllogictest/src/accounting.rs @@ -58,7 +58,16 @@ static ACCOUNTS: OnceLock>> = OnceLock::new() /// Starting budget for any new account, set by [`set_default_budget`] and /// inherited by per-file SLT contexts spawned after. -static DEFAULT_BUDGET: AtomicIsize = AtomicIsize::new(0); +/// +/// Default is `isize::MAX / 2` — effectively infinite — so an un-armed bank +/// is a no-op. Real enforcement only kicks in once either +/// [`set_default_budget`] (via `--default-pool-size-mb`) or the +/// `AccountingMemoryPool::try_resize` path (via `SET datafusion.runtime. +/// memory_limit`) calls [`set_account_balance`] with a finite value. +/// Otherwise file setup allocations would settle into a 0-budget bank +/// before any real work runs and trip the panic with a misleading +/// backtrace pointing at `RuntimeEnv::default`. +static DEFAULT_BUDGET: AtomicIsize = AtomicIsize::new(isize::MAX / 2); fn accounts() -> &'static RwLock> { ACCOUNTS.get_or_init(|| RwLock::new(HashMap::new())) @@ -142,6 +151,71 @@ pub struct OverdraftPanic { pub account_balance: isize, } +/// Install a chained panic hook that formats [`OverdraftPanic`] payloads as a +/// readable message instead of the default `Box` rendering. Other +/// panic payloads pass through to the previous hook unchanged. +/// +/// Idempotent: only the first call wins. Safe to call from any thread, but +/// best called once from `main` before any work spawns. +/// +/// The hook is needed because [`OverdraftPanic`] fires inside spawned tokio +/// tasks (e.g. RepartitionExec workers). Tokio's task harness catches the +/// unwind on the worker, surfaces it to the joiner as a `JoinError`, and the +/// joiner often coerces that into `DataFusionError::External("task NN +/// panicked")` — the original payload is lost. The hook runs on the worker +/// *before* the harness eats it, so the operator + balance still hit stderr. +pub fn install_overdraft_panic_hook() { + static INSTALLED: std::sync::Once = std::sync::Once::new(); + INSTALLED.call_once(|| { + let prev = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + if let Some(od) = info.payload().downcast_ref::() { + let overshoot = -od.account_balance; + let loc = info + .location() + .map(|l| format!("{}:{}:{}", l.file(), l.line(), l.column())) + .unwrap_or_else(|| "".to_string()); + let thread = std::thread::current(); + let thread_name = thread.name().unwrap_or(""); + // Force-capture the backtrace — without this, the default + // hook's RUST_BACKTRACE behavior is lost when we short-circuit, + // and that's exactly the line of code that allocated outside + // the MemoryPool's tracking. Without it the user has nothing + // to grep for. + let bt = std::backtrace::Backtrace::force_capture(); + eprintln!( + "thread '{thread_name}' panicked at {loc}:\n\ + Memory accounting mismatch: this thread allocated {overshoot} \ + bytes more than DataFusion's MemoryPool accounted for. Some \ + operator is allocating outside the pool's tracking — this is a \ + real accounting bug worth fixing.\n\ + \n\ + If you made changes to an operator or UDF this is probably \ + related to your work and should be investigated. If you do not \ + believe this is related to your change, the fastest path \ + forward is to opt the failing SLT into a larger overdraft \ + tolerance and file the gap against the epic so we can pay it \ + down:\n\ + \n \ + SET datafusion.sqllogictest.memory_overdraft_factor = N;\n\ + \n\ + where N is roughly `2 * / \ + datafusion.runtime.memory_limit`. The override applies to the \ + next `SET datafusion.runtime.memory_limit` only, then \ + auto-resets.\n\ + \n\ + Please record the query + observed overshoot at:\n \ + https://github.com/apache/datafusion/issues/22758\n\ + \n\ + The untracked allocation is in this backtrace:\n{bt}" + ); + return; + } + prev(info); + })); + }); +} + /// Set the default budget new accounts will be created with. Existing /// accounts are untouched. pub fn set_default_budget(value: isize) { diff --git a/datafusion/sqllogictest/src/accounting_pool.rs b/datafusion/sqllogictest/src/accounting_pool.rs index a9d2db9f12261..fe68ea87ccfef 100644 --- a/datafusion/sqllogictest/src/accounting_pool.rs +++ b/datafusion/sqllogictest/src/accounting_pool.rs @@ -23,24 +23,58 @@ //! which `RuntimeEnvBuilder::with_memory_limit` triggers on `SET //! datafusion.runtime.memory_limit = '…'`). //! -//! Each retune sets the bank to `new_limit * HEADROOM_FACTOR`. A query -//! that allocates past that envelope panics with an `OverdraftPanic` — -//! the gap between DF's voluntary tracker and the allocator's reality -//! is the bug we're hunting. +//! Each retune sets the bank to `new_limit * DEFAULT_MEMORY_OVERDRAFT_FACTOR` +//! (or whatever the current thread has set via [`set_memory_overdraft_factor`] +//! — used by the SLT runner to honor per-test +//! `SET datafusion.sqllogictest.memory_overdraft_factor = N` overrides). +//! A query that allocates past that envelope panics with an `OverdraftPanic` +//! — the gap between DF's voluntary tracker and the allocator's reality is +//! the bug we're hunting. use crate::set_account_balance; use datafusion::common::Result; use datafusion::execution::memory_pool::{ MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, }; +use std::cell::Cell; use std::fmt::{self, Display, Formatter}; use std::sync::Arc; -/// Headroom over the pool's declared limit. Anything past this is an -/// untracked allocation — by definition, since DF's pool didn't see it. +/// Default overdraft factor: how much the allocator may exceed DataFusion's +/// declared `MemoryPool` limit before [`AccountingMemoryPool`] panics. +/// Anything past this is an untracked allocation — by definition, since +/// DF's pool didn't see it. /// -/// 800% high, but that's what it takes to pass the SLT suite right now. Goal should be ~10% -const HEADROOM_FACTOR: f64 = 8.0; +/// `8.0` (800%) is what it takes to pass the SLT suite today. Goal is to +/// drive this down to ~`1.1`; progress tracked in epic +/// . +/// +/// SLTs that exercise queries with more in-flight batches than the default +/// covers can opt into a larger factor for one resize via +/// `SET datafusion.sqllogictest.memory_overdraft_factor = N`, which the +/// SLT runner intercepts and stores in [`MEMORY_OVERDRAFT_FACTOR`]. +pub const DEFAULT_MEMORY_OVERDRAFT_FACTOR: f64 = 8.0; + +thread_local! { + /// Per-thread overdraft multiplier, applied by [`AccountingMemoryPool::try_resize`]. + /// The SLT runner writes here when it sees a + /// `SET datafusion.sqllogictest.memory_overdraft_factor = N` statement; + /// it resets to [`DEFAULT_MEMORY_OVERDRAFT_FACTOR`] on each `try_resize` + /// so opt-ins stay scoped to the SLT block that asked for them. + static MEMORY_OVERDRAFT_FACTOR: Cell = const { Cell::new(DEFAULT_MEMORY_OVERDRAFT_FACTOR) }; +} + +/// Override the memory overdraft factor for the current thread. The next +/// [`AccountingMemoryPool::try_resize`] consumes the override, then resets +/// the thread-local back to [`DEFAULT_MEMORY_OVERDRAFT_FACTOR`]. +pub fn set_memory_overdraft_factor(factor: f64) { + MEMORY_OVERDRAFT_FACTOR.with(|h| h.set(factor)); +} + +/// Read the current thread's memory overdraft factor. +pub fn memory_overdraft_factor() -> f64 { + MEMORY_OVERDRAFT_FACTOR.with(|h| h.get()) +} pub struct AccountingMemoryPool { inner: Arc, @@ -117,7 +151,9 @@ impl MemoryPool for AccountingMemoryPool { fn try_resize(&self, new_limit: usize) -> Result<()> { self.inner.try_resize(new_limit)?; - set_account_balance((new_limit as f64 * HEADROOM_FACTOR) as isize); + let factor = + MEMORY_OVERDRAFT_FACTOR.with(|h| h.replace(DEFAULT_MEMORY_OVERDRAFT_FACTOR)); + set_account_balance((new_limit as f64 * factor) as isize); Ok(()) } } @@ -162,13 +198,45 @@ mod tests { ); pool.try_resize(50_000).unwrap(); - // Balance is reset to limit * HEADROOM_FACTOR, minus a small - // drift from this test thread's own allocs between set and read. - let expected = (50_000.0 * HEADROOM_FACTOR) as isize; + // Balance is reset to limit * DEFAULT_MEMORY_OVERDRAFT_FACTOR, + // minus a small drift from this test thread's own allocs between + // set and read. + let expected = (50_000.0 * DEFAULT_MEMORY_OVERDRAFT_FACTOR) as isize; let bal = account_balance(); assert!( (50_000..=expected).contains(&bal), "balance not in expected range: got {bal}, expected ≤ {expected}" ); } + + #[test] + fn overdraft_factor_override_is_consumed_by_try_resize() { + set_thread_context_id(next_context_id()); + + let default_size = 1_000_000; + let pool = AccountingMemoryPool::new( + Arc::new(GreedyMemoryPool::new(default_size)), + default_size, + ); + + set_memory_overdraft_factor(2.0); + pool.try_resize(100_000).unwrap(); + // 2.0 * 100K = 200K; subtract worst-case drift from this thread's + // own allocs between the resize and the read. + let bal = account_balance(); + assert!( + (100_000..=200_000).contains(&bal), + "first resize used wrong factor: got {bal}, expected ≤ 200_000" + ); + + // Override consumed: next resize falls back to default. + pool.try_resize(100_000).unwrap(); + let bal = account_balance(); + let expected_default = (100_000.0 * DEFAULT_MEMORY_OVERDRAFT_FACTOR) as isize; + assert!( + (100_000..=expected_default).contains(&bal), + "second resize did not fall back to default factor: got {bal}, \ + expected ≤ {expected_default}" + ); + } } diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 0c038fb00fa08..6e1681ae5f30c 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -87,6 +87,15 @@ impl DataFusion { /// feature, allocator-detected overdrafts panic with `OverdraftPanic`; /// catch them here and translate to a clean `Err`. async fn run_one(&self, sql: &str) -> Result { + // SLT-only knob: `SET datafusion.sqllogictest.memory_overdraft_factor = N` + // is intercepted here (DataFusion doesn't know about it) and feeds + // the AccountingMemoryPool's next try_resize. No-op when the + // memory-accounting feature is off, so the same SLT parses either + // way. + if let Some(result) = try_intercept_overdraft_set(sql) { + return result; + } + #[cfg(feature = "memory-accounting")] { use crate::OverdraftPanic; @@ -97,23 +106,18 @@ impl DataFusion { return match std::panic::AssertUnwindSafe(fut).catch_unwind().await { Ok(r) => r, Err(payload) => { - if let Some(od) = payload.downcast_ref::() { - let df_reserved_mb = - (self.ctx.runtime_env().memory_pool.reserved() as u64) - / (1024 * 1024); - warn!( - "[{}] killed by allocator overdraft: \ - account balance = {} bytes, df-pool reserved = {df_reserved_mb} MB; \ - sql = {sql:?}", - self.relative_path.display(), - od.account_balance, - ); - // Restore the bank so the next statement starts clean + if payload.is::() { + // The remediation message was already printed to stderr by + // `install_overdraft_panic_hook` on whichever thread fired + // the panic. Reset the bank so the next statement starts + // clean — otherwise the bank stays negative and every + // subsequent allocation refires. crate::reset_account_to_default(); - Err(DFSqlLogicTestError::Other(format!( - "allocator overdraft: account balance at panic = {} bytes", - od.account_balance, - ))) + Err(DFSqlLogicTestError::Other( + "memory accounting overdraft — see stderr for the \ + actionable message and remediation" + .to_string(), + )) } else { // Not our panic — re-raise so test runner sees it. std::panic::resume_unwind(payload); @@ -265,11 +269,106 @@ async fn run_query( } } +/// The variable name the SLT runner intercepts to override the +/// [`crate::AccountingMemoryPool`]'s per-thread overdraft factor. +const OVERDRAFT_FACTOR_VAR: &str = "datafusion.sqllogictest.memory_overdraft_factor"; + +/// Recognize `SET datafusion.sqllogictest.memory_overdraft_factor = N` and +/// route it into the accounting pool's per-thread override. Returns: +/// - `None` if `sql` is anything else (caller runs it normally), +/// - `Some(Ok(StatementComplete))` after a successful override, +/// - `Some(Err(...))` if the value didn't parse as `f64`. +/// +/// DataFusion's parser owns the SQL grammar; this just inspects the AST +/// for our magic variable and short-circuits before the statement reaches +/// the planner (which would reject the unknown variable name). +fn try_intercept_overdraft_set(sql: &str) -> Option> { + use datafusion::sql::parser::{DFParser, Statement as DFStatement}; + use sqlparser::ast::{Set, Statement as SqlStatement}; + + let mut parsed = DFParser::parse_sql(sql).ok()?; + let DFStatement::Statement(stmt) = parsed.pop_front()? else { + return None; + }; + let SqlStatement::Set(Set::SingleAssignment { + variable, values, .. + }) = *stmt + else { + return None; + }; + + if !variable + .to_string() + .eq_ignore_ascii_case(OVERDRAFT_FACTOR_VAR) + { + return None; + } + + if values.len() != 1 { + return Some(Err(DFSqlLogicTestError::Other(format!( + "{OVERDRAFT_FACTOR_VAR} expects exactly one value, got {}", + values.len() + )))); + } + + // Render whatever Expr the parser produced (handles `5.0`, `'5.0'`, + // `"5.0"`, signed numbers via UnaryOp, etc.) then peel any quotes. + let raw = values[0].to_string(); + let value_str = raw.trim_matches(|c| c == '\'' || c == '"').trim(); + let factor: f64 = match value_str.parse() { + Ok(f) => f, + Err(e) => { + return Some(Err(DFSqlLogicTestError::Other(format!( + "{OVERDRAFT_FACTOR_VAR} expects an f64, got {value_str:?}: {e}" + )))); + } + }; + + #[cfg(feature = "memory-accounting")] + crate::set_memory_overdraft_factor(factor); + #[cfg(not(feature = "memory-accounting"))] + let _ = factor; // accepted but inert without the feature + + Some(Ok(DBOutput::StatementComplete(0))) +} + #[cfg(test)] mod tests { use super::*; use sqllogictest::AsyncDB; + #[tokio::test] + async fn intercept_overdraft_set_parses_bare_value() { + let out = try_intercept_overdraft_set( + "SET datafusion.sqllogictest.memory_overdraft_factor = 5.0", + ) + .expect("recognized as our SET"); + assert!(out.is_ok(), "parse should succeed"); + #[cfg(feature = "memory-accounting")] + assert!((crate::memory_overdraft_factor() - 5.0).abs() < f64::EPSILON); + } + + #[tokio::test] + async fn intercept_overdraft_set_passes_through_unrelated_sets() { + assert!( + try_intercept_overdraft_set("SET datafusion.execution.batch_size = 2048") + .is_none() + ); + assert!(try_intercept_overdraft_set("SELECT 1").is_none()); + } + + #[tokio::test] + async fn intercept_overdraft_set_rejects_bad_value() { + let out = try_intercept_overdraft_set( + "SET datafusion.sqllogictest.memory_overdraft_factor = 'not a number'", + ) + .expect("recognized as our SET"); + let Err(err) = out else { + panic!("expected parse to fail"); + }; + assert!(err.to_string().contains("expects an f64"), "{err}"); + } + #[tokio::test] async fn validate_config_unchanged_detects_modified_config() { let ctx = SessionContext::new(); diff --git a/datafusion/sqllogictest/src/lib.rs b/datafusion/sqllogictest/src/lib.rs index 54f460958c0ab..a38d3f42233a2 100644 --- a/datafusion/sqllogictest/src/lib.rs +++ b/datafusion/sqllogictest/src/lib.rs @@ -36,12 +36,15 @@ mod test_file; #[cfg(feature = "memory-accounting")] pub use accounting::{ AccountingAllocator, OverdraftPanic, account_balance, current_context_id, - default_budget, local_balance, memory_tracker_limit, next_context_id, - reset_account_to_default, set_account_balance, set_default_budget, + default_budget, install_overdraft_panic_hook, local_balance, memory_tracker_limit, + next_context_id, reset_account_to_default, set_account_balance, set_default_budget, set_memory_tracker_limit, set_thread_context_id, settle_thread_local, }; #[cfg(feature = "memory-accounting")] -pub use accounting_pool::AccountingMemoryPool; +pub use accounting_pool::{ + AccountingMemoryPool, DEFAULT_MEMORY_OVERDRAFT_FACTOR, memory_overdraft_factor, + set_memory_overdraft_factor, +}; pub use engines::CurrentlyExecutingSqlTracker; pub use engines::DFColumnType;