[common] Fix OOM when writing/compacting table with large records#7621
[common] Fix OOM when writing/compacting table with large records#7621yugan95 wants to merge 3 commits into
Conversation
|
cc @LsomeYeah to take a review. |
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for the detailed fix. The RowHelper / InternalRowSerializer cleanup and BinaryRowSerializer shrink path look reasonable to me, and the focused common tests pass locally:
mvn -pl paimon-common -Dtest=RowHelperTest,HeapBytesVectorReserveBytesTest,BinaryRowSerializerShrinkTest -DskipITs -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -DfailIfNoTests=false test
However, I think the HeapBytesVector.reserveBytes part still needs changes before merging:
-
When
newCapacity > MAX_ARRAY_SIZE >> 1, the code allocatesMAX_ARRAY_SIZEbytes. For example, a ~1.1GB required capacity would try to allocate ~2GB, which can create a new OOM risk. This also contradicts the comment saying it falls back to the exact required capacity. It should allocate the exact required capacity when doubling would exceed the max. -
The required capacity is still computed with
intarithmetic before enteringreserveBytes:bytesAppended + lengthinputByteArrayandstart.length * value.lengthinfill. These can overflow to a negative or smaller positive value before theMAX_ARRAY_SIZEguard sees them, so the guard can be skipped and the failure later becomes an unclearArrayIndexOutOfBoundsException/ bad offset path. Please compute required capacity withlong(or makereserveBytesacceptlong) and throw the clear error before downcasting.
It would also be good to add tests for these arithmetic edge cases without requiring huge allocations, e.g. by extracting the capacity-growth calculation into a small helper or otherwise testing the overflow checks directly.
|
@leaves12138 Thanks for the review! All three points are addressed in the updated commit:
When requiredCapacity > MAX_ARRAY_SIZE >> 1, the code now allocates the exact required capacity instead of MAX_ARRAY_SIZE. This avoids the unnecessary ~2GB allocation for a ~1.1GB request.
reserveBytes now accepts long. The callers compute required capacity with long arithmetic: putByteArray: (long) bytesAppended + length
The capacity-growth logic is extracted into a package-private static int calculateNewBytesCapacity(long) method, tested directly with 7 new cases covering: Normal doubling for small values |
LsomeYeah
left a comment
There was a problem hiding this comment.
Thanks for the detailed fix! The heap-dump-based root cause breakdown is very clear. The HeapBytesVector overflow fix and Parquet config pass-through both look great.
We have a few concerns about the RowHelper / BinaryRowSerializer parts though — would love to hear your thoughts:
- 4MB threshold lacks a basis. Data patterns vary a lot across workloads — a table with uniform 1MB records, one with mixed 1KB/100MB records, and one with sustained 5MB records all behave very differently around a fixed 4MB cutoff.
- Reuse mechanism effectively defeated near the threshold. For sustained 5–10MB records, every call triggers release-and-rebuild or shrink-and-reallocate, paying allocation cost on every record. This defeats the purpose of these reuse buffers.
- Hot-path behavior change with no opt-out. The finally { resetIfTooLarge() } and the new shrink branch change default behavior for every user on upgrade, not only those experiencing OOM.
- Prior art. Worth checking how Spark's UnsafeExternalSorter or similar engines handle reuse-buffer growth — there may be a more principled heuristic (hysteresis, memory-pressure-based release) that avoids the thrash pattern.
JingsongLi
left a comment
There was a problem hiding this comment.
Excellent bug analysis. The heap dump investigation identifying four independent root causes is thorough. Comments per fix:
1. RowHelper buffer release (resetIfTooLarge):
- The
finallyblock inInternalRowSerializer.serialize()is critical — EOFException as a normal signal is a subtle but real issue. Good catch. - The 4MB threshold is reasonable. Consider making it configurable via a system property for operators with different memory profiles, or at least document why 4MB was chosen (e.g., 256 buckets × 4MB = 1GB baseline, which leaves headroom).
resetIfTooLarge()checksreuseWriter.getSegments().size()— is this the number of segments or the total bytes? If it's segment count, the threshold of 4MB doesn't directly match.
2. BinaryRowSerializer shrink:
- The shrink-on-read approach (replacing oversized buffer during
deserialize) is correct. The threshold matches RowHelper's. - Note: shrinking on every
deserializewhen the current record is small but the buffer is > 4MB means we're allocating every time until a large record appears again. This is acceptable since the alternative (OOM) is worse.
3. HeapBytesVector overflow fix:
- Promoting to
longbefore multiplication is the right fix. ThecalculateNewBytesCapacitymethod with MAX_ARRAY_SIZE fallback is clean. - Good that you made this
staticand package-visible for testing.
4. Parquet config pass-through:
- This is a separate concern from the OOM fix. Could this be a standalone PR? It changes the behavior for all Parquet writes, not just large-record scenarios.
Test coverage looks comprehensive with dedicated test classes for each fix. Well done.
One concern: The resetIfTooLarge() adds overhead on every serialize call (a null check + size check). For normal-sized records this should be negligible, but verify there's no measurable regression in microbenchmarks for small-record-heavy workloads.
|
@LsomeYeah Thanks for the thoughtful review! All four points are addressed in the updated commit: 1. 4MB threshold basis The threshold is derived from the production scenario that surfaced this bug: 256 buckets × 4MB = 1GB baseline, which leaves reasonable headroom in a typical 4–8GB TaskManager heap. 4MB is high enough to avoid interfering with normal workloads (most records are well under 1MB) while preventing the pathological accumulation we saw in heap dumps (256 × 100MB+ = tens of GB). 2. Thrashing near the threshold — fixed with hysteresis Good catch — this was a real gap. I've updated both
This means:
Tests updated to verify the hysteresis behavior in both classes. 3. Hot-path overhead With hysteresis, the overhead on the hot path is 1 null check + 2 int comparisons — all on fields already in L1 cache from the preceding serialize(). For normal-sized records the buffer never reaches 4MB, so 4. Prior art The hysteresis approach here is inspired by the same principle behind Spark's Also, the Parquet config pass-through has been moved to #7956. |
|
@JingsongLi Thanks for the detailed per-fix comments! Addressing each point: 1. RowHelper — Good question. I've also added a hysteresis guard per @LsomeYeah's feedback: Configurability: I investigated making the threshold a proper table-level option via CoreOptions. However, these thresholds live in This would break the config-free invariant of paimon-common's serializer layer and touch a wide set of classes across modules. I think a fixed 4MB is sufficient for the initial fix — the value is grounded in a concrete production scenario (256 buckets x 4MB = 1GB), and the hysteresis guard makes the threshold much less sensitive. If a follow-up is needed, it could be done as a standalone PR. 2. BinaryRowSerializer shrink — now improved with hysteresis. The 3. HeapBytesVector — addressed in commit f1954ff. 4. Parquet config pass-through — moved to #7956. 5. Performance overhead — the check is 1 null check + 2 int comparisons, all on hot fields already in cache. For small-record workloads (buffer < 4MB), |
…n RowDataParquetBuilder (#7956) Pass through parquet statistics and page-size-check configuration in `RowDataParquetBuilder`. Currently `RowDataParquetBuilder` does not forward the following Parquet config keys to the writer: - `parquet.statistics.truncate.length` - `parquet.columnindex.truncate.length` - `parquet.page.size.row.check.min` - `parquet.page.size.row.check.max` Without these, users cannot tune Parquet page-size checking behavior or control the truncation length of statistics and column indexes. This is especially relevant for tables with large records, where the default page-size check thresholds can lead to oversized pages. Split out from #7621 per reviewer feedback, as this is an independent enhancement.
JingsongLi
left a comment
There was a problem hiding this comment.
Since this is the most critical link, can you break it down into smaller points to contribute? Each PR only completes one simple task, and each task is difficult to choose.
|
@JingsongLi Good suggestion — I've split this into three independent PRs, one per root cause:
The Parquet config pass-through is already merged in #7956. Each PR is self-contained with its own tests and no cross-dependencies. |
|
All fixes have been split into independent PRs and merged:
Closing this PR. Thanks for the reviews! |
Purpose
Linked issue: close #7620
Fix OOM when writing table with large records (100MB+) and many buckets (e.g. 256) due to unbounded buffer growth in sort, merge and compaction paths. Each bucket's writer independently holds its own sort buffer, merge channels, and compaction readers. When a large record inflates an internal reuse buffer, that bloated buffer is retained per-bucket, causing memory usage to quickly exceed available heap.
Heap dump analysis identified three independent root causes:
1. Sort path —
RowHelperinternal buffer never shrinksRowHelper.reuseWritergrows its internalMemorySegmentfor large records, butBinaryRowWriter.reset()only resets the cursor without releasing the oversized segment. Additionally,InternalRowSerializer.serialize()can exit viaEOFException(a normal signal when the sort buffer is full), skipping any cleanup of the bloated buffer.2. Merge path —
BinaryRowSerializer.deserialize(reuse)only grows, never shrinksEach merge channel holds a
BinaryRowreuse instance. When a large record is deserialized, the backingMemorySegmentgrows to fit it but is never shrunk for subsequent small records. Withmax-num-file-handles(default 128) channels each retaining a 100MB+ buffer, memory usage explodes.3. Compaction read path —
HeapBytesVector.reserveBytes()integer overflowreserveBytes()computesnewCapacity * 2using plain multiplication. WhennewCapacityexceeds ~1.07 billion bytes, this overflowsInteger.MAX_VALUE, causingNegativeArraySizeExceptionor silent data corruption.Changes
RowHelper: addresetIfTooLarge()with hysteresis — release internal buffer only when the segment exceeds 4MB and the last written record is smaller than 4MB. This avoids thrashing for sustained large-record workloads while reclaiming memory when the workload transitions back to small records.InternalRowSerializer: callresetIfTooLarge()infinallyblock ofserialize()andserializeToPages()to handleEOFExceptionexit path.BinaryRowSerializer: add shrink logic with hysteresis indeserialize(reuse)— reallocate only when existing buffer > 4MB and current record < 4MB.HeapBytesVector: promote tolongbefore capacity arithmetic, extractcalculateNewBytesCapacity(long)helper, cap atMAX_ARRAY_SIZE = Integer.MAX_VALUE - 8, return exact required capacity when doubling would exceed max, throw clear error on overflow.Tests
RowHelperTest— validates hysteresis: buffer retained for large records, released only on transition to small recordsBinaryRowSerializerShrinkTest— validates hysteresis: buffer retained for consecutive large records, shrunk on transition to small recordsHeapBytesVectorReserveBytesTest— validates overflow-safereserveBytes()growth,calculateNewBytesCapacityedge cases (doubling, exact capacity, overflow), and data correctnessAPI and Format
N/A — no public API or format changes.
Documentation
N/A