diff --git a/acceptance/bundle/dms/destroy-declined/databricks.yml b/acceptance/bundle/dms/destroy-declined/databricks.yml new file mode 100644 index 00000000000..98cdf3c451f --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-destroy-declined + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/destroy-declined/out.test.toml b/acceptance/bundle/dms/destroy-declined/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/destroy-declined/output.txt b/acceptance/bundle/dms/destroy-declined/output.txt new file mode 100644 index 00000000000..3b833c68997 --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/output.txt @@ -0,0 +1,56 @@ + +=== Deploy creates the DMS deployment + version 1 +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-destroy-declined/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== Destroy without --auto-approve is declined: it must not create a DMS version or delete the deployment +>>> [CLI] bundle destroy +Error: this command will destroy all resources deployed by this bundle, including workspace files in the deployment directory. +Deleting data assets such as schemas, pipelines, or volumes may cause permanent data loss and should be carefully reviewed. +To proceed, use --auto-approve. + +Exit code: 1 + +=== Only the deploy's DMS calls were recorded — no destroy version, no DELETE from the declined destroy +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "0" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/0/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-destroy-declined/default + +Deleting files... +Destroy complete! diff --git a/acceptance/bundle/dms/destroy-declined/script b/acceptance/bundle/dms/destroy-declined/script new file mode 100644 index 00000000000..9d791d212c8 --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/script @@ -0,0 +1,14 @@ +cleanup() { + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy creates the DMS deployment + version 1" +trace $CLI bundle deploy + +title "Destroy without --auto-approve is declined: it must not create a DMS version or delete the deployment" +errcode trace $CLI bundle destroy + +title "Only the deploy's DMS calls were recorded — no destroy version, no DELETE from the declined destroy" +trace print_requests.py //api/2.0/bundle diff --git a/acceptance/bundle/dms/enable-after-deploy/databricks.yml b/acceptance/bundle/dms/enable-after-deploy/databricks.yml new file mode 100644 index 00000000000..cd8ff90b266 --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-enable-after-deploy + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/enable-after-deploy/out.test.toml b/acceptance/bundle/dms/enable-after-deploy/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/enable-after-deploy/output.txt b/acceptance/bundle/dms/enable-after-deploy/output.txt new file mode 100644 index 00000000000..0d9addaa6ee --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/output.txt @@ -0,0 +1,88 @@ + +=== Deploy without the feature: no DMS calls are made +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Enable experimental.record_deployment_history +=== Delete local cache (.databricks) so the next deploy cannot rely on it +=== Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default + +Deleting files... +Destroy complete! diff --git a/acceptance/bundle/dms/enable-after-deploy/script b/acceptance/bundle/dms/enable-after-deploy/script new file mode 100644 index 00000000000..c17b3b859dc --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/script @@ -0,0 +1,25 @@ +cleanup() { + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy without the feature: no DMS calls are made" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock + +title "Enable experimental.record_deployment_history" +cat >> databricks.yml <<'YAML' + +experimental: + record_deployment_history: true +YAML + +title "Delete local cache (.databricks) so the next deploy cannot rely on it" +rm -rf .databricks + +title "Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache)" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/record/databricks.yml b/acceptance/bundle/dms/record/databricks.yml new file mode 100644 index 00000000000..b20e6274310 --- /dev/null +++ b/acceptance/bundle/dms/record/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-record + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/record/out.test.toml b/acceptance/bundle/dms/record/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/record/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/record/output.txt b/acceptance/bundle/dms/record/output.txt new file mode 100644 index 00000000000..d9cb51d77a0 --- /dev/null +++ b/acceptance/bundle/dms/record/output.txt @@ -0,0 +1,148 @@ + +=== Deploy: a DMS deployment + version are recorded while the file lock is held +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== DMS API calls made during deploy (create deployment, create + complete version) +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "0" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/0/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +=== The workspace-filesystem lock is applied alongside DMS (deploy.lock written) +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Destroy: a DMS version is recorded while the file lock is held +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-record/default + +Deleting files... +Destroy complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DESTROY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "DELETE", + "path": "/api/2.0/bundle/deployments/[UUID]" +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} diff --git a/acceptance/bundle/dms/record/script b/acceptance/bundle/dms/record/script new file mode 100644 index 00000000000..afb9b05bdc3 --- /dev/null +++ b/acceptance/bundle/dms/record/script @@ -0,0 +1,23 @@ +cleanup() { + title "Destroy: a DMS version is recorded while the file lock is held" + trace $CLI bundle destroy --auto-approve + trace print_requests.py //api/2.0/bundle --keep + trace print_requests.py //deploy.lock + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy: a DMS deployment + version are recorded while the file lock is held" +trace $CLI bundle deploy + +title "DMS API calls made during deploy (create deployment, create + complete version)" +trace print_requests.py //api/2.0/bundle --keep + +title "The workspace-filesystem lock is applied alongside DMS (deploy.lock written)" +trace print_requests.py //deploy.lock + +title "Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment)" +rm -rf .databricks +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..84b939a7da2 --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,5 @@ +# Deployment Metadata Service (DMS) recording is only meaningful in the direct +# engine, where the deployment ID is derived from the state lineage. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] + +RecordRequests = true diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index fd76151483c..fc142897b55 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -22,6 +22,7 @@ import ( "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/libs/agent" "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/dms" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/cli/libs/sync" @@ -142,7 +143,12 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - // lock is acquired here + // lock is acquired here. + // + // Set up DMS recording of this deployment as a version. CreateVersion and + // its matching CompleteVersion run only after the plan is approved (below), + // so a cancelled deploy records nothing. + recorder := newDeploymentRecorder(ctx, b, engine, dms.VersionTypeDeploy) defer func() { bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) }() @@ -208,6 +214,18 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } if haveApproval { + // Record the DMS version now that the plan is approved (a cancelled deploy + // records nothing). The deployment lineage and the version's serial both + // come from the plan; CompleteVersion below finalizes this same version. + if err := recorder.CreateVersion(ctx, plan.Lineage, plan.Serial); err != nil { + logdiag.LogError(ctx, err) + return + } + defer func() { + if err := recorder.CompleteVersion(ctx, plan.Lineage, plan.Serial, !logdiag.HasError(ctx)); err != nil { + logdiag.LogError(ctx, err) + } + }() deployCore(ctx, b, plan, engine) } else { cmdio.LogString(ctx, "Deployment cancelled!") diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 74049f26f42..67fab37f284 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/bundle/direct" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dms" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/databricks-sdk-go/apierr" @@ -127,6 +128,10 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } + // Set up DMS recording of this destroy as a version. CreateVersion and its + // matching CompleteVersion run only after the destroy is approved (below), so + // a cancelled destroy records nothing. + recorder := newDeploymentRecorder(ctx, b, engine, dms.VersionTypeDestroy) defer func() { bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) }() @@ -184,6 +189,19 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } } + // Record the DMS version now that the destroy is approved (a cancelled + // destroy records nothing). The deployment lineage and the version's + // serial both come from the plan; CompleteVersion below finalizes this + // same version and deletes the deployment record on success. + if err := recorder.CreateVersion(ctx, plan.Lineage, plan.Serial); err != nil { + logdiag.LogError(ctx, err) + return + } + defer func() { + if err := recorder.CompleteVersion(ctx, plan.Lineage, plan.Serial, !logdiag.HasError(ctx)); err != nil { + logdiag.LogError(ctx, err) + } + }() destroyCore(ctx, b, plan, engine) } else { cmdio.LogString(ctx, "Destroy cancelled!") diff --git a/bundle/phases/dms.go b/bundle/phases/dms.go new file mode 100644 index 00000000000..b5b396d282c --- /dev/null +++ b/bundle/phases/dms.go @@ -0,0 +1,35 @@ +package phases + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/engine" + "github.com/databricks/cli/libs/dms" +) + +// newDeploymentRecorder returns a dms.Recorder for the current deployment, or +// nil when DMS recording does not apply. A nil recorder is a no-op, so callers +// do not need to branch on it. +// +// Recording is enabled only when experimental.record_deployment_history is set +// AND the engine is direct: the deployment ID is the state lineage, which is +// only populated for the direct engine (the terraform engine never opens the +// state DB). Returning nil for terraform leaves those deployments untouched. +func newDeploymentRecorder(ctx context.Context, b *bundle.Bundle, eng engine.EngineType, versionType dms.VersionType) *dms.Recorder { + if b.Config.Experimental == nil || !b.Config.Experimental.RecordDeploymentHistory { + return nil + } + if !eng.IsDirect() { + return nil + } + // Seed the state lineage before the plan is computed so the plan carries it. + // CreateVersion reads the lineage and serial from the plan, the single + // source of truth for both. + b.DeploymentBundle.StateDB.GetOrInitLineage() + return dms.NewRecorder( + b.WorkspaceClient(ctx).BundleDeployments, + b.Config.Bundle.Target, + versionType, + ) +} diff --git a/libs/dms/recorder.go b/libs/dms/recorder.go new file mode 100644 index 00000000000..c02602a2773 --- /dev/null +++ b/libs/dms/recorder.go @@ -0,0 +1,203 @@ +// Package dms records bundle deployments as versions with the Deployment +// Metadata Service (DMS). +// +// It is intentionally independent of the deployment lock: a Recorder does not +// acquire or hold any lock. Callers serialize concurrent deployments (today via +// the workspace-filesystem lock). The DMS deployment is identified by the +// bundle's state lineage and each version by the state serial; both are read +// from the plan (the single source of truth) and passed to CreateVersion / +// CompleteVersion, so the recorder never derives them itself. +package dms + +import ( + "context" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/bundledeployments" +) + +// The server expires a version's lease if it does not receive a heartbeat +// within a 2-minute TTL; we heartbeat well inside that window. +const defaultHeartbeatInterval = 30 * time.Second + +// VersionType identifies the kind of deployment a version records. +type VersionType = bundledeployments.VersionType + +const ( + VersionTypeDeploy VersionType = bundledeployments.VersionTypeVersionTypeDeploy + VersionTypeDestroy VersionType = bundledeployments.VersionTypeVersionTypeDestroy +) + +// Recorder records a single deploy/destroy as a version with DMS. The lineage +// and serial that identify the deployment and version are passed in from the +// plan, not stored here. +type Recorder struct { + svc bundledeployments.BundleDeploymentsInterface + targetName string + versionType VersionType + + // stopHeartbeat stops the heartbeat goroutine started by CreateVersion. It + // is nil until CreateVersion runs, which is how CompleteVersion knows whether + // there is anything to finalize. + stopHeartbeat context.CancelFunc +} + +// NewRecorder returns a Recorder for the given deployment target. +func NewRecorder(svc bundledeployments.BundleDeploymentsInterface, targetName string, versionType VersionType) *Recorder { + return &Recorder{ + svc: svc, + targetName: targetName, + versionType: versionType, + } +} + +// CreateVersion registers a new version with DMS, claiming it for the duration +// of the deployment. The lineage identifies the deployment and serial is the +// plan's state serial; both come from the plan. A nil Recorder is a no-op, so +// callers can leave it nil when recording is disabled. +func (r *Recorder) CreateVersion(ctx context.Context, lineage string, serial int) error { + if r == nil { + return nil + } + + if err := r.ensureDeployment(ctx, lineage); err != nil { + return err + } + + id := strconv.Itoa(serial) + // TODO: once the SDK exposes previous_version_id (universe #2061768), set it + // to the serial the plan was computed against so the server can reject a + // deployment built on a stale plan (serializability). It is required unless + // the deployment is net new, in which case it is left unset. + version, err := r.svc.CreateVersion(ctx, bundledeployments.CreateVersionRequest{ + Parent: "deployments/" + lineage, + VersionId: id, + Version: bundledeployments.Version{ + CliVersion: build.GetInfo().Version, + VersionType: r.versionType, + TargetName: r.targetName, + }, + }) + if err != nil { + return fmt.Errorf("failed to create deployment version: %w", err) + } + + log.Infof(ctx, "Created deployment version: deployment=%s version=%s", lineage, version.VersionId) + r.stopHeartbeat = startHeartbeat(ctx, r.svc, lineage, id) + return nil +} + +// ensureDeployment creates the DMS deployment record if it does not exist yet. +// GetDeployment is the only thing read from the backend: it tells us whether +// this lineage has been recorded before (recording may have been enabled after +// the first deploy). The version ID itself comes from the plan, not from the +// deployment's last_version_id. +func (r *Recorder) ensureDeployment(ctx context.Context, lineage string) error { + _, err := r.svc.GetDeployment(ctx, bundledeployments.GetDeploymentRequest{ + Name: "deployments/" + lineage, + }) + if errors.Is(err, apierr.ErrNotFound) { + _, err = r.svc.CreateDeployment(ctx, bundledeployments.CreateDeploymentRequest{ + DeploymentId: lineage, + Deployment: bundledeployments.Deployment{ + TargetName: r.targetName, + }, + }) + if err != nil { + return fmt.Errorf("failed to create deployment: %w", err) + } + return nil + } + if err != nil { + return fmt.Errorf("failed to get deployment: %w", err) + } + return nil +} + +// CompleteVersion finalizes the version created by CreateVersion. The lineage +// and serial must be the same ones passed to CreateVersion (both come from the +// same plan). A nil Recorder, or one whose CreateVersion never ran, is a no-op. +func (r *Recorder) CompleteVersion(ctx context.Context, lineage string, serial int, success bool) error { + if r == nil || r.stopHeartbeat == nil { + return nil + } + + r.stopHeartbeat() + + id := strconv.Itoa(serial) + versionName := fmt.Sprintf("deployments/%s/versions/%s", lineage, id) + + reason := bundledeployments.VersionCompleteVersionCompleteSuccess + if !success { + reason = bundledeployments.VersionCompleteVersionCompleteFailure + } + + _, err := r.svc.CompleteVersion(ctx, bundledeployments.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }) + if err != nil { + return err + } + log.Infof(ctx, "Completed deployment version: deployment=%s version=%s reason=%s", lineage, id, reason) + + // For destroy operations, delete the deployment record after the version + // completes successfully. + if success && r.versionType == VersionTypeDestroy { + err = r.svc.DeleteDeployment(ctx, bundledeployments.DeleteDeploymentRequest{ + Name: "deployments/" + lineage, + }) + if err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + + return nil +} + +// startHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment version's lease alive. Returns a cancel function to stop it. +func startHeartbeat(ctx context.Context, svc bundledeployments.BundleDeploymentsInterface, lineage, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + versionName := fmt.Sprintf("deployments/%s/versions/%s", lineage, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := svc.Heartbeat(ctx, bundledeployments.HeartbeatRequest{Name: versionName}) + if err != nil { + // A 409 ABORTED is expected if the version was completed + // between the ticker firing and the heartbeat. + if isAbortedErr(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + } else { + log.Debugf(ctx, "Deployment heartbeat sent: deployment=%s version=%s", lineage, versionID) + } + } + } + }() + + return cancel +} + +// isAbortedErr reports whether err is an HTTP 409 ABORTED from the DMS API. +func isAbortedErr(err error) bool { + apiErr, ok := errors.AsType[*apierr.APIError](err) + return ok && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" +} diff --git a/libs/testserver/bundle.go b/libs/testserver/bundle.go new file mode 100644 index 00000000000..af368e7386d --- /dev/null +++ b/libs/testserver/bundle.go @@ -0,0 +1,92 @@ +package testserver + +import ( + "encoding/json" + + "github.com/databricks/databricks-sdk-go/service/bundledeployments" +) + +// Handlers for the Deployment Metadata Service (DMS) API under /api/2.0/bundle. +// State is kept in FakeWorkspace.Deployments, keyed by deployment ID. + +func (s *FakeWorkspace) CreateDeployment(req Request) Response { + deploymentID := req.URL.Query().Get("deployment_id") + + var dep bundledeployments.Deployment + if err := json.Unmarshal(req.Body, &dep); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + dep.Name = "deployments/" + deploymentID + dep.Status = bundledeployments.DeploymentStatusDeploymentStatusActive + s.Deployments[deploymentID] = &dep + return Response{Body: dep} +} + +func (s *FakeWorkspace) GetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.Deployments[deploymentID] + if !ok { + return Response{ + StatusCode: 404, + Body: map[string]string{ + "error_code": "RESOURCE_DOES_NOT_EXIST", + "message": "deployment " + deploymentID + " does not exist", + }, + } + } + return Response{Body: *dep} +} + +func (s *FakeWorkspace) DeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + delete(s.Deployments, deploymentID) + return Response{Body: map[string]any{}} +} + +func (s *FakeWorkspace) CreateVersion(req Request, deploymentID string) Response { + versionID := req.URL.Query().Get("version_id") + + var version bundledeployments.Version + if err := json.Unmarshal(req.Body, &version); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + dep, ok := s.Deployments[deploymentID] + if !ok { + return Response{ + StatusCode: 404, + Body: map[string]string{"error_code": "RESOURCE_DOES_NOT_EXIST", "message": "deployment does not exist"}, + } + } + + // The server accepts client-chosen version IDs (the CLI sends the state + // serial), so we record whatever the client requested. + dep.LastVersionId = versionID + version.Name = "deployments/" + deploymentID + "/versions/" + versionID + version.VersionId = versionID + version.Status = bundledeployments.VersionStatusVersionStatusInProgress + return Response{Body: version} +} + +func (s *FakeWorkspace) CompleteVersion(req Request, deploymentID, versionID string) Response { + var completeReq bundledeployments.CompleteVersionRequest + _ = json.Unmarshal(req.Body, &completeReq) + + return Response{Body: bundledeployments.Version{ + Name: "deployments/" + deploymentID + "/versions/" + versionID, + VersionId: versionID, + Status: bundledeployments.VersionStatusVersionStatusCompleted, + CompletionReason: completeReq.CompletionReason, + }} +} + +func (s *FakeWorkspace) Heartbeat() Response { + return Response{Body: bundledeployments.HeartbeatResponse{}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index d2925fb1e38..2740fa6cbe7 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/databricks/databricks-sdk-go/service/apps" + "github.com/databricks/databricks-sdk-go/service/bundledeployments" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -179,6 +180,9 @@ type FakeWorkspace struct { PostgresSyncedTables map[string]postgres.SyncedTable PostgresOperations map[string]postgres.Operation + // Deployment Metadata Service (DMS) deployment records, keyed by deployment ID. + Deployments map[string]*bundledeployments.Deployment + // Branches and endpoints that the server provisioned implicitly together // with their parent (e.g. the production branch on a new project, or the // primary endpoint on a new branch). The real backend rejects independent @@ -319,6 +323,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresRoles: map[string]postgres.Role{}, PostgresSyncedTables: map[string]postgres.SyncedTable{}, PostgresOperations: map[string]postgres.Operation{}, + Deployments: map[string]*bundledeployments.Deployment{}, postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index ee56cefe3e6..6d48b4eb970 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -235,6 +235,26 @@ func AddDefaultHandlers(server *Server) { return req.Workspace.JobsCreate(req) }) + // Deployment Metadata Service (DMS) endpoints. + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.CreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.GetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.CreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.CompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.Heartbeat() + }) + server.Handle("POST", "/api/2.2/jobs/delete", func(req Request) any { var request jobs.DeleteJob if err := json.Unmarshal(req.Body, &request); err != nil {