Skip to content

[flink] Fix RemoteTableQuery key serializer to use trimmed primary keys (#8145)#8146

Open
jordepic wants to merge 2 commits into
apache:masterfrom
jordepic:remote-table-query-trimmed-primary-key
Open

[flink] Fix RemoteTableQuery key serializer to use trimmed primary keys (#8145)#8146
jordepic wants to merge 2 commits into
apache:masterfrom
jordepic:remote-table-query-trimmed-primary-key

Conversation

@jordepic
Copy link
Copy Markdown
Contributor

@jordepic jordepic commented Jun 6, 2026

Purpose

(This change is just a bug fix, it should not have further implications)

RemoteTableQuery built its lookup-key serializer from the table's full primary keys (table.primaryKeys()), but the lookup contract (PrimaryKeyPartialLookupTable#get) passes lookup() the trimmed primary key (primary keys minus partition keys -- the native LSM key within a (partition, bucket)). For a partitioned primary-key table these differ, so the serializer reads the wrong fields: it interprets field 0 of the trimmed key as the first full-PK field.

When the partition key and trimmed key happen to share a type this silently produces a usable key (the extra trailing field is ignored on the server side), which is why it went unnoticed for unpartitioned tables and same-type keys. When the types differ -- e.g. PRIMARY KEY (pt INT, k STRING) PARTITIONED BY (pt), where the trimmed key is (k STRING) -- it throws ClassCastException in InternalRowSerializer.toBinaryRow. The local executor (LocalTableQuery) consumes the trimmed key natively, so only the remote path is affected.

Build keySerializer from schema().trimmedPrimaryKeys() instead, matching the key the caller actually provides. Unpartitioned tables are unaffected, since trimmed primary keys equal the full primary keys there.

Tests

Add RemoteLookupJoinITCase#testLookupPartitionedRemoteTable covering a partitioned table with a differently-typed trimmed key; it fails with the ClassCastException before this change and passes after.

…ys (apache#8145)

RemoteTableQuery built its lookup-key serializer from the table's full
primary keys (table.primaryKeys()), but the lookup contract
(PrimaryKeyPartialLookupTable#get) passes lookup() the trimmed primary key
(primary keys minus partition keys -- the native LSM key within a
(partition, bucket)). For a partitioned primary-key table these differ, so
the serializer reads the wrong fields: it interprets field 0 of the trimmed
key as the first full-PK field.

When the partition key and trimmed key happen to share a type this silently
produces a usable key (the extra trailing field is ignored on the server
side), which is why it went unnoticed for unpartitioned tables and same-type
keys. When the types differ -- e.g. PRIMARY KEY (pt INT, k STRING) PARTITIONED
BY (pt), where the trimmed key is (k STRING) -- it throws ClassCastException
in InternalRowSerializer.toBinaryRow. The local executor (LocalTableQuery)
consumes the trimmed key natively, so only the remote path is affected.

Build keySerializer from schema().trimmedPrimaryKeys() instead, matching the
key the caller actually provides. Unpartitioned tables are unaffected, since
trimmed primary keys equal the full primary keys there.

Add RemoteLookupJoinITCase#testLookupPartitionedRemoteTable covering a
partitioned table with a differently-typed trimmed key; it fails with the
ClassCastException before this change and passes after.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@jordepic
Copy link
Copy Markdown
Contributor Author

jordepic commented Jun 6, 2026

@JingsongLi would you mind taking a look here when you have a chance? Thank you!

…ys (apache#8145)

RemoteTableQuery built its lookup-key serializer from the table's full
primary keys (table.primaryKeys()), but the lookup contract
(PrimaryKeyPartialLookupTable#get) passes lookup() the trimmed primary key
(primary keys minus partition keys -- the native LSM key within a
(partition, bucket)). For a partitioned primary-key table these differ, so
the serializer reads the wrong fields: it interprets field 0 of the trimmed
key as the first full-PK field.

When the partition key and trimmed key happen to share a type this silently
produces a usable key (the extra trailing field is ignored on the server
side), which is why it went unnoticed for unpartitioned tables and same-type
keys. When the types differ -- e.g. PRIMARY KEY (pt INT, k STRING) PARTITIONED
BY (pt), where the trimmed key is (k STRING) -- it throws ClassCastException
in InternalRowSerializer.toBinaryRow. The local executor (LocalTableQuery)
consumes the trimmed key natively, so only the remote path is affected.

Build keySerializer from schema().trimmedPrimaryKeys() instead, matching the
key the caller actually provides. Unpartitioned tables are unaffected, since
trimmed primary keys equal the full primary keys there.

Add RemoteLookupJoinITCase#testLookupPartitionedRemoteTable covering a
partitioned table with a differently-typed trimmed key; it fails with the
ClassCastException before this change and passes after.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant