Skip to content

[flink] Expose scan.bucket for single-bucket manifest pruning#8117

Open
wwj6591812 wants to merge 1 commit into
apache:masterfrom
wwj6591812:add_scan_bucket_0604
Open

[flink] Expose scan.bucket for single-bucket manifest pruning#8117
wwj6591812 wants to merge 1 commit into
apache:masterfrom
wwj6591812:add_scan_bucket_0604

Conversation

@wwj6591812

Copy link
Copy Markdown
Contributor

Background

ReadBuilder.withBucket(int) and manifest scanning already support reading a single bucket, but Flink SQL had no connector option to expose it. Operators often need to debug or scan one bucket of a fixed-bucket primary-key table without reading all buckets.

Why this PR

Expose scan.bucket in Flink so users can run:

SELECT * FROM t /*+ OPTIONS('scan.bucket' = '0') */

and plan splits only for that bucket.

What changes

  • Add FlinkConnectorOptions.SCAN_BUCKET (scan.bucket).
  • ScanBucketUtils.applyScanBucket() reads the option and calls ReadBuilder.withBucket().
  • Wire into FlinkSourceBuilder and FlinkTableSource (batch and split inference).
  • Validate in ReadBuilderImpl.withBucket() (canonical read path): non-negative bucket id, FileStoreTable only, not postpone-bucket mode, bucket < table.bucket when table bucket > 0.

Stage optimized: scan / manifest planning — fewer manifest entries and splits before read. No change to merge or per-record logic.

Tests

  • ScanBucketUtilsTest — invalid bucket id fails fast.
  • ScanBucketITCase — SQL with scan.bucket matches reading that bucket via the table API.

Test plan

  • mvn test -pl paimon-flink/paimon-flink-common -am -Dtest=ScanBucketUtilsTest,ScanBucketITCase

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 17c2722 to 7e8c5d8 Compare June 4, 2026 09:45
@wwj6591812

Copy link
Copy Markdown
Contributor Author

The failed test is not related to my modifications.

@JingsongLi

Copy link
Copy Markdown
Contributor

The validation here still allows scan.bucket on non-fixed-bucket tables.

validateSpecifiedBucket rejects postpone bucket tables, but it does not require a fixed bucket mode or bucket > 0. For bucket-unaware/dynamic bucket tables, CoreOptions.bucket() can be <= 0, so the upper-bound check is skipped and the scan proceeds with physical bucket pruning. That can turn an invalid configuration into an empty/incorrect result instead of a clear error. The generated doc says this option is only supported for fixed-bucket primary-key tables (bucket > 0).

Could we enforce that here, e.g. require the fixed bucket mode and configured bucket count > 0, and also check primary-key-ness if the option is intended only for primary-key tables?

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 7e8c5d8 to 9b67691 Compare June 8, 2026 02:30
@wwj6591812

Copy link
Copy Markdown
Contributor Author

The validation here still allows scan.bucket on non-fixed-bucket tables.

validateSpecifiedBucket rejects postpone bucket tables, but it does not require a fixed bucket mode or bucket > 0. For bucket-unaware/dynamic bucket tables, CoreOptions.bucket() can be <= 0, so the upper-bound check is skipped and the scan proceeds with physical bucket pruning. That can turn an invalid configuration into an empty/incorrect result instead of a clear error. The generated doc says this option is only supported for fixed-bucket primary-key tables (bucket > 0).

Could we enforce that here, e.g. require the fixed bucket mode and configured bucket count > 0, and also check primary-key-ness if the option is intended only for primary-key tables?

Hi, thanks for your review.
I have updated code, now alidateSpecifiedBucket requires HASH_FIXED bucket mode, primary keys, and bucket > 0. Added core UT (ReadBuilderImplTest) and extended Flink UT/IT for invalid table types.

Please CC, Thx.
@JingsongLi

if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
ScanBucketUtils.applyScanBucket(table, readBuilder, conf);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies scan.bucket only to the normal source read builder. Aggregate pushdown plans splits through AggregatePushDownUtils.planSplits(...) with a separate ReadBuilder, so queries such as SELECT COUNT() FROM T /+ OPTIONS(scan.bucket=0) */ can still aggregate all buckets while non-aggregate reads scan only the requested bucket. Please either apply SCAN_BUCKET in the aggregate pushdown planning path as well, or disable aggregate pushdown when scan.bucket is set.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @JingsongLi.
I have applied ScanBucketUtils.applyScanBucket in AggregatePushDownUtils.planSplits(...) as well, so the aggregate pushdown planning path now respects the scan.bucket option too.

"Bucket scan is only supported for fixed-bucket tables, but got bucket mode %s.",
fileStoreTable.bucketMode());
checkArgument(
!fileStoreTable.schema().primaryKeys().isEmpty(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the public core ReadBuilder.withBucket(...) API reject fixed-bucket append tables, even though bucket-level manifest pruning is useful and valid there too. If the primary-key restriction is only meant for the Flink scan.bucket option, could we keep ReadBuilder.withBucket(...) generic and enforce the Flink option restriction in the Flink scan.bucket path instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @JingsongLi.
I have removed the primary-key restriction from ReadBuilderImpl.validateSpecifiedBucket(...) to keep the core ReadBuilder.withBucket(...) API generic for fixed-bucket append tables. The primary-key check is now enforced inside ScanBucketUtils.applyScanBucket(...), which is the Flink scan.bucket option path. I also updated ReadBuilderImplTest accordingly.

@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch 2 times, most recently from db92906 to 3b01bc0 Compare June 10, 2026 04:02
@wwj6591812 wwj6591812 force-pushed the add_scan_bucket_0604 branch from 3b01bc0 to 9793995 Compare June 10, 2026 06:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants