diff --git a/docs/source/AdministratorGuide/Systems/Transformation/index.rst b/docs/source/AdministratorGuide/Systems/Transformation/index.rst index 150a707be5f..def9aae7459 100644 --- a/docs/source/AdministratorGuide/Systems/Transformation/index.rst +++ b/docs/source/AdministratorGuide/Systems/Transformation/index.rst @@ -51,7 +51,7 @@ Disadvantages: Several improvements have been made in the TS to handle scalability, and extensibility issues. While the system structure remains intact, "tricks" like threading and caching have been extensively applied. -It's not possible to use ISB (Input Sandbox) to ship local files as for 'normal' Jobs (this should not be considered, anyway, a disadvantage). +Local files can't be shipped through the Input Sandbox the way the Job API does for 'normal' Jobs; a transformation instead references already-uploaded sandboxes — see `Input Sandboxes`_. ------------ Architecture @@ -121,6 +121,54 @@ The complete list can be found in the `DIRAC project GitHub repository `` in the ``SandboxMetadataDB``. While + that mapping exists the cleaner will not remove the sandbox. +- Pinning is mandatory: if it fails (``SandboxMetadataDB`` unreachable, or the + author is not authorised for a referenced sandbox), the transformation is + **rolled back and creation fails** — a created-but-unpinned transformation + would have its sandbox cleaned and then break every job submitted afterwards. +- On clean/archive, ``TransformationCleaningAgent`` unassigns + ``Transformation:`` so the cleaner can reclaim the now-unused sandboxes. If + the unassignment fails, the transformation's cleaning fails and is retried — + otherwise the sandboxes would stay pinned to a gone transformation and leak. + The unassignment is idempotent, so retrying is safe. + +This requires the ``SandboxMetadataDB`` to be reachable from both the +``TransformationManager`` service and the ``TransformationCleaningAgent``. + ------------- Configuration ------------- diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index 0a8a84e0ab4..c670cf8c908 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -71,6 +71,7 @@ def __init__(self, *args, **kwargs): self.pilotAgentsDB = None self.taskQueueDB = None self.storageManagementDB = None + self.sandboxDB = None # # transformations types self.transformationTypes = None @@ -153,6 +154,21 @@ def initialize(self): except RuntimeError: pass + # SandboxMetadataDB is used to unassign a transformation's input sandboxes at + # clean/archive time, releasing them for the sandbox-store cleaner. The agent + # keeps running if it can't load (the unassign step is skipped), but a failure + # to unassign during cleaning is logged loudly so a leaked assignment (which + # keeps the sandbox pinned forever) is noticed. + self.sandboxDB = None + try: + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB") + if result["OK"]: + self.sandboxDB = result["Value"](parentLogger=self.log) + else: + self.log.error("Could not load SandboxMetadataDB; sandbox unassignment disabled", result["Message"]) + except RuntimeError as excp: + self.log.error("Could not connect to SandboxMetadataDB; sandbox unassignment disabled", str(excp)) + return S_OK() ############################################################################# @@ -514,6 +530,12 @@ def archiveTransformation(self, transID): :param int transID: transformation ID """ self.log.info(f"Archiving transformation {transID}") + # Release the input-sandbox pin first; if it fails, fail the whole archive so + # it is retried — otherwise the sandboxes stay pinned to a gone transformation + # and leak. unassignEntities is idempotent, so the retry is safe. + res = self._unassignTransformationSandboxes(transID) + if not res["OK"]: + return res # Clean the jobs in the WMS and any failover requests found res = self.cleanTransformationTasks(transID) if not res["OK"]: @@ -531,11 +553,41 @@ def archiveTransformation(self, transID): self.log.info(f"Updated status of transformation {transID} to Archived") return S_OK() + def _unassignTransformationSandboxes(self, transID): + """Remove a transformation's input-sandbox assignment at clean/archive time. + + Drops the ``Transformation:`` mapping in the SandboxMetadataDB so the + sandbox-store cleaner can reclaim the (now unused) sandboxes. Returns S_ERROR on + failure so the caller can fail the cleaning and retry: leaving the assignment in + place would pin the sandboxes forever and leak them. ``unassignEntities`` is + idempotent, so retrying — including after the mapping is already gone — is safe. + + :param int transID: transformation ID + :returns: S_OK / S_ERROR + """ + if not self.sandboxDB: + return S_OK() + try: + result = self.sandboxDB.unassignEntities([f"Transformation:{transID}"]) + except Exception as excp: # pylint: disable=broad-except + self.log.exception("Unexpected error unassigning sandboxes for transformation", str(transID)) + return S_ERROR(f"Unexpected error unassigning sandboxes for transformation {transID}: {excp}") + if not result["OK"]: + self.log.error("Could not unassign sandboxes for transformation", f"{transID}: {result['Message']}") + return S_ERROR(f"Could not unassign sandboxes for transformation {transID}: {result['Message']}") + return S_OK() + def cleanTransformation(self, transID): """This removes what was produced by the supplied transformation, leaving only some info and log in the transformation DB. """ self.log.info("Cleaning transformation", transID) + # Release the input-sandbox pin first; if it fails, fail the whole clean so it + # is retried — otherwise the sandboxes stay pinned to a gone transformation and + # leak. unassignEntities is idempotent, so the retry is safe. + res = self._unassignTransformationSandboxes(transID) + if not res["OK"]: + return res res = self.getTransformationDirectories(transID) if not res["OK"]: self.log.error( diff --git a/src/DIRAC/TransformationSystem/Agent/test/Test_TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/test/Test_TransformationCleaningAgent.py new file mode 100644 index 00000000000..769030c4524 --- /dev/null +++ b/src/DIRAC/TransformationSystem/Agent/test/Test_TransformationCleaningAgent.py @@ -0,0 +1,63 @@ +# pylint: disable=missing-docstring +from unittest.mock import MagicMock + +from DIRAC.TransformationSystem.Agent.TransformationCleaningAgent import TransformationCleaningAgent + +# The agent's __init__ needs a running config, so these exercise the methods with a +# stand-in ``self`` (a MagicMock) carrying only the attributes the methods touch. + + +def test_unassign_callsDBWithTransformationEntity(): + agent = MagicMock() + agent.sandboxDB.unassignEntities.return_value = {"OK": True, "Value": 1} + + res = TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345) + + assert res["OK"] + agent.sandboxDB.unassignEntities.assert_called_once_with(["Transformation:12345"]) + agent.log.error.assert_not_called() + agent.log.exception.assert_not_called() + + +def test_unassign_noDBisOk(): + agent = MagicMock() + agent.sandboxDB = None + # Nothing to unassign: succeed so cleaning is not blocked in DB-less deployments. + assert TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345)["OK"] + + +def test_unassign_dbErrorIsLoggedAndReturnsError(): + agent = MagicMock() + agent.sandboxDB.unassignEntities.return_value = {"OK": False, "Message": "boom"} + + # A failed unassignment must be loud AND returned as an error so the caller fails + # the cleaning and retries (otherwise the sandboxes leak). + res = TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345) + + assert not res["OK"] + agent.sandboxDB.unassignEntities.assert_called_once_with(["Transformation:12345"]) + agent.log.error.assert_called_once() + + +def test_unassign_unexpectedExceptionIsLoggedAndReturnsError(): + agent = MagicMock() + agent.sandboxDB.unassignEntities.side_effect = RuntimeError("db down") + + # An unexpected exception must not propagate, but must be logged loudly and returned + # as an error so the cleaning is retried. + res = TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345) + + assert not res["OK"] + agent.log.exception.assert_called_once() + + +def test_cleanTransformation_failsWhenUnassignFails(): + # The whole clean must fail (and be retried) if the sandbox can't be unassigned, + # rather than proceeding and leaking the now-orphaned sandbox assignment. + agent = MagicMock() + agent._unassignTransformationSandboxes.return_value = {"OK": False, "Message": "boom"} + + res = TransformationCleaningAgent.cleanTransformation(agent, 12345) + + assert not res["OK"] + agent.getTransformationDirectories.assert_not_called() diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 16e053d420f..fd1b8a9a22d 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -9,9 +9,12 @@ from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import encode as jencode from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.Core.Workflow.Workflow import fromXMLString class TransformationManagerHandlerMixin: + sandboxDB = None + @classmethod def initializeHandler(cls, serviceInfoDict): """Initialization of DB object""" @@ -25,6 +28,24 @@ def initializeHandler(cls, serviceInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to TransformationDB: {excp}") + # SandboxMetadataDB is used to pin a transformation's input sandboxes so the + # sandbox-store cleaner does not remove them while the transformation is alive. + # We don't stop the service if it can't load (most transformations have no + # input sandboxes to pin), but a transformation that *does* reference sandboxes + # will be refused at creation time rather than left unpinned (see + # _assignInputSandboxesToTransformation). + cls.sandboxDB = None + try: + sbResult = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB") + if sbResult["OK"]: + cls.sandboxDB = sbResult["Value"](parentLogger=cls.log) + else: + cls.log.error( + "Could not load SandboxMetadataDB; input-sandbox pinning unavailable", sbResult["Message"] + ) + except RuntimeError as excp: + cls.log.error("Could not connect to SandboxMetadataDB; input-sandbox pinning unavailable", str(excp)) + return S_OK() def checkPermissions(self, transName: str): @@ -117,10 +138,80 @@ def export_addTransformation( inputMetaQuery=inputMetaQuery, outputMetaQuery=outputMetaQuery, ) - if res["OK"]: - self.log.info("Added transformation", res["Value"]) + if not res["OK"]: + return res + transID = res["Value"] + self.log.info("Added transformation", transID) + + # Pin any input sandboxes to the transformation. If this fails we must not + # leave a created-but-unpinned transformation: the sandbox would be cleaned + # on the usual timescale and every (1000s-10000s of) job submitted afterwards + # would fail to find it. Roll the transformation back and report the error. + assignRes = self._assignInputSandboxesToTransformation(transID, body, author, authorGroup) + if not assignRes["OK"]: + self.log.error( + "Could not pin input sandboxes; rolling back transformation", f"{transID}: {assignRes['Message']}" + ) + delRes = self.transformationDB.deleteTransformation(transID, author=author) + if not delRes["OK"]: + self.log.error("Rollback failed; transformation left unpinned", f"{transID}: {delRes['Message']}") + return S_ERROR( + f"Could not pin input sandboxes ({assignRes['Message']}) and rollback failed " + f"({delRes['Message']}); transformation {transID} is orphaned and unpinned" + ) + return S_ERROR( + f"Could not pin input sandboxes; transformation {transID} rolled back: {assignRes['Message']}" + ) return res + def _assignInputSandboxesToTransformation(self, transID, body, author, authorGroup): + """Pin a transformation's input sandboxes so the sandbox-store cleaner does + not remove them while the transformation is alive. + + Reads ``SB:`` references from the workflow body's ``InputSandbox`` parameter + and writes a ``Transformation:`` mapping under the author's credentials + (the author uploaded the sandboxes, so ``getSandboxId`` authorises them). + + Returns ``S_OK`` when there is nothing to pin (no body, no ``InputSandbox`` + parameter, or no ``SB:`` references). When the body *does* reference sandboxes, + a failure to pin them is fatal and returned as ``S_ERROR``: an unpinned sandbox + is cleaned on the usual timescale and breaks every job submitted afterwards, so + the caller must roll the transformation back rather than create it unpinned. + + :param int transID: the created transformation ID + :param str body: the transformation body (workflow XML) + :param str author: requester username (the submitter/owner) + :param str authorGroup: requester group + """ + if not body: + return S_OK() + try: + workflow = fromXMLString(body) + param = workflow.parameters.find("InputSandbox") + except Exception as excp: # pylint: disable=broad-except # body may not be a workflow + self.log.verbose("Could not parse transformation body for InputSandbox", str(excp)) + return S_OK() + if not param: + return S_OK() + # InputSandbox is canonically a ';'-joined string, but accept a list too in + # case a future producer stores it list-form. + value = param.getValue() + entries = value if isinstance(value, list) else str(value).split(";") + sbRefs = [sb for sb in entries if isinstance(sb, str) and sb.startswith("SB:")] + if not sbRefs: + return S_OK() + + # From here the transformation has sandboxes that MUST be pinned. + if not self.sandboxDB: + return S_ERROR("SandboxMetadataDB unavailable; cannot pin input sandboxes") + result = self.sandboxDB.assignSandboxesToEntities( + {f"Transformation:{transID}": [(sb, "Input") for sb in sbRefs]}, author, authorGroup + ) + if not result["OK"]: + return S_ERROR(f"Could not assign input sandboxes: {result['Message']}") + self.log.info("Assigned input sandboxes to transformation", f"{transID} ({result['Value']}/{len(sbRefs)})") + return S_OK() + types_deleteTransformation = [[int, str]] def export_deleteTransformation(self, transName): diff --git a/src/DIRAC/TransformationSystem/Service/test/Test_TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/test/Test_TransformationManagerHandler.py new file mode 100644 index 00000000000..7b4172b827a --- /dev/null +++ b/src/DIRAC/TransformationSystem/Service/test/Test_TransformationManagerHandler.py @@ -0,0 +1,146 @@ +# pylint: disable=missing-docstring +from unittest.mock import MagicMock, patch + +from DIRAC.Core.Security.Properties import SecurityProperty +from DIRAC.Core.Workflow.Workflow import Workflow +from DIRAC.Core.Workflow.Parameter import Parameter +from DIRAC.TransformationSystem.Service.TransformationManagerHandler import TransformationManagerHandlerMixin + +# The handler needs a running service to initialise, so these exercise the methods with +# a stand-in ``self`` (a MagicMock) carrying only the attributes the methods touch. + + +def _body_with_input_sandbox(value): + wf = Workflow() + wf.setName("testprod") + wf.addParameter(Parameter("InputSandbox", value, "JDL", "", "", True, False, "Input sandbox")) + return wf.toXMLString() + + +def test_assign_extractsSBRefsAndDelegates(): + handler = MagicMock() + handler.sandboxDB.assignSandboxesToEntities.return_value = {"OK": True, "Value": 1} + + body = _body_with_input_sandbox("SB:SE|/p/opts.tar.bz2;local.txt") + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, body, "alice", "lhcb_prod" + ) + + assert res["OK"] + handler.sandboxDB.assignSandboxesToEntities.assert_called_once_with( + {"Transformation:321": [("SB:SE|/p/opts.tar.bz2", "Input")]}, "alice", "lhcb_prod" + ) + + +def test_assign_noSBRefsIsNoop(): + handler = MagicMock() + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, _body_with_input_sandbox("local.txt;lfn:/foo"), "alice", "g" + ) + assert res["OK"] + handler.sandboxDB.assignSandboxesToEntities.assert_not_called() + + +def test_assign_noInputSandboxParamIsNoop(): + handler = MagicMock() + wf = Workflow() + wf.setName("testprod") + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, wf.toXMLString(), "a", "g" + ) + assert res["OK"] + handler.sandboxDB.assignSandboxesToEntities.assert_not_called() + + +def test_assign_nonWorkflowBodyIsNoop(): + handler = MagicMock() + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, "not-a-workflow", "a", "g" + ) + assert res["OK"] + handler.sandboxDB.assignSandboxesToEntities.assert_not_called() + + +def test_assign_sbRefsButNoSandboxDBIsError(): + # SB: refs present but no SandboxMetadataDB: must NOT silently succeed, because the + # sandboxes would then be cleaned and break every downstream job. + handler = MagicMock() + handler.sandboxDB = None + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, _body_with_input_sandbox("SB:SE|/p/1"), "a", "g" + ) + assert not res["OK"] + + +def test_assign_dbErrorIsError(): + handler = MagicMock() + handler.sandboxDB.assignSandboxesToEntities.return_value = {"OK": False, "Message": "boom"} + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, _body_with_input_sandbox("SB:SE|/p/1"), "a", "g" + ) + assert not res["OK"] + assert "boom" in res["Message"] + handler.sandboxDB.assignSandboxesToEntities.assert_called_once() + + +def test_assign_multipleSBRefs(): + handler = MagicMock() + handler.sandboxDB.assignSandboxesToEntities.return_value = {"OK": True, "Value": 2} + body = _body_with_input_sandbox("SB:SE|/a.tar.bz2;local.txt;SB:SE|/b.tar.bz2") + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, body, "alice", "lhcb_prod" + ) + assert res["OK"] + handler.sandboxDB.assignSandboxesToEntities.assert_called_once_with( + {"Transformation:321": [("SB:SE|/a.tar.bz2", "Input"), ("SB:SE|/b.tar.bz2", "Input")]}, + "alice", + "lhcb_prod", + ) + + +def test_assign_acceptsListValuedInputSandbox(): + # Belt-and-suspenders: if a producer ever stores InputSandbox list-form rather + # than the canonical ';'-joined string, we still extract the SB: refs. + handler = MagicMock() + handler.sandboxDB.assignSandboxesToEntities.return_value = {"OK": True, "Value": 1} + + param = MagicMock() + param.getValue.return_value = ["SB:SE|/a.tar.bz2", "local.txt"] + workflow = MagicMock() + workflow.parameters.find.return_value = param + + with patch( + "DIRAC.TransformationSystem.Service.TransformationManagerHandler.fromXMLString", + return_value=workflow, + ): + res = TransformationManagerHandlerMixin._assignInputSandboxesToTransformation( + handler, 321, "", "alice", "lhcb_prod" + ) + + assert res["OK"] + handler.sandboxDB.assignSandboxesToEntities.assert_called_once_with( + {"Transformation:321": [("SB:SE|/a.tar.bz2", "Input")]}, "alice", "lhcb_prod" + ) + + +def test_addTransformation_rollsBackWhenPinningFails(): + # The critical safety property: if a transformation references sandboxes that + # cannot be pinned, the transformation must be deleted and an error returned, + # never left created-but-unpinned. + handler = MagicMock() + handler.transformationDB.addTransformation.return_value = {"OK": True, "Value": 4242} + handler.transformationDB.deleteTransformation.return_value = {"OK": True} + handler.getRemoteCredentials.return_value = { + "username": "alice", + "group": "lhcb_prod", + "properties": [SecurityProperty.PRODUCTION_MANAGEMENT], + } + handler._assignInputSandboxesToTransformation.return_value = {"OK": False, "Message": "no pin"} + + body = _body_with_input_sandbox("SB:SE|/p/opts.tar.bz2") + res = TransformationManagerHandlerMixin.export_addTransformation( + handler, "prod", "desc", "long", "MCSimulation", "Standard", "Manual", "", body=body + ) + + assert not res["OK"] + handler.transformationDB.deleteTransformation.assert_called_once_with(4242, author="alice") diff --git a/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py b/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py index 8208bf5c4eb..83346a567d9 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py @@ -44,3 +44,42 @@ def test_SandboxMetadataDB(): res = smDB.getUnusedSandboxes() assert res["OK"], res["Message"] + + +def test_SandboxMetadataDB_transformationAssignment(): + smDB = SandboxMetadataDB() + + owner = "adminusername" + ownerGroup = "dirac_admin" + VO = "vo" + + sbSE = "ProductionSandboxSE" + sbPFN = "/sb/pfn/trans1.tar.bz2" + + res = smDB.registerAndGetSandbox(owner, ownerGroup, VO, sbSE, sbPFN, 123) + assert res["OK"], res["Message"] + sbId, _newSandbox = res["Value"] + + entityId = "Transformation:999" + assignTo = {entityId: [(f"SB:{sbSE}|{sbPFN}", "Input")]} + + res = smDB.assignSandboxesToEntities(assignTo, owner, ownerGroup) + assert res["OK"], res["Message"] + assert res["Value"] == 1 + + # While the mapping exists the sandbox is reachable via the transformation entity + res = smDB.getSandboxesAssignedToEntity(entityId, owner, ownerGroup, VO) + assert res["OK"], res["Message"] + assert (sbSE, sbPFN, "Input") in res["Value"] + + # Removing the mapping leaves no rows for that entity + res = smDB.unassignEntities([entityId]) + assert res["OK"], res["Message"] + + res = smDB.getSandboxesAssignedToEntity(entityId, owner, ownerGroup, VO) + assert res["OK"], res["Message"] + assert not res["Value"] + + # cleanup + res = smDB.deleteSandboxes([sbId]) + assert res["OK"], res["Message"]