Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public class RowHelper implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Threshold in bytes for releasing the internal reuse buffer. When big records are written, the
* BinaryRowWriter's internal segment can grow very large via grow(). The {@link
* #resetIfTooLarge()} method checks this threshold and releases the bloated
* reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely.
*/
private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB

private final FieldGetter[] fieldGetters;
private final ValueSetter[] valueSetters;
private final boolean[] writeNulls;
Expand Down Expand Up @@ -81,6 +89,21 @@ public void copyInto(InternalRow row) {
reuseWriter.complete();
}

/**
* Release the internal reuse buffer if the segment exceeds the threshold AND the last written
* record is small. This hysteresis avoids thrashing when records are consistently large, while
* still reclaiming memory when the workload transitions back to small records.
*/
public void resetIfTooLarge() {
if (reuseWriter != null
&& reuseWriter.getSegments() != null
&& reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD
&& reuseRow.getSizeInBytes() < REUSE_RELEASE_THRESHOLD) {
reuseRow = null;
reuseWriter = null;
}
}

public BinaryRow reuseRow() {
return reuseRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void reset() {

@Override
public void putByteArray(int elementNum, byte[] sourceBuf, int start, int length) {
reserveBytes(bytesAppended + length);
long requiredCapacity = (long) bytesAppended + length;
reserveBytes(requiredCapacity);
System.arraycopy(sourceBuf, start, buffer, bytesAppended, length);
this.start[elementNum] = bytesAppended;
this.length[elementNum] = length;
Expand All @@ -96,7 +97,8 @@ public void appendByteArray(byte[] value, int offset, int length) {

@Override
public void fill(byte[] value) {
reserveBytes(start.length * value.length);
long requiredCapacity = (long) start.length * value.length;
reserveBytes(requiredCapacity);
for (int i = 0; i < start.length; i++) {
System.arraycopy(value, 0, buffer, i * value.length, value.length);
}
Expand All @@ -106,19 +108,35 @@ public void fill(byte[] value) {
Arrays.fill(this.length, value.length);
}

private void reserveBytes(int newCapacity) {
if (newCapacity > buffer.length) {
int newBytesCapacity = newCapacity * 2;
try {
buffer = Arrays.copyOf(buffer, newBytesCapacity);
} catch (NegativeArraySizeException e) {
throw new RuntimeException(
String.format(
"The new claimed capacity %s is too large, will overflow the INTEGER.MAX after multiply by 2. "
+ "Try reduce `read.batch-size` to avoid this exception.",
newCapacity),
e);
}
/** The maximum size of array to allocate. Some VMs reserve header words in an array. */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

private void reserveBytes(long requiredCapacity) {
if (requiredCapacity > buffer.length) {
buffer = Arrays.copyOf(buffer, calculateNewBytesCapacity(requiredCapacity));
}
}

/**
* Calculate the new buffer capacity for the given required capacity. Visible for testing.
*
* <p>The strategy is: double the required capacity for amortized growth when safe. If doubling
* would exceed {@link #MAX_ARRAY_SIZE}, fall back to the exact required capacity. Throws if the
* required capacity itself exceeds {@link #MAX_ARRAY_SIZE}.
*/
static int calculateNewBytesCapacity(long requiredCapacity) {
if (requiredCapacity > MAX_ARRAY_SIZE) {
throw new RuntimeException(
String.format(
"The required byte buffer capacity %d exceeds the maximum array size %d. "
+ "Try reducing `read.batch-size` to avoid this exception.",
requiredCapacity, MAX_ARRAY_SIZE));
}
int intCapacity = (int) requiredCapacity;
if (intCapacity <= (MAX_ARRAY_SIZE >> 1)) {
return intCapacity << 1;
} else {
return intCapacity;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public BinaryRow deserialize(DataInputView source) throws IOException {
return row;
}

/**
* Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This
* prevents accumulation of large byte arrays when a few large records inflate the reuse buffer
* and subsequent small records never trigger reallocation.
*/
private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB

public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException {
MemorySegment[] segments = reuse.getSegments();
checkArgument(
Expand All @@ -88,6 +95,13 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc

int length = source.readInt();
if (segments == null || segments[0].size() < length) {
// Need a larger buffer
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
} else if (segments[0].size() > REUSE_SHRINK_THRESHOLD && length < REUSE_SHRINK_THRESHOLD) {
// Hysteresis: only shrink when the buffer is oversized AND the current record is
// small. This avoids thrashing (release-and-rebuild on every record) when records
// are consistently large (e.g. 5-10MB), while still reclaiming memory when the
// workload transitions back to small records.
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
}
source.readFully(segments[0].getArray(), 0, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ public InternalRowSerializer duplicate() {

@Override
public void serialize(InternalRow row, DataOutputView target) throws IOException {
binarySerializer.serialize(toBinaryRow(row), target);
try {
binarySerializer.serialize(toBinaryRow(row), target);
} finally {
// Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer
// for large records (e.g. 100MB+). The serialization can exit via EOFException
// thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is
// full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal.
// Without finally, the bloated buffer would never be released on that path.
rowHelper.resetIfTooLarge();
}
}

@Override
Expand Down Expand Up @@ -132,7 +141,13 @@ public InternalRow createReuseInstance() {
@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
throws IOException {
return binarySerializer.serializeToPages(toBinaryRow(row), target);
try {
return binarySerializer.serializeToPages(toBinaryRow(row), target);
} finally {
// Same as serialize(): must use finally because EOFException may bypass normal
// return when the sort buffer is full.
rowHelper.resetIfTooLarge();
}
}

@Override
Expand Down
114 changes: 114 additions & 0 deletions paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data;

import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;

import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. */
class RowHelperTest {

@Test
void testResetIfTooLargeReleasesAfterTransitionToSmallRecord() {
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));

// Write a large record (> 4MB) to inflate the internal buffer
byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB
Arrays.fill(largePayload, (byte) 'x');
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
largeRow.setRowKind(RowKind.INSERT);
helper.copyInto(largeRow);

assertThat(helper.reuseRow()).isNotNull();

// Hysteresis: resetIfTooLarge() should NOT release when last record is large
helper.resetIfTooLarge();
assertThat(helper.reuseRow()).isNotNull();

// Now write a small record — buffer is still oversized from the large record
GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]);
smallRow.setRowKind(RowKind.INSERT);
helper.copyInto(smallRow);

// resetIfTooLarge() should release now: buffer > 4MB but last record < 4MB
helper.resetIfTooLarge();
assertThat(helper.reuseRow()).isNull();
}

@Test
void testResetIfTooLargeKeepsSmallBuffer() {
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));

// Write a small record (< 4MB)
GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42);
smallRow.setRowKind(RowKind.INSERT);
helper.copyInto(smallRow);

assertThat(helper.reuseRow()).isNotNull();

// resetIfTooLarge() should NOT release the small buffer
helper.resetIfTooLarge();
assertThat(helper.reuseRow()).isNotNull();
}

@Test
void testResetIfTooLargeBeforeCopyInto() {
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING()));

// reuseRow is null before any copyInto
assertThat(helper.reuseRow()).isNull();

// resetIfTooLarge() should be safe to call when reuseRow is null
helper.resetIfTooLarge();
assertThat(helper.reuseRow()).isNull();
}

@Test
void testReuseIsRecreatedAfterRelease() {
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));

// Write a large record to inflate the buffer, then a small record to trigger release
byte[] largePayload = new byte[5 * 1024 * 1024];
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
largeRow.setRowKind(RowKind.INSERT);
helper.copyInto(largeRow);

GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]);
smallRow.setRowKind(RowKind.INSERT);
helper.copyInto(smallRow);

// Buffer is oversized + last record is small → release
helper.resetIfTooLarge();
assertThat(helper.reuseRow()).isNull();

// Write another small record — reuseRow should be recreated
helper.copyInto(smallRow);
assertThat(helper.reuseRow()).isNotNull();

// Small buffer should survive resetIfTooLarge()
helper.resetIfTooLarge();
assertThat(helper.reuseRow()).isNotNull();
}
}
Loading