Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Disadvantages:
Several improvements have been made in the TS to handle scalability, and extensibility issues.
While the system structure remains intact, "tricks" like threading and caching have been extensively applied.

It's not possible to use ISB (Input Sandbox) to ship local files as for 'normal' Jobs (this should not be considered, anyway, a disadvantage).
Local files can't be shipped through the Input Sandbox the way the Job API does for 'normal' Jobs; a transformation instead references already-uploaded sandboxes — see `Input Sandboxes`_.

------------
Architecture
Expand Down Expand Up @@ -121,6 +121,54 @@ The complete list can be found in the `DIRAC project GitHub repository <https://

* Transformation: it wraps some functionalities mostly to use the 'TransformationClient' client

---------------
Input Sandboxes
---------------

A transformation's input sandboxes are the ``SB:`` PFNs listed in its body's
``InputSandbox`` workflow parameter (``;``-joined). The sandboxes must already be
uploaded to the SandboxStore; the body references them by PFN, and every job the
transformation creates receives them — so a production can share one pre-built
payload (a compiled binary, a large config) across all its jobs.

Set the ``InputSandbox`` parameter on the body **before** the transformation is
created — it is pinned at creation, so adding it afterwards would leave the
sandboxes unpinned. Use the Job API::

job.setInputSandbox(["SB:ProductionSandboxSE|/S3/.../payload.tar.bz2"])

or add the parameter to the job's workflow directly::

from DIRAC.Core.Workflow.Parameter import Parameter
job.workflow.addParameter(
Parameter("InputSandbox", ";".join(sbPFNs), "JDL", "", "", True, False, "Input sandbox file list")
)

then use that body to create the transformation, which pins the sandboxes::

t.setBody(job.workflow.toXML())
t.addTransformation()

Because a transformation may keep submitting jobs for a long time while the
SandboxStore cleaner reclaims any sandbox not assigned to an entity, the
Transformation System **pins** these sandboxes to the transformation:

- On creation, ``TransformationManager`` assigns each ``SB:`` reference in the
body to the entity ``Transformation:<id>`` in the ``SandboxMetadataDB``. While
that mapping exists the cleaner will not remove the sandbox.
- Pinning is mandatory: if it fails (``SandboxMetadataDB`` unreachable, or the
author is not authorised for a referenced sandbox), the transformation is
**rolled back and creation fails** — a created-but-unpinned transformation
would have its sandbox cleaned and then break every job submitted afterwards.
- On clean/archive, ``TransformationCleaningAgent`` unassigns
``Transformation:<id>`` so the cleaner can reclaim the now-unused sandboxes. If
the unassignment fails, the transformation's cleaning fails and is retried —
otherwise the sandboxes would stay pinned to a gone transformation and leak.
The unassignment is idempotent, so retrying is safe.

This requires the ``SandboxMetadataDB`` to be reachable from both the
``TransformationManager`` service and the ``TransformationCleaningAgent``.

-------------
Configuration
-------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(self, *args, **kwargs):
self.pilotAgentsDB = None
self.taskQueueDB = None
self.storageManagementDB = None
self.sandboxDB = None

# # transformations types
self.transformationTypes = None
Expand Down Expand Up @@ -153,6 +154,21 @@ def initialize(self):
except RuntimeError:
pass

# SandboxMetadataDB is used to unassign a transformation's input sandboxes at
# clean/archive time, releasing them for the sandbox-store cleaner. The agent
# keeps running if it can't load (the unassign step is skipped), but a failure
# to unassign during cleaning is logged loudly so a leaked assignment (which
# keeps the sandbox pinned forever) is noticed.
self.sandboxDB = None
try:
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
if result["OK"]:
self.sandboxDB = result["Value"](parentLogger=self.log)
else:
self.log.error("Could not load SandboxMetadataDB; sandbox unassignment disabled", result["Message"])
except RuntimeError as excp:
self.log.error("Could not connect to SandboxMetadataDB; sandbox unassignment disabled", str(excp))

return S_OK()

#############################################################################
Expand Down Expand Up @@ -514,6 +530,12 @@ def archiveTransformation(self, transID):
:param int transID: transformation ID
"""
self.log.info(f"Archiving transformation {transID}")
# Release the input-sandbox pin first; if it fails, fail the whole archive so
# it is retried — otherwise the sandboxes stay pinned to a gone transformation
# and leak. unassignEntities is idempotent, so the retry is safe.
res = self._unassignTransformationSandboxes(transID)
if not res["OK"]:
return res
# Clean the jobs in the WMS and any failover requests found
res = self.cleanTransformationTasks(transID)
if not res["OK"]:
Expand All @@ -531,11 +553,41 @@ def archiveTransformation(self, transID):
self.log.info(f"Updated status of transformation {transID} to Archived")
return S_OK()

def _unassignTransformationSandboxes(self, transID):
"""Remove a transformation's input-sandbox assignment at clean/archive time.

Drops the ``Transformation:<transID>`` mapping in the SandboxMetadataDB so the
sandbox-store cleaner can reclaim the (now unused) sandboxes. Returns S_ERROR on
failure so the caller can fail the cleaning and retry: leaving the assignment in
place would pin the sandboxes forever and leak them. ``unassignEntities`` is
idempotent, so retrying — including after the mapping is already gone — is safe.

:param int transID: transformation ID
:returns: S_OK / S_ERROR
"""
if not self.sandboxDB:
return S_OK()
try:
result = self.sandboxDB.unassignEntities([f"Transformation:{transID}"])
except Exception as excp: # pylint: disable=broad-except
self.log.exception("Unexpected error unassigning sandboxes for transformation", str(transID))
return S_ERROR(f"Unexpected error unassigning sandboxes for transformation {transID}: {excp}")
if not result["OK"]:
self.log.error("Could not unassign sandboxes for transformation", f"{transID}: {result['Message']}")
return S_ERROR(f"Could not unassign sandboxes for transformation {transID}: {result['Message']}")
return S_OK()

def cleanTransformation(self, transID):
"""This removes what was produced by the supplied transformation,
leaving only some info and log in the transformation DB.
"""
self.log.info("Cleaning transformation", transID)
# Release the input-sandbox pin first; if it fails, fail the whole clean so it
# is retried — otherwise the sandboxes stay pinned to a gone transformation and
# leak. unassignEntities is idempotent, so the retry is safe.
res = self._unassignTransformationSandboxes(transID)
if not res["OK"]:
return res
res = self.getTransformationDirectories(transID)
if not res["OK"]:
self.log.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# pylint: disable=missing-docstring
from unittest.mock import MagicMock

from DIRAC.TransformationSystem.Agent.TransformationCleaningAgent import TransformationCleaningAgent

# The agent's __init__ needs a running config, so these exercise the methods with a
# stand-in ``self`` (a MagicMock) carrying only the attributes the methods touch.


def test_unassign_callsDBWithTransformationEntity():
agent = MagicMock()
agent.sandboxDB.unassignEntities.return_value = {"OK": True, "Value": 1}

res = TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345)

assert res["OK"]
agent.sandboxDB.unassignEntities.assert_called_once_with(["Transformation:12345"])
agent.log.error.assert_not_called()
agent.log.exception.assert_not_called()


def test_unassign_noDBisOk():
agent = MagicMock()
agent.sandboxDB = None
# Nothing to unassign: succeed so cleaning is not blocked in DB-less deployments.
assert TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345)["OK"]


def test_unassign_dbErrorIsLoggedAndReturnsError():
agent = MagicMock()
agent.sandboxDB.unassignEntities.return_value = {"OK": False, "Message": "boom"}

# A failed unassignment must be loud AND returned as an error so the caller fails
# the cleaning and retries (otherwise the sandboxes leak).
res = TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345)

assert not res["OK"]
agent.sandboxDB.unassignEntities.assert_called_once_with(["Transformation:12345"])
agent.log.error.assert_called_once()


def test_unassign_unexpectedExceptionIsLoggedAndReturnsError():
agent = MagicMock()
agent.sandboxDB.unassignEntities.side_effect = RuntimeError("db down")

# An unexpected exception must not propagate, but must be logged loudly and returned
# as an error so the cleaning is retried.
res = TransformationCleaningAgent._unassignTransformationSandboxes(agent, 12345)

assert not res["OK"]
agent.log.exception.assert_called_once()


def test_cleanTransformation_failsWhenUnassignFails():
# The whole clean must fail (and be retried) if the sandbox can't be unassigned,
# rather than proceeding and leaking the now-orphaned sandbox assignment.
agent = MagicMock()
agent._unassignTransformationSandboxes.return_value = {"OK": False, "Message": "boom"}

res = TransformationCleaningAgent.cleanTransformation(agent, 12345)

assert not res["OK"]
agent.getTransformationDirectories.assert_not_called()
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.JEncode import encode as jencode
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Workflow.Workflow import fromXMLString


class TransformationManagerHandlerMixin:
sandboxDB = None

@classmethod
def initializeHandler(cls, serviceInfoDict):
"""Initialization of DB object"""
Expand All @@ -25,6 +28,24 @@ def initializeHandler(cls, serviceInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to TransformationDB: {excp}")

# SandboxMetadataDB is used to pin a transformation's input sandboxes so the
# sandbox-store cleaner does not remove them while the transformation is alive.
# We don't stop the service if it can't load (most transformations have no
# input sandboxes to pin), but a transformation that *does* reference sandboxes
# will be refused at creation time rather than left unpinned (see
# _assignInputSandboxesToTransformation).
cls.sandboxDB = None
try:
sbResult = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
if sbResult["OK"]:
cls.sandboxDB = sbResult["Value"](parentLogger=cls.log)
else:
cls.log.error(
"Could not load SandboxMetadataDB; input-sandbox pinning unavailable", sbResult["Message"]
)
except RuntimeError as excp:
cls.log.error("Could not connect to SandboxMetadataDB; input-sandbox pinning unavailable", str(excp))

return S_OK()

def checkPermissions(self, transName: str):
Expand Down Expand Up @@ -117,10 +138,80 @@ def export_addTransformation(
inputMetaQuery=inputMetaQuery,
outputMetaQuery=outputMetaQuery,
)
if res["OK"]:
self.log.info("Added transformation", res["Value"])
if not res["OK"]:
return res
transID = res["Value"]
self.log.info("Added transformation", transID)

# Pin any input sandboxes to the transformation. If this fails we must not
# leave a created-but-unpinned transformation: the sandbox would be cleaned
# on the usual timescale and every (1000s-10000s of) job submitted afterwards
# would fail to find it. Roll the transformation back and report the error.
assignRes = self._assignInputSandboxesToTransformation(transID, body, author, authorGroup)
if not assignRes["OK"]:
self.log.error(
"Could not pin input sandboxes; rolling back transformation", f"{transID}: {assignRes['Message']}"
)
delRes = self.transformationDB.deleteTransformation(transID, author=author)
if not delRes["OK"]:
self.log.error("Rollback failed; transformation left unpinned", f"{transID}: {delRes['Message']}")
return S_ERROR(
f"Could not pin input sandboxes ({assignRes['Message']}) and rollback failed "
f"({delRes['Message']}); transformation {transID} is orphaned and unpinned"
)
return S_ERROR(
f"Could not pin input sandboxes; transformation {transID} rolled back: {assignRes['Message']}"
)
return res

def _assignInputSandboxesToTransformation(self, transID, body, author, authorGroup):
"""Pin a transformation's input sandboxes so the sandbox-store cleaner does
not remove them while the transformation is alive.

Reads ``SB:`` references from the workflow body's ``InputSandbox`` parameter
and writes a ``Transformation:<id>`` mapping under the author's credentials
(the author uploaded the sandboxes, so ``getSandboxId`` authorises them).

Returns ``S_OK`` when there is nothing to pin (no body, no ``InputSandbox``
parameter, or no ``SB:`` references). When the body *does* reference sandboxes,
a failure to pin them is fatal and returned as ``S_ERROR``: an unpinned sandbox
is cleaned on the usual timescale and breaks every job submitted afterwards, so
the caller must roll the transformation back rather than create it unpinned.

:param int transID: the created transformation ID
:param str body: the transformation body (workflow XML)
:param str author: requester username (the submitter/owner)
:param str authorGroup: requester group
"""
if not body:
return S_OK()
try:
workflow = fromXMLString(body)
param = workflow.parameters.find("InputSandbox")
except Exception as excp: # pylint: disable=broad-except # body may not be a workflow
self.log.verbose("Could not parse transformation body for InputSandbox", str(excp))
return S_OK()
if not param:
return S_OK()
# InputSandbox is canonically a ';'-joined string, but accept a list too in
# case a future producer stores it list-form.
value = param.getValue()
entries = value if isinstance(value, list) else str(value).split(";")
sbRefs = [sb for sb in entries if isinstance(sb, str) and sb.startswith("SB:")]
if not sbRefs:
return S_OK()

# From here the transformation has sandboxes that MUST be pinned.
if not self.sandboxDB:
return S_ERROR("SandboxMetadataDB unavailable; cannot pin input sandboxes")
result = self.sandboxDB.assignSandboxesToEntities(
{f"Transformation:{transID}": [(sb, "Input") for sb in sbRefs]}, author, authorGroup
)
if not result["OK"]:
return S_ERROR(f"Could not assign input sandboxes: {result['Message']}")
self.log.info("Assigned input sandboxes to transformation", f"{transID} ({result['Value']}/{len(sbRefs)})")
return S_OK()

types_deleteTransformation = [[int, str]]

def export_deleteTransformation(self, transName):
Expand Down
Loading
Loading