[spark] supports updating blobs through DataEvolution MergeInto#8175
[spark] supports updating blobs through DataEvolution MergeInto#8175steFaiz wants to merge 7 commits into
Conversation
a72f8c8 to
b43f513
Compare
| // The final output is composed by updated columns, metadata columns and blob marker columns. | ||
| // Marker columns are used to mark whether a blob field should be written with placeholder | ||
| val rawBlobUpdateColumns = updateColumnsSorted.filter(isRawBlobUpdateColumn) | ||
| val rawBlobMarkerNamesByColumn = rawBlobUpdateColumns.zipWithIndex.map { |
There was a problem hiding this comment.
The internal marker column names can collide with real target columns. For example, a table can legally have a column named __paimon_raw_blob_placeholder_0; if a MERGE updates that column and a raw BLOB in the same statement, mergeOutput will contain two attributes with the same name. Then reorderPartialWriteColumns selects by quoted name and writePartialFields resolves the marker with data.schema.fieldIndex, so Spark can either report an ambiguous reference or bind the user column as the boolean marker. Could we generate marker names that are guaranteed not to collide with the write columns/source output, or carry the marker attributes through by exprId instead of resolving them by name?
There was a problem hiding this comment.
Thanks! Fixed, now picking new names will loop and increment the index util find some non-existing columns
| base.firstRowId(), | ||
| base.rowCount())); | ||
| if (base.firstRowId() != null && !dedicatedStorageFile(base.fileName())) { | ||
| existingRanges.put(base.firstRowId(), base.rowCount()); |
There was a problem hiding this comment.
Partition level has been removed, cc @leaves12138 to take a look.
| } | ||
|
|
||
| Set<FileRowIdKey> existingIndex = new HashSet<>(); | ||
| NavigableMap<Long, Long> existingRanges = new TreeMap<>(); |
There was a problem hiding this comment.
Can you use RowRangeIndex? Add method containsExactly.
| && rowCount == that.rowCount | ||
| && Objects.equals(partition, that.partition); | ||
| } | ||
| private static boolean rowIdRangeCovered( |
There was a problem hiding this comment.
Can you add containsExactly to RowRangeIndex? This seems common
| return Collections.unmodifiableList(ranges); | ||
| } | ||
|
|
||
| public boolean contains(long start, long end) { |
There was a problem hiding this comment.
There is already contains method in this class.
There was a problem hiding this comment.
Thanks for your remind! I've noticed that this method is added recently in master branch. I've rebased the master and cleaned my code
a4e21ac to
b806157
Compare
Purpose
Parts of #7881
Supports Spark:
where raw_blob means blobs stored in BlobFormat Files
Implementation
Introduce several marker columns during data evolution:
This is because spark only allow literal columns for basic types. i.e. BlobPlaceholder is not allowed.
Each blob column have one marker column, representing whether write blob values or
BlobPlaceholder.INSTANCESide Effects
checkRowIdExistenceto:a. If new files are normal files: should have an exactly matching row ranges
b. If new files are special storage files: should be exactly a sub range of an existing one
If all blob records at the same row id are placeholder, it's deemed as NULL now (previously it's illegal)
Tests