Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions datafusion/sqllogictest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,44 @@ cargo test --features memory-accounting --test sqllogictests -- \
`--default-pool-size-mb` seeds each per-file SLT context's MemoryPool with
the given size in MB and arms the bank as a no-op until a test opts in.

**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'` at the top of the `.slt`. The wrapping `AccountingMemoryPool` then
tightens its allocator-level bank to `N * 1.10` (10% headroom). If the test
allocates more than that — including bytes DataFusion's tracker didn't see
— the test panics with an `OverdraftPanic` reporting the actual balance at
panic time. SLTs without a `SET` of `memory_limit` see no change in
behavior; the bank stays loose and `SHOW ALL` continues to render the limit
as `unlimited`.
**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'`
at the top of the `.slt`. The wrapping `AccountingMemoryPool` then tightens
its allocator-level bank to `N * DEFAULT_MEMORY_OVERDRAFT_FACTOR` (currently
`8.0`, i.e. 800% — yes, that's loose). If the test allocates more than that
— including bytes DataFusion's tracker didn't see — the test panics with an
`OverdraftPanic` reporting the actual balance at panic time. SLTs without a
`SET` of `memory_limit` see no change in behavior; the bank stays loose and
`SHOW ALL` continues to render the limit as `unlimited`.

The default factor is deliberately loose so the existing suite passes today;
the long-term goal is to drive it down toward ~`1.1` as operators that
allocate outside the pool's tracking are fixed. Progress is tracked in
[epic #22758](https://github.com/apache/datafusion/issues/22758).

**When a test trips the accounting check**, the runner emits a message like:

> Memory accounting mismatch: this query allocated N bytes more than
> DataFusion's MemoryPool accounted for. Some operator is allocating outside
> the pool's tracking — this is a real accounting bug worth fixing.

That bytes-over-budget number is the gap we're hunting. The right fix is to
account for the offending allocation inside the operator (file a sub-issue
under the epic above, with the query + observed overshoot). For an SLT you
didn't author that just started failing, the unblocker is to bump the
overdraft factor for that one test:

```sql
SET datafusion.sqllogictest.memory_overdraft_factor = 5.0;
SET datafusion.runtime.memory_limit = '150K';
-- ... rest of test ...
```

The override is consumed by the *next* `SET datafusion.runtime.memory_limit`
(which is what arms the bank), then auto-resets to the default — so each
test opts in independently and the per-file isolation in the runner still
holds. The setting only exists in the SLT runner; DataFusion's parser
doesn't know about it, and nothing changes in `datafusion-cli` or anywhere
else.

Inside the runner each file gets its own multi-thread Tokio runtime so
context-ids stamped onto worker threads stay stable for the allocator
Expand Down
32 changes: 22 additions & 10 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,28 @@ async fn run_tests() -> Result<()> {
options.warn_on_ignored();

#[cfg(feature = "memory-accounting")]
if let Some(pool_mb) = options.default_pool_size_mb {
let pool_bytes = pool_mb.saturating_mul(1024 * 1024);
// Same value drives the inner MemoryPool's size and the bank's
// default budget. The wrapper renders this value as `unlimited` in
// `SHOW ALL` (sentinel for "no SET has happened"); once a test
// calls `SET datafusion.runtime.memory_limit`, the wrapper retunes
// the bank to that limit + 10% headroom.
datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes);
datafusion_sqllogictest::set_default_budget(pool_bytes as isize);
log::info!("memory-accounting on: default pool size = {pool_mb} MB");
{
// OverdraftPanic frequently fires on tokio worker threads spawned by
// operators like RepartitionExec; the joiner converts that JoinError
// into a generic DataFusionError::External("task NN panicked") that
// discards the payload. The hook prints the real message + remediation
// on stderr before the harness swallows the unwind. Install
// unconditionally — the bank is also armed by the resize path that
// runs for every `SET datafusion.runtime.memory_limit`, independent
// of whether `--default-pool-size-mb` was passed.
datafusion_sqllogictest::install_overdraft_panic_hook();

if let Some(pool_mb) = options.default_pool_size_mb {
let pool_bytes = pool_mb.saturating_mul(1024 * 1024);
// Same value drives the inner MemoryPool's size and the bank's
// default budget. The wrapper renders this value as `unlimited` in
// `SHOW ALL` (sentinel for "no SET has happened"); once a test
// calls `SET datafusion.runtime.memory_limit`, the wrapper retunes
// the bank to that limit + 10% headroom.
datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes);
datafusion_sqllogictest::set_default_budget(pool_bytes as isize);
log::info!("memory-accounting on: default pool size = {pool_mb} MB");
}
}

// Print parallelism info for debugging CI performance
Expand Down
76 changes: 75 additions & 1 deletion datafusion/sqllogictest/src/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ static ACCOUNTS: OnceLock<RwLock<HashMap<usize, AtomicIsize>>> = OnceLock::new()

/// Starting budget for any new account, set by [`set_default_budget`] and
/// inherited by per-file SLT contexts spawned after.
static DEFAULT_BUDGET: AtomicIsize = AtomicIsize::new(0);
///
/// Default is `isize::MAX / 2` — effectively infinite — so an un-armed bank
/// is a no-op. Real enforcement only kicks in once either
/// [`set_default_budget`] (via `--default-pool-size-mb`) or the
/// `AccountingMemoryPool::try_resize` path (via `SET datafusion.runtime.
/// memory_limit`) calls [`set_account_balance`] with a finite value.
/// Otherwise file setup allocations would settle into a 0-budget bank
/// before any real work runs and trip the panic with a misleading
/// backtrace pointing at `RuntimeEnv::default`.
static DEFAULT_BUDGET: AtomicIsize = AtomicIsize::new(isize::MAX / 2);

fn accounts() -> &'static RwLock<HashMap<usize, AtomicIsize>> {
ACCOUNTS.get_or_init(|| RwLock::new(HashMap::new()))
Expand Down Expand Up @@ -142,6 +151,71 @@ pub struct OverdraftPanic {
pub account_balance: isize,
}

/// Install a chained panic hook that formats [`OverdraftPanic`] payloads as a
/// readable message instead of the default `Box<dyn Any>` rendering. Other
/// panic payloads pass through to the previous hook unchanged.
///
/// Idempotent: only the first call wins. Safe to call from any thread, but
/// best called once from `main` before any work spawns.
///
/// The hook is needed because [`OverdraftPanic`] fires inside spawned tokio
/// tasks (e.g. RepartitionExec workers). Tokio's task harness catches the
/// unwind on the worker, surfaces it to the joiner as a `JoinError`, and the
/// joiner often coerces that into `DataFusionError::External("task NN
/// panicked")` — the original payload is lost. The hook runs on the worker
/// *before* the harness eats it, so the operator + balance still hit stderr.
pub fn install_overdraft_panic_hook() {
static INSTALLED: std::sync::Once = std::sync::Once::new();
INSTALLED.call_once(|| {
let prev = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
if let Some(od) = info.payload().downcast_ref::<OverdraftPanic>() {
let overshoot = -od.account_balance;
let loc = info
.location()
.map(|l| format!("{}:{}:{}", l.file(), l.line(), l.column()))
.unwrap_or_else(|| "<unknown>".to_string());
let thread = std::thread::current();
let thread_name = thread.name().unwrap_or("<unnamed>");
// Force-capture the backtrace — without this, the default
// hook's RUST_BACKTRACE behavior is lost when we short-circuit,
// and that's exactly the line of code that allocated outside
// the MemoryPool's tracking. Without it the user has nothing
// to grep for.
let bt = std::backtrace::Backtrace::force_capture();
eprintln!(
"thread '{thread_name}' panicked at {loc}:\n\
Memory accounting mismatch: this thread allocated {overshoot} \
bytes more than DataFusion's MemoryPool accounted for. Some \
operator is allocating outside the pool's tracking — this is a \
real accounting bug worth fixing.\n\
\n\
If you made changes to an operator or UDF this is probably \
related to your work and should be investigated. If you do not \
believe this is related to your change, the fastest path \
forward is to opt the failing SLT into a larger overdraft \
tolerance and file the gap against the epic so we can pay it \
down:\n\
\n \
SET datafusion.sqllogictest.memory_overdraft_factor = N;\n\
\n\
where N is roughly `2 * <bytes the query actually needs> / \
datafusion.runtime.memory_limit`. The override applies to the \
next `SET datafusion.runtime.memory_limit` only, then \
auto-resets.\n\
\n\
Please record the query + observed overshoot at:\n \
https://github.com/apache/datafusion/issues/22758\n\
\n\
The untracked allocation is in this backtrace:\n{bt}"
);
return;
}
prev(info);
}));
});
}

/// Set the default budget new accounts will be created with. Existing
/// accounts are untouched.
pub fn set_default_budget(value: isize) {
Expand Down
92 changes: 80 additions & 12 deletions datafusion/sqllogictest/src/accounting_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,58 @@
//! which `RuntimeEnvBuilder::with_memory_limit` triggers on `SET
//! datafusion.runtime.memory_limit = '…'`).
//!
//! Each retune sets the bank to `new_limit * HEADROOM_FACTOR`. A query
//! that allocates past that envelope panics with an `OverdraftPanic` —
//! the gap between DF's voluntary tracker and the allocator's reality
//! is the bug we're hunting.
//! Each retune sets the bank to `new_limit * DEFAULT_MEMORY_OVERDRAFT_FACTOR`
//! (or whatever the current thread has set via [`set_memory_overdraft_factor`]
//! — used by the SLT runner to honor per-test
//! `SET datafusion.sqllogictest.memory_overdraft_factor = N` overrides).
//! A query that allocates past that envelope panics with an `OverdraftPanic`
//! — the gap between DF's voluntary tracker and the allocator's reality is
//! the bug we're hunting.

use crate::set_account_balance;
use datafusion::common::Result;
use datafusion::execution::memory_pool::{
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use std::cell::Cell;
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;

/// Headroom over the pool's declared limit. Anything past this is an
/// untracked allocation — by definition, since DF's pool didn't see it.
/// Default overdraft factor: how much the allocator may exceed DataFusion's
/// declared `MemoryPool` limit before [`AccountingMemoryPool`] panics.
/// Anything past this is an untracked allocation — by definition, since
/// DF's pool didn't see it.
///
/// 800% high, but that's what it takes to pass the SLT suite right now. Goal should be ~10%
const HEADROOM_FACTOR: f64 = 8.0;
/// `8.0` (800%) is what it takes to pass the SLT suite today. Goal is to
/// drive this down to ~`1.1`; progress tracked in epic
/// <https://github.com/apache/datafusion/issues/22758>.
///
/// SLTs that exercise queries with more in-flight batches than the default
/// covers can opt into a larger factor for one resize via
/// `SET datafusion.sqllogictest.memory_overdraft_factor = N`, which the
/// SLT runner intercepts and stores in [`MEMORY_OVERDRAFT_FACTOR`].
pub const DEFAULT_MEMORY_OVERDRAFT_FACTOR: f64 = 8.0;

thread_local! {
/// Per-thread overdraft multiplier, applied by [`AccountingMemoryPool::try_resize`].
/// The SLT runner writes here when it sees a
/// `SET datafusion.sqllogictest.memory_overdraft_factor = N` statement;
/// it resets to [`DEFAULT_MEMORY_OVERDRAFT_FACTOR`] on each `try_resize`
/// so opt-ins stay scoped to the SLT block that asked for them.
static MEMORY_OVERDRAFT_FACTOR: Cell<f64> = const { Cell::new(DEFAULT_MEMORY_OVERDRAFT_FACTOR) };
}

/// Override the memory overdraft factor for the current thread. The next
/// [`AccountingMemoryPool::try_resize`] consumes the override, then resets
/// the thread-local back to [`DEFAULT_MEMORY_OVERDRAFT_FACTOR`].
pub fn set_memory_overdraft_factor(factor: f64) {
MEMORY_OVERDRAFT_FACTOR.with(|h| h.set(factor));
}

/// Read the current thread's memory overdraft factor.
pub fn memory_overdraft_factor() -> f64 {
MEMORY_OVERDRAFT_FACTOR.with(|h| h.get())
}

pub struct AccountingMemoryPool {
inner: Arc<dyn MemoryPool>,
Expand Down Expand Up @@ -117,7 +151,9 @@ impl MemoryPool for AccountingMemoryPool {

fn try_resize(&self, new_limit: usize) -> Result<()> {
self.inner.try_resize(new_limit)?;
set_account_balance((new_limit as f64 * HEADROOM_FACTOR) as isize);
let factor =
MEMORY_OVERDRAFT_FACTOR.with(|h| h.replace(DEFAULT_MEMORY_OVERDRAFT_FACTOR));
set_account_balance((new_limit as f64 * factor) as isize);
Ok(())
}
}
Expand Down Expand Up @@ -162,13 +198,45 @@ mod tests {
);
pool.try_resize(50_000).unwrap();

// Balance is reset to limit * HEADROOM_FACTOR, minus a small
// drift from this test thread's own allocs between set and read.
let expected = (50_000.0 * HEADROOM_FACTOR) as isize;
// Balance is reset to limit * DEFAULT_MEMORY_OVERDRAFT_FACTOR,
// minus a small drift from this test thread's own allocs between
// set and read.
let expected = (50_000.0 * DEFAULT_MEMORY_OVERDRAFT_FACTOR) as isize;
let bal = account_balance();
assert!(
(50_000..=expected).contains(&bal),
"balance not in expected range: got {bal}, expected ≤ {expected}"
);
}

#[test]
fn overdraft_factor_override_is_consumed_by_try_resize() {
set_thread_context_id(next_context_id());

let default_size = 1_000_000;
let pool = AccountingMemoryPool::new(
Arc::new(GreedyMemoryPool::new(default_size)),
default_size,
);

set_memory_overdraft_factor(2.0);
pool.try_resize(100_000).unwrap();
// 2.0 * 100K = 200K; subtract worst-case drift from this thread's
// own allocs between the resize and the read.
let bal = account_balance();
assert!(
(100_000..=200_000).contains(&bal),
"first resize used wrong factor: got {bal}, expected ≤ 200_000"
);

// Override consumed: next resize falls back to default.
pool.try_resize(100_000).unwrap();
let bal = account_balance();
let expected_default = (100_000.0 * DEFAULT_MEMORY_OVERDRAFT_FACTOR) as isize;
assert!(
(100_000..=expected_default).contains(&bal),
"second resize did not fall back to default factor: got {bal}, \
expected ≤ {expected_default}"
);
}
}
Loading
Loading