MDEV-37009 bulk insert into vector index#5150
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces bulk insert support for hierarchical navigable small world (mhnsw) indexes during table copy operations. The reviewer identified several critical issues, including compilation errors from accessing bulk_insert_active through the wrong pointer, an uninitialized member variable, and an ignored return value during bulk insert initialization. Suggestions were also made to reset the active flag on failure and correct formatting to match the repository's style.
| DBUG_ASSERT(s->hlindexes() == (hlindex != NULL)); | ||
| if (hlindex && hlindex->in_use) | ||
| { | ||
| if (hlindex->bulk_insert_active) | ||
| return mhnsw_bulk_insert_row(this, key_info + s->keys); | ||
| else | ||
| return mhnsw_insert(this, key_info + s->keys); | ||
| } | ||
| return 0; |
There was a problem hiding this comment.
The member bulk_insert_active was added to the TABLE struct in sql/table.h, but here it is accessed via hlindex->bulk_insert_active. Since hlindex is not of type TABLE*, this will cause a compilation error. It should be accessed directly as bulk_insert_active (or this->bulk_insert_active).
Additionally, please update the indentation to 2 spaces to match the repository's implicit style guide.
DBUG_ASSERT(s->hlindexes() == (hlindex != NULL));
if (hlindex && hlindex->in_use)
{
if (bulk_insert_active)
return mhnsw_bulk_insert_row(this, key_info + s->keys);
else
return mhnsw_insert(this, key_info + s->keys);
}
return 0;| int TABLE::hlindexes_bulk_insert_begin(ha_rows rows) | ||
| { | ||
| if (hlindex && hlindex->in_use) | ||
| { | ||
| hlindex->bulk_insert_active= true; | ||
| return mhnsw_bulk_insert_begin(this, key_info + s->keys, rows); | ||
| } | ||
| return 0; | ||
| } | ||
|
|
||
| int TABLE::hlindexes_bulk_insert_end() | ||
| { | ||
| if (hlindex && hlindex->in_use) | ||
| { | ||
| hlindex->bulk_insert_active= false; | ||
| return mhnsw_bulk_insert_end(this, key_info + s->keys); | ||
| } | ||
| return 0; | ||
| } |
There was a problem hiding this comment.
There are three issues here:
bulk_insert_activeis a member ofTABLE, nothlindex, so accessing it viahlindex->bulk_insert_activewill cause a compilation error.- If
mhnsw_bulk_insert_beginfails (returns a non-zero error code),bulk_insert_activeshould be reset tofalseso that subsequent inserts do not incorrectly attempt bulk insertion. - The indentation should be updated to 2 spaces to match the repository's implicit style guide.
int TABLE::hlindexes_bulk_insert_begin(ha_rows rows)
{
if (hlindex && hlindex->in_use)
{
bulk_insert_active= true;
int err= mhnsw_bulk_insert_begin(this, key_info + s->keys, rows);
if (err)
bulk_insert_active= false;
return err;
}
return 0;
}
int TABLE::hlindexes_bulk_insert_end()
{
if (hlindex && hlindex->in_use)
{
bulk_insert_active= false;
return mhnsw_bulk_insert_end(this, key_info + s->keys);
}
return 0;
}| if (!to->s->long_unique_table) | ||
| { | ||
| to->file->ha_start_bulk_insert(from->file->stats.records, | ||
| ignore ? 0 : HA_CREATE_UNIQUE_INDEX_BY_SORT); | ||
| bulk_insert_started= 1; | ||
| to->file->ha_start_bulk_insert(from->file->stats.records, | ||
| ignore ? 0 : HA_CREATE_UNIQUE_INDEX_BY_SORT); | ||
| bulk_insert_started= 1; | ||
|
|
||
| if (to->s->hlindexes()) | ||
| { | ||
| to->hlindexes_bulk_insert_begin(from->file->stats.records); | ||
| hlindex_bulk_started= 1; | ||
| } | ||
| } |
There was a problem hiding this comment.
The return value of to->hlindexes_bulk_insert_begin is currently ignored. If the bulk insert initialization fails, we should handle the error, print it, and avoid setting hlindex_bulk_started to 1 so that we do not proceed with the bulk insert or attempt to end it later.
Additionally, please update the indentation to 2 spaces to match the repository's implicit style guide.
if (!to->s->long_unique_table)
{
to->file->ha_start_bulk_insert(from->file->stats.records,
ignore ? 0 : HA_CREATE_UNIQUE_INDEX_BY_SORT);
bulk_insert_started= 1;
if (to->s->hlindexes())
{
if ((error= to->hlindexes_bulk_insert_begin(from->file->stats.records)))
{
to->file->print_error(error, MYF(0));
}
else
{
hlindex_bulk_started= 1;
}
}
}| if (hlindex_bulk_started && to->hlindexes_bulk_insert_end() && error <= 0) | ||
| { | ||
| if (!thd->is_error()) | ||
| to->file->print_error(my_errno, MYF(0)); | ||
| error= 1; | ||
| } | ||
| hlindex_bulk_started=0; |
There was a problem hiding this comment.
Please update the indentation to 2 spaces and add proper spacing around operators (e.g., hlindex_bulk_started = 0;) to match the repository's implicit style guide.
if (hlindex_bulk_started && to->hlindexes_bulk_insert_end() && error <= 0)
{
if (!thd->is_error())
to->file->print_error(my_errno, MYF(0));
error= 1;
}
hlindex_bulk_started= 0;| */ | ||
| bool alias_name_used; /* true if table_name is alias */ | ||
| bool get_fields_in_item_tree; /* Signal to fix_field */ | ||
| bool bulk_insert_active; /* mhnsw bulk_insert_started flag */ |
There was a problem hiding this comment.
…orrectly set the grefs already faster by ~15%
2c44b07 to
ffaa6a1
Compare
iMineLink
left a comment
There was a problem hiding this comment.
Thank you for sharing the first draft @shabbann, I did a preliminary review and found minor things and a missing initialization of MHNSW_Share::bulk_active worth checking.
Possibly out of scope, but it may be interesting to enable falling back to normal insert if some memory budget is exceeded for the bulk insert, to avoid unexpected OOM issues.
| @@ -510,7 +510,7 @@ class MHNSW_Share : public Sql_alloc | |||
| const uint M; | |||
| metric_type metric; | |||
| bool use_subdist; | |||
|
|
|||
| bool bulk_active; | |||
There was a problem hiding this comment.
Can you explicitely initialize this to false?
|
|
||
| if (to->s->hlindexes()) | ||
| { | ||
| to->hlindexes_bulk_insert_begin(from->file->stats.records); |
There was a problem hiding this comment.
I think error handling is needed here.
| return err; | ||
| } | ||
|
|
||
| if (int err= graph->file->ha_end_bulk_insert()) |
There was a problem hiding this comment.
I think this needs to be placed inside a SCOPE_EXIT or similar, to avoid premature exit on error if node->save() fails in the loop above.
| if (my_init_dynamic_array(PSI_INSTRUMENT_MEM, &bulk->nodes, sizeof(FVectorNode*), | ||
| rows + rows * 0.1, rows, MYF(0))) | ||
| { | ||
| delete bulk; |
There was a problem hiding this comment.
I think this delete bulk statements and the one few lines below can be removed.
| @@ -1012,6 +1012,8 @@ int FVectorNode::load_from_record(TABLE *graph) | |||
| FVector *vec_ptr= FVector::align_ptr(tref() + tref_len()); | |||
| memcpy(vec_ptr->data(), v->ptr(), v->length()); | |||
| vec_ptr->postprocess(ctx->use_subdist, ctx->vec_len); | |||
| if (ctx->metric == COSINE) | |||
| vec_ptr->abs2= 0.5f; | |||
There was a problem hiding this comment.
I see you already prepared #5184 to fix this separately, good.
| delete bulk; | ||
| return err; | ||
| } | ||
|
|
There was a problem hiding this comment.
Here you may add DBUG_ASSERT(!bulk->ctx->start); to document that the bulk build assumes an empty target graph
@iMineLink
Yes good idea thank you for bringing it up. I will work on a way to implement it. |
| bulk->ctx= ctx; | ||
| bulk->estimated_rows= rows; | ||
| if (my_init_dynamic_array(PSI_INSTRUMENT_MEM, &bulk->nodes, sizeof(FVectorNode*), | ||
| rows + rows / 10, rows, MYF(0))) |
There was a problem hiding this comment.
@iMineLink I forgot to ask you about this when I pushed the commit last week
so here I know that rows is an estimate so it can be less than actual rows by 50 or something, and then we will have to reallocate the whole array and make its size 2*rows
So I added rows/10 to make sure this doesn't happen. Is there is a better way to do it?
There was a problem hiding this comment.
It's OK like this, I couldn't find myself a better way. FYI, it's also not approximate for all engines (for InnoDB it is), check HA_STATS_RECORDS_IS_EXACT. Leaving a comment here would help future readers on the why 10% margin is added to rows.
iMineLink
left a comment
There was a problem hiding this comment.
Hi! Thanks for checking my previous review, I have a few more comments on what's already here in the draft. Keep up the good work!
| MHNSW_Share *ctx; | ||
| int err= MHNSW_Share::acquire(&ctx, table, true); | ||
| if (err && err != HA_ERR_END_OF_FILE && err != HA_ERR_KEY_NOT_FOUND) | ||
| return err; |
There was a problem hiding this comment.
On error here, you need to release the context as well (check mhnsw_insert(), but here we release only on error)
| } | ||
|
|
||
| bulk->ctx= ctx; | ||
| bulk->estimated_rows= rows; |
There was a problem hiding this comment.
If this is now write-only, it can be removed
| Level Code Message | ||
| Note 1105 MHNSW: Bulk insert disabled because estimated memory usage (1335571) exceeds mhnsw_max_cache_size (1048576). Falling back to normal insert. | ||
| # Test search using the fallback-built index to ensure it is healthy and correct | ||
| select id, vec_distance_euclidean(v, concat(repeat(x'00', 3996), x'0000803f')) d from t1 order by d limit 3; |
There was a problem hiding this comment.
To make this result deterministic, you can set limit 1 to filter only d=0
| # Adding index with small cache size should trigger fallback and show a warning | ||
| alter table t1 add vector index (v) m=200; | ||
| Warnings: | ||
| Note 1105 MHNSW: Bulk insert disabled because estimated memory usage (1335571) exceeds mhnsw_max_cache_size (1048576). Falling back to normal insert. |
There was a problem hiding this comment.
Exact memory usage will depend on the used engine I guess, maybe substitute this value with a placeholder to make the test pass on all engines
| id d | ||
| 2 0 | ||
| 1 1 | ||
| 3 1.41421 |
There was a problem hiding this comment.
Here I think both id=3 and id=4 have the same distance to the target, you can limit 2 to get deterministic results
| "exceeds mhnsw_max_cache_size (%llu). Falling back to normal insert.", | ||
| (ulonglong)estimated_mem, (ulonglong)mhnsw_max_cache_size); | ||
| ctx->release(table); | ||
| return 1; |
There was a problem hiding this comment.
Here maybe instead of using returning error 1, you can for example reset the context to null and return 0, to signal to the caller that the condition is "fallback to normal insert" rather than a "true error". This requires a bit of restructuring and is minor though (the caller should set bulk_insert_active only when the context is not-null).
| bulk->ctx= ctx; | ||
| bulk->estimated_rows= rows; | ||
| if (my_init_dynamic_array(PSI_INSTRUMENT_MEM, &bulk->nodes, sizeof(FVectorNode*), | ||
| rows + rows / 10, rows, MYF(0))) |
There was a problem hiding this comment.
It's OK like this, I couldn't find myself a better way. FYI, it's also not approximate for all engines (for InnoDB it is), check HA_STATS_RECORDS_IS_EXACT. Leaving a comment here would help future readers on the why 10% margin is added to rows.
- removed error return (1) and refactored hlindex_bulk_insert_begin to use context instead
d51f90b to
fcdc5f3
Compare
| { | ||
| mysql_mutex_t cache_lock; // for node_cache and stats | ||
| mysql_mutex_t node_lock[8]; | ||
| mysql_mutex_t node_lock[32]; |
There was a problem hiding this comment.
My CPU(AMD Ryzen 7 5800H) got 16 threads so currently 32 here works without any issues. I don't know how we can decide what value to choose though.
There was a problem hiding this comment.
add a comment, like // XXX how to choose what's the best here?
| neighbors.num= 0; | ||
|
|
||
| while (pq.elements() && neighbors.num < max_neighbor_connections) | ||
| size_t temp_num = 0; |
There was a problem hiding this comment.
I know in normal insert this code is redundant and could slow it a bit (seconds). Should we use if(bulk) do this or create a new function select_neighbors_bulk to keep the code clean?
There was a problem hiding this comment.
Do whatever you prefer for now, we can polish the details later after you'll finalize the approach and get all the speed you want.
| SCOPE_EXIT([memroot_sv](){ root_free_to_savepoint(&memroot_sv); }); | ||
| uint N= std::thread::hardware_concurrency(); | ||
| uint total_nodes= bulk->nodes.elements - 1; | ||
| uint workers= std::min(N, total_nodes); |
There was a problem hiding this comment.
Currently, I use all available threads.
There was a problem hiding this comment.
if you consider the case when total_nodes might be less than N, may be let's just not do parallel when the number of nodes is too small? LIke, if it's less than 100*N or some other arbitrary practical threshold, let's not bother with all this parallel machinery?
| args[i].start_idx = current_start; | ||
| args[i].end_idx = current_start + count; | ||
| args[i].error= 0; | ||
| current_start += count; |
There was a problem hiding this comment.
The problem with this approach is that we can't guarantee that graph layers is updated correctly, and we may end up jumping directly from max_layer=L0 to L5. This could slightly degrade recall sometimes, but I can't find a solution to this
There was a problem hiding this comment.
One solution is to sort the nodes by descending layer and use a shared current_idx, where every thread picks nodes[current_idx++] and insert it. This would actually improve the recall a bit, but it would make the graph build slower
There was a problem hiding this comment.
right, sort the nodes by descending layer, you've got this idea too :)
But no need to have a shared idx. Every thread gets its range of nodes and every thread independently sorts its range of nodes. They all should have approximately the same number of nodes on each later.
except that you can insert the start node before starting threads, in single-threaded mode.
There was a problem hiding this comment.
Pull request overview
Adds support for bulk insertion into MHNSW vector indexes during ALTER TABLE, aiming to speed up index creation by batching/parallelizing the in-memory graph build and then persisting it, with fallback to normal per-row insertion when bulk mode is not feasible.
Changes:
- Introduces a bulk-insert lifecycle for MHNSW (
begin/row/end) and wires it intoALTER TABLEcopy phase. - Implements a multi-threaded in-memory build for the MHNSW graph, followed by bulk persistence into the hlindex table.
- Adds MTR coverage for the memory-budget fallback behavior.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| sql/vector_mhnsw.h | Declares MHNSW bulk insert entry points. |
| sql/vector_mhnsw.cc | Implements bulk build (threaded) + persistence; adds synchronization-related changes to neighbor publication. |
| sql/table.h | Adds per-hlindex bulk_insert_active flag; declares hlindex bulk begin/end helpers. |
| sql/sql_table.cc | Starts/stops hlindex bulk mode during ALTER TABLE data copy. |
| sql/sql_base.cc | Routes hlindex insert path to either bulk-row collection or normal insert; implements begin/end wrappers. |
| mysql-test/main/vector_bulk.test | Adds test for bulk-build fallback when mhnsw_max_cache_size is too small. |
| mysql-test/main/vector_bulk.result | Expected output for the new test. |
| mysql-test/main/vector_bulk.combinations | Runs the test under multiple storage engines. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (ctx->byte_len != res->length()) | ||
| return my_errno= HA_ERR_CRASHED; |
| if (insert_dynamic(&bulk->nodes, (uchar*)&node)) | ||
| return HA_ERR_OUT_OF_MEM; |
| if (hlindex_bulk_started && to->hlindexes_bulk_insert_end() && error <= 0) | ||
| { | ||
| if (!thd->is_error()) | ||
| to->file->print_error(my_errno, MYF(0)); | ||
| error= 1; | ||
| } | ||
| hlindex_bulk_started=0; |
There was a problem hiding this comment.
I don't think copilot is completely right here, expressions are evaluated from left to right, so to->hlindexes_bulk_insert_end() will be called before checking error <= 0.
Still the condition looks strange. If hlindexes_bulk_insert_end() succeeds there will be no error printed even if errror <=0 ? And if hlindexes_bulk_insert_end() fails but error is positive, there will be no error printed again?
| uint N= std::thread::hardware_concurrency(); | ||
| uint total_nodes= bulk->nodes.elements - 1; | ||
| uint workers= std::min(N, total_nodes); | ||
|
|
| int err= mysql_thread_create(0, &threads[i], nullptr, bulk_build_thread, &args[i]); | ||
| if (err) | ||
| { | ||
| for (uint j= 0; j < workers_spawned; j++) | ||
| pthread_join(threads[j], nullptr); | ||
| return err; |
| // Publish the new neighbors atomically | ||
| for (size_t i= 0; i < temp_num; i++) | ||
| neighbors.links[i]= temp_links[i]; | ||
|
|
||
| std::atomic_thread_fence(std::memory_order_release); | ||
| neighbors.num= temp_num; |
There was a problem hiding this comment.
I just checked the reference for atomic_thread_fence and realized I had misunderstood how it works. I will work on fixing this
| FVectorNode *first_target= *(FVectorNode**)dynamic_element(&bulk->nodes, 0, FVectorNode**); | ||
| ctx->start= first_target; | ||
|
|
vuvova
left a comment
There was a problem hiding this comment.
Ok, I've left some comments, and there were some comments I didn't leave — this is still work-in-progress, we're not trying to make it ready for merging yet.
See below.
| @@ -12659,11 +12663,17 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to, | |||
|
|
|||
| from->file->info(HA_STATUS_VARIABLE); | |||
| to->file->extra(HA_EXTRA_PREPARE_FOR_ALTER_TABLE); | |||
| if (!to->s->long_unique_table && !to->s->hlindexes()) | |||
| if (!to->s->long_unique_table) | |||
There was a problem hiding this comment.
when you remove something, check in the git history why it was added. The check above was in the commit with the comment "Don't enable bulk insert when altering a table containing vector index. InnoDB can't handle situation when bulk insert is enabled for one table but disabled for another"
That is, you should first do hlindexes_bulk_insert_begin() and if it succeeds, then do ha_start_bulk_insert(). To avoid the case when bulk insert was only started partially.
There was a problem hiding this comment.
I checked and actually talked about this in Zulip :)
but yeah I forgot to make sure we only enable it when hlindex_bulk_begin succeed
| if (hlindex_bulk_started && to->hlindexes_bulk_insert_end() && error <= 0) | ||
| { | ||
| if (!thd->is_error()) | ||
| to->file->print_error(my_errno, MYF(0)); | ||
| error= 1; | ||
| } | ||
| hlindex_bulk_started=0; |
There was a problem hiding this comment.
I don't think copilot is completely right here, expressions are evaluated from left to right, so to->hlindexes_bulk_insert_end() will be called before checking error <= 0.
Still the condition looks strange. If hlindexes_bulk_insert_end() succeeds there will be no error printed even if errror <=0 ? And if hlindexes_bulk_insert_end() fails but error is positive, there will be no error printed again?
| { | ||
| mysql_mutex_t cache_lock; // for node_cache and stats | ||
| mysql_mutex_t node_lock[8]; | ||
| mysql_mutex_t node_lock[32]; |
There was a problem hiding this comment.
add a comment, like // XXX how to choose what's the best here?
| size_t cur_num= neighbors[layer].num; | ||
| neighbors[layer].links[cur_num]= other; | ||
| std::atomic_thread_fence(std::memory_order_release); | ||
| neighbors[layer].num= cur_num + 1; |
There was a problem hiding this comment.
This looks very fishy. May be it's safe, but looks fishy. Please add a comment before the method, explaining how it's used in parallel build and what the invariants are
There was a problem hiding this comment.
It is indeed fishy. I will work on it
There was a problem hiding this comment.
Thread fence without corresponding atomic operation(s) can't be safe by specification.
| neighbors.num= 0; | ||
|
|
||
| while (pq.elements() && neighbors.num < max_neighbor_connections) | ||
| size_t temp_num = 0; |
There was a problem hiding this comment.
Do whatever you prefer for now, we can polish the details later after you'll finalize the approach and get all the speed you want.
| return 0; | ||
| } | ||
|
|
||
| uint N= std::thread::hardware_concurrency(); |
There was a problem hiding this comment.
add a comment, like // XXX how many threads to use?
let's not bother with this now. Still I expect some users will be unhappy with this "greedy" behavior, so eventually we'll need to think about it. But it's not a requirement for the merge, we can accept the feature with your "use all cores" behavior.
| FVectorNode *node= new (ctx->alloc_node()) | ||
| FVectorNode(ctx, table->file->ref, target_layer, res->ptr()); | ||
|
|
||
| if (insert_dynamic(&bulk->nodes, (uchar*)&node)) |
There was a problem hiding this comment.
Do you really need an array of nodes? node_cache is an array of all nodes and Hash_set<> has method at(i) that allows to use it as an array.
Node's index might change if you modify the hash (insert/delete) but it doesn't happen in the parallel phase, does it?
There was a problem hiding this comment.
Few more thoughts. You know layers for every node before the parallel phase starts. So
- you don't need
update_start_parallel, you can update the start to the node with the highers layer in the single-threaded phase. Building the graph is slow, it's done in parallel, but just keeping a running max layer and a running start node is cheap. - if you know all layers in advance, then perhaps it'd make sense to insert nodes in that order? No idea if it'll help, but as we have this information, it's worth trying to see if we can use it. If you do it, then you might need an array of nodes, because you'll need to sort it by layer.
| SCOPE_EXIT([memroot_sv](){ root_free_to_savepoint(&memroot_sv); }); | ||
| uint N= std::thread::hardware_concurrency(); | ||
| uint total_nodes= bulk->nodes.elements - 1; | ||
| uint workers= std::min(N, total_nodes); |
There was a problem hiding this comment.
if you consider the case when total_nodes might be less than N, may be let's just not do parallel when the number of nodes is too small? LIke, if it's less than 100*N or some other arbitrary practical threshold, let's not bother with all this parallel machinery?
| args[i].start_idx = current_start; | ||
| args[i].end_idx = current_start + count; | ||
| args[i].error= 0; | ||
| current_start += count; |
There was a problem hiding this comment.
right, sort the nodes by descending layer, you've got this idea too :)
But no need to have a shared idx. Every thread gets its range of nodes and every thread independently sorts its range of nodes. They all should have approximately the same number of nodes on each later.
except that you can insert the start node before starting threads, in single-threaded mode.
Add support for bulk insertion into MHNSW vector indexes during ALTER TABLE