From 3c9d4202b178f6be1eea51a27425bfce1f58835f Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Fri, 13 Feb 2026 16:59:39 +0000 Subject: [PATCH 1/7] Add strategy service and trigger --- setup.py | 1 + src/dlstbx/services/strategy.py | 170 ++++++++++++++++++++++++++++++++ src/dlstbx/services/trigger.py | 57 +++++++++++ 3 files changed, 228 insertions(+) create mode 100644 src/dlstbx/services/strategy.py diff --git a/setup.py b/setup.py index 97d80d313..494011aa4 100644 --- a/setup.py +++ b/setup.py @@ -119,6 +119,7 @@ "DLSNotifyGDA = dlstbx.services.notifygda:DLSNotifyGDA", "DLSPerImageAnalysis = dlstbx.services.per_image_analysis:DLSPerImageAnalysis", "DLSReverseBridge = dlstbx.services.bridge_reverse:DLSReverseBridge", + "DLSStrategy = dlstbx.services.strategy:DLSStrategy", "DLSTrigger = dlstbx.services.trigger:DLSTrigger", "DLSTriggerXChem = dlstbx.services.trigger_xchem:DLSTriggerXChem", "DLSValidation = dlstbx.services.validation:DLSValidation", diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py new file mode 100644 index 000000000..d425246f4 --- /dev/null +++ b/src/dlstbx/services/strategy.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import workflows.recipe +from workflows.services.common_service import CommonService + +from dlstbx.util import ChainMapWithReplacement + + +def apply_limit(parameter: float, limits: tuple[float, float]) -> float: + return max(limits[0], min(limits[1], parameter)) + + +def scale_parameter(value: float, scale_factor: float, limits) -> tuple[float, float]: + ref_value = value * scale_factor + scaled_value = apply_limit(ref_value, limits) + if scaled_value == 0: + raise ValueError("Scaled value cannot be zero") + inverse_scale_factor = ref_value / scaled_value + return (scaled_value, inverse_scale_factor) + + +def get_resolution_scale(resolution: float) -> float: + return resolution**2 - 0.4 * resolution + 0.5 + + +def get_wavelength_scale(wavelength: float, default_wavelength: float) -> float: + return (default_wavelength / wavelength) ** 2 + + +class DLSStrategy(CommonService): + """Service for creating data collection strategies.""" + + # Human readable service name + _service_name = "Strategy" + + # Logger name + _logger_name = "dlstbx.services.strategy" + + def initializing(self): + """Subscribe to channel.""" + self.log.info("Strategy service starting") + workflows.recipe.wrap_subscribe( + self._transport, + "strategy", + self.generate_strategy, + acknowledgement=True, + log_extender=self.extend_log, + ) + + def generate_strategy( + self, rw: workflows.recipe.RecipeWrapper, header: dict, message: dict + ): + """Generate a strategy from the results of an upstream pipeline""" + self.log.info("Received strategy request, generating strategy") + parameters = ChainMapWithReplacement( + message.get("parameters", {}) if isinstance(message, dict) else {}, + rw.recipe_step.get("parameters", {}), + substitutions=rw.environment, + ) + self.log.info(f"Received parameters for strategy generation:\n{parameters}") + # Conditionally acknowledge receipt of the message + txn = self._transport.transaction_begin(subscription_id=header["subscription"]) + self._transport.ack(header, transaction=txn) + + wavelength = parameters["wavelength"] + default_wavelength = parameters["default_wavelength"] + resolution = max(parameters["resolution"] - 0.5, 0.9) + scale = 1.0 + scale *= get_wavelength_scale(wavelength, default_wavelength) + self.log.info(f"Scale factor from wavelength: {scale:.3f}") + scale *= get_resolution_scale(resolution) + self.log.info(f"Scale factor from resolution: {scale:.3f}") + + tranmission_limits = (0.0, 1.0) + exposure_time_limits = (0.01, 1.0) + transmission = 0.1 + exposure_time = 0.1 + + # Runs twice to ensure that limits are applied correctly to both parameters, as they are interdependent - is this necessary? + for _ in range(2): + if scale > 1.0: + transmission, scale = scale_parameter( + transmission, scale, tranmission_limits + ) + exposure_time, scale = scale_parameter( + exposure_time, scale, exposure_time_limits + ) + else: + exposure_time, scale = scale_parameter( + exposure_time, scale, exposure_time_limits + ) + transmission, scale = scale_parameter( + transmission, scale, tranmission_limits + ) + self.log.info( + f"Exposure time scaled to {exposure_time:.3f} s, transmission scaled to {transmission:.3f}, scale factor now {scale:.3f}" + ) + + ispyb_command_list = [] + + # Step 1: Store screeningOutput results, linked to the screeningId + # Keep the screeningOutputId + d = { + "program": "udc-strategy", + "strategysuccess": 1, + "ispyb_command": "insert_screening_output", + "screening_id": "$ispyb_screening_id", + "store_result": "ispyb_screening_output_id", + } + ispyb_command_list.append(d) + + # Step 2: Store screeningStrategy results, linked to the screeningOutputId + # Keep the screeningStrategyId + d = { + "program": "udc-strategy", + "ispyb_command": "insert_screening_strategy", + "screening_output_id": "$ispyb_screening_output_id", + "store_result": "ispyb_screening_strategy_id", + } + ispyb_command_list.append(d) + + # Step 3: Store screeningStrategyWedge results, linked to the screeningStrategyId + # Keep the screeningStrategyWedgeId + d = { + "wedgenumber": 1, + "resolution": resolution, + "phi": 0.0, + "chi": 0.0, + "ispyb_command": "insert_screening_strategy_wedge", + "screening_strategy_id": "$ispyb_screening_strategy_id", + "store_result": "ispyb_screening_strategy_wedge_id", + } + ispyb_command_list.append(d) + + # Step 4: Store second screeningStrategySubWedge results, linked to the screeningStrategyWedgeId + # Keep the screeningStrategyWedgeId + d = { + "subwedgenumber": 1, + "rotationaxis": "Omega", + "axisstart": 0.0, + "axisend": 360.0, + "exposuretime": exposure_time, + "transmission": transmission, + "oscillationrange": 0.1, + "numberOfImages": 3600, + "resolution": resolution, + "chi": 0.0, + "ispyb_command": "insert_screening_strategy_sub_wedge", + "screening_strategy_wedge_id": "$ispyb_screening_strategy_wedge_id", + "store_result": "ispyb_screening_strategy_sub_wedge_id", + } + ispyb_command_list.append(d) + + d = { + "ispyb_command": "update_processing_status", + "program_id": "$ispyb_autoprocprogram_id", + "message": "Processing successful", + "status": "success", + } + ispyb_command_list.append(d) + + # Send results onwards + rw.set_default_channel("ispyb") + rw.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}, transaction=txn) + self.log.info(f"Sent {len(ispyb_command_list)} commands to ISPyB") + self.log.debug(f"Commands sent to ISPyB:\n{ispyb_command_list}") + + # Commit transaction + self._transport.transaction_commit(txn) + self.log.info("Strategy generation complete") diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index d7257aad0..474a0fe75 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -272,6 +272,14 @@ class AlignCrystalParameters(pydantic.BaseModel): symlink: str = pydantic.Field(default="") +class StrategyParameters(pydantic.BaseModel): + dcid: int = pydantic.Field(gt=0) + comment: Optional[str] = None + experiment_type: str + wavelength: float = pydantic.Field(gt=0) + default_wavelength: float = pydantic.Field(gt=0) + + class DLSTrigger(CommonService): """A service that creates and runs downstream processing jobs.""" @@ -2780,3 +2788,52 @@ def trigger_align_crystal( self.log.info(f"Align_crystal trigger: Processing job {jobid} triggered") return {"success": True, "return_value": jobid} + + @pydantic.validate_call(config={"arbitrary_types_allowed": True}) + def trigger_strategy( + self, + rw: workflows.recipe.RecipeWrapper, + *, + parameters: StrategyParameters, + session: sqlalchemy.orm.session.Session, + **kwargs, + ): + if parameters.experiment_type != "Characterization": + self.log.info( + f"Skipping strategy trigger: experiment type {parameters.experiment_type} not supported" + ) + return {"success": True} + + resolution = 2.2 # TODO - get resolution from parameters or from ispyb + + jp = self.ispyb.mx_processing.get_job_params() + jp["comments"] = parameters.comment + jp["datacollectionid"] = parameters.dcid + jp["display_name"] = "udc-strategy" + jp["recipe"] = "postprocessing-udc-strategy" + self.log.info(jp) + jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) + self.log.debug(f"Strategy trigger: generated JobID {jobid}") + + strategy_parameters = { + "resolution": resolution, + "wavelength": parameters.wavelength, + "default_wavelength": parameters.default_wavelength, + } + + for key, value in strategy_parameters.items(): + jpp = self.ispyb.mx_processing.get_job_parameter_params() + jpp["job_id"] = jobid + jpp["parameter_key"] = key + jpp["parameter_value"] = value + jppid = self.ispyb.mx_processing.upsert_job_parameter(list(jpp.values())) + self.log.debug( + f"Strategy trigger: generated JobParameterID {jppid} with {key}={value}" + ) + + message = {"recipes": [], "parameters": {"ispyb_process": jobid}} + rw.transport.send("processing_recipe", message) + + self.log.info(f"Strategy trigger: Processing job {jobid} triggered") + + return {"success": True, "return_value": jobid} From e47d6a46deb39c567d8b019251afcbe9fc1a5b82 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Thu, 19 Feb 2026 16:33:18 +0000 Subject: [PATCH 2/7] Read in params from agamemnon recipes --- src/dlstbx/services/strategy.py | 256 ++++++++++++++++++++------------ src/dlstbx/services/trigger.py | 6 +- 2 files changed, 162 insertions(+), 100 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index d425246f4..2f32e4553 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -1,6 +1,10 @@ from __future__ import annotations +from pathlib import Path + import workflows.recipe +import yaml +from pydantic import BaseModel, Field, ValidationError from workflows.services.common_service import CommonService from dlstbx.util import ChainMapWithReplacement @@ -10,9 +14,11 @@ def apply_limit(parameter: float, limits: tuple[float, float]) -> float: return max(limits[0], min(limits[1], parameter)) -def scale_parameter(value: float, scale_factor: float, limits) -> tuple[float, float]: +def scale_parameter( + value: float, scale_factor: float, limits: tuple[float, float] | None = None +) -> tuple[float, float]: ref_value = value * scale_factor - scaled_value = apply_limit(ref_value, limits) + scaled_value = apply_limit(ref_value, limits) if limits else ref_value if scaled_value == 0: raise ValueError("Scaled value cannot be zero") inverse_scale_factor = ref_value / scaled_value @@ -27,6 +33,31 @@ def get_wavelength_scale(wavelength: float, default_wavelength: float) -> float: return (default_wavelength / wavelength) ** 2 +def parse_agamemnon_recipe(recipe_path: Path) -> list[dict]: + with open(recipe_path, "r") as f: + recipe = yaml.safe_load(f) + return recipe + + +class AgamemnonParameters(BaseModel): + chi: float + comment: str + exposure_time: float = Field(gt=0) + dose: float = Field(gt=0) + kappa: float + number_of_images: int = Field(gt=0) + omega_increment: float = Field(gt=0) + omega_overlap: float + omega_start: float + phi_increment: float + phi_overlap: float + phi_start: float + scan_axis: str + transmission: float = Field(gt=0) + two_theta: float + wavelength: float = Field(gt=0) + + class DLSStrategy(CommonService): """Service for creating data collection strategies.""" @@ -54,7 +85,7 @@ def generate_strategy( self.log.info("Received strategy request, generating strategy") parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, - rw.recipe_step.get("parameters", {}), + rw.recipe_step["parameters"].get("ispyb_parameters", {}), substitutions=rw.environment, ) self.log.info(f"Received parameters for strategy generation:\n{parameters}") @@ -62,102 +93,131 @@ def generate_strategy( txn = self._transport.transaction_begin(subscription_id=header["subscription"]) self._transport.ack(header, transaction=txn) - wavelength = parameters["wavelength"] - default_wavelength = parameters["default_wavelength"] - resolution = max(parameters["resolution"] - 0.5, 0.9) - scale = 1.0 - scale *= get_wavelength_scale(wavelength, default_wavelength) - self.log.info(f"Scale factor from wavelength: {scale:.3f}") - scale *= get_resolution_scale(resolution) - self.log.info(f"Scale factor from resolution: {scale:.3f}") - - tranmission_limits = (0.0, 1.0) - exposure_time_limits = (0.01, 1.0) - transmission = 0.1 - exposure_time = 0.1 - - # Runs twice to ensure that limits are applied correctly to both parameters, as they are interdependent - is this necessary? - for _ in range(2): - if scale > 1.0: - transmission, scale = scale_parameter( - transmission, scale, tranmission_limits - ) - exposure_time, scale = scale_parameter( - exposure_time, scale, exposure_time_limits - ) - else: - exposure_time, scale = scale_parameter( - exposure_time, scale, exposure_time_limits - ) - transmission, scale = scale_parameter( - transmission, scale, tranmission_limits + beamline = ( + parameters["beamline"][0] + if isinstance(parameters["beamline"], list) + else parameters["beamline"] + ) + wavelength = ( + float(parameters["wavelength"][0]) + if isinstance(parameters["wavelength"], list) + else float(parameters["wavelength"]) + ) + resolution_estimate = ( + float(parameters["resolution"][0]) + if isinstance(parameters["resolution"], list) + else float(parameters["resolution"]) + ) + resolution = max((resolution_estimate) - 0.5, 0.9) + # TODO get limits from filesystem + tranmission_limits = (0.0001, 1.0) + exposure_time_limits = (0.001, 1.0) + + recipes = ("OSC.yaml", "Ligand binding.yaml") + recipe_aliases = {"OSC.yaml": "OSC", "Ligand binding.yaml": "Ligand"} + + for recipe in recipes: + ispyb_command_list = [] + recipe_path = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes/{recipe}") + recipe_steps = parse_agamemnon_recipe(recipe_path) + + # Step 1: Create screeningOutput record for recipe, linked to the screeningId + # Keep the screeningOutputId + d = { + "program": "udc-strategy", + "strategysuccess": 1, + "ispyb_command": "insert_screening_output", + "screening_id": "$ispyb_screening_id", + "store_result": "ispyb_screening_output_id", + } + ispyb_command_list.append(d) + + # Step 2: Store screeningStrategy results, linked to the screeningOutputId + # Keep the screeningStrategyId + d = { + "program": f"udc-strategy: {recipe_aliases[recipe]}", + "ispyb_command": "insert_screening_strategy", + "screening_output_id": "$ispyb_screening_output_id", + "store_result": "ispyb_screening_strategy_id", + } + ispyb_command_list.append(d) + + for n_step, recipe_step in enumerate(recipe_steps, start=1): + try: + recipe_step = AgamemnonParameters(**recipe_step) + except ValidationError as e: + self.log.error(f"Invalid recipe step in {recipe_path}: {e}") + # TODO handle this error - Send a message to ISPyB to log the failure and skip the rest of the recipe steps. + break + scale = 1.0 + default_wavelength = recipe_step.wavelength + scale *= get_wavelength_scale(wavelength, default_wavelength) + scale *= get_resolution_scale(resolution) + + dose, _ = scale_parameter(recipe_step.dose, scale) + + rotation_axis = recipe_step.scan_axis + rotation_start = recipe_step.__getattribute__(f"{rotation_axis}_start") + rotation_increment = recipe_step.__getattribute__( + f"{rotation_axis}_increment" ) - self.log.info( - f"Exposure time scaled to {exposure_time:.3f} s, transmission scaled to {transmission:.3f}, scale factor now {scale:.3f}" - ) - - ispyb_command_list = [] - - # Step 1: Store screeningOutput results, linked to the screeningId - # Keep the screeningOutputId - d = { - "program": "udc-strategy", - "strategysuccess": 1, - "ispyb_command": "insert_screening_output", - "screening_id": "$ispyb_screening_id", - "store_result": "ispyb_screening_output_id", - } - ispyb_command_list.append(d) - - # Step 2: Store screeningStrategy results, linked to the screeningOutputId - # Keep the screeningStrategyId - d = { - "program": "udc-strategy", - "ispyb_command": "insert_screening_strategy", - "screening_output_id": "$ispyb_screening_output_id", - "store_result": "ispyb_screening_strategy_id", - } - ispyb_command_list.append(d) - - # Step 3: Store screeningStrategyWedge results, linked to the screeningStrategyId - # Keep the screeningStrategyWedgeId - d = { - "wedgenumber": 1, - "resolution": resolution, - "phi": 0.0, - "chi": 0.0, - "ispyb_command": "insert_screening_strategy_wedge", - "screening_strategy_id": "$ispyb_screening_strategy_id", - "store_result": "ispyb_screening_strategy_wedge_id", - } - ispyb_command_list.append(d) - - # Step 4: Store second screeningStrategySubWedge results, linked to the screeningStrategyWedgeId - # Keep the screeningStrategyWedgeId - d = { - "subwedgenumber": 1, - "rotationaxis": "Omega", - "axisstart": 0.0, - "axisend": 360.0, - "exposuretime": exposure_time, - "transmission": transmission, - "oscillationrange": 0.1, - "numberOfImages": 3600, - "resolution": resolution, - "chi": 0.0, - "ispyb_command": "insert_screening_strategy_sub_wedge", - "screening_strategy_wedge_id": "$ispyb_screening_strategy_wedge_id", - "store_result": "ispyb_screening_strategy_sub_wedge_id", - } - ispyb_command_list.append(d) - - d = { - "ispyb_command": "update_processing_status", - "program_id": "$ispyb_autoprocprogram_id", - "message": "Processing successful", - "status": "success", - } - ispyb_command_list.append(d) + transmission = recipe_step.transmission + exposure_time = recipe_step.exposure_time + + # Runs twice to ensure that limits are applied correctly to both parameters, as they are interdependent - is this necessary? + for _ in range(2): + if scale > 1.0: + transmission, scale = scale_parameter( + transmission, scale, limits=tranmission_limits + ) + exposure_time, scale = scale_parameter( + exposure_time, scale, limits=exposure_time_limits + ) + else: + exposure_time, scale = scale_parameter( + exposure_time, scale, exposure_time_limits + ) + transmission, scale = scale_parameter( + transmission, scale, limits=tranmission_limits + ) + self.log.debug( + f"Exposure time scaled to {exposure_time:.3f} s, transmission scaled to {transmission:.3f}, scale factor now {scale:.3f}" + ) + + # Step 3: Store screeningStrategyWedge results, linked to the screeningStrategyId + # Keep the screeningStrategyWedgeId + d = { + "wedgenumber": n_step, + "resolution": resolution, + "phi": recipe_step.phi_start, + "chi": recipe_step.chi, + "kappa": recipe_step.kappa, + "wavelength": wavelength, + "dosetotal": dose, + "ispyb_command": "insert_screening_strategy_wedge", + "screening_strategy_id": "$ispyb_screening_strategy_id", + "store_result": f"ispyb_screening_strategy_wedge_id_{n_step}", + } + ispyb_command_list.append(d) + + # Step 4: Store second screeningStrategySubWedge results, linked to the screeningStrategyWedgeId + # Keep the screeningStrategyWedgeId + d = { + "subwedgenumber": 1, + "rotationaxis": recipe_step.scan_axis, + "axisstart": rotation_start, + "axisend": rotation_start + + rotation_increment * recipe_step.number_of_images, + "exposuretime": exposure_time, + "transmission": transmission, + "oscillationrange": rotation_increment, + "numberOfImages": recipe_step.number_of_images, + "resolution": resolution, + "ispyb_command": "insert_screening_strategy_sub_wedge", + "screening_strategy_wedge_id": f"$ispyb_screening_strategy_wedge_id_{n_step}", + "store_result": f"ispyb_screening_strategy_sub_wedge_id_{n_step}", + } + ispyb_command_list.append(d) # Send results onwards rw.set_default_channel("ispyb") diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 474a0fe75..77611ba3c 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -274,10 +274,10 @@ class AlignCrystalParameters(pydantic.BaseModel): class StrategyParameters(pydantic.BaseModel): dcid: int = pydantic.Field(gt=0) + beamline: str comment: Optional[str] = None experiment_type: str wavelength: float = pydantic.Field(gt=0) - default_wavelength: float = pydantic.Field(gt=0) class DLSTrigger(CommonService): @@ -2804,6 +2804,8 @@ def trigger_strategy( ) return {"success": True} + # TODO Add check to see if UDC strategy has already run for this data collection. + resolution = 2.2 # TODO - get resolution from parameters or from ispyb jp = self.ispyb.mx_processing.get_job_params() @@ -2816,9 +2818,9 @@ def trigger_strategy( self.log.debug(f"Strategy trigger: generated JobID {jobid}") strategy_parameters = { + "beamline": parameters.beamline, "resolution": resolution, "wavelength": parameters.wavelength, - "default_wavelength": parameters.default_wavelength, } for key, value in strategy_parameters.items(): From 6172aedd4d5b9097df987856173b8ac8305f138d Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Fri, 20 Feb 2026 16:09:50 +0000 Subject: [PATCH 3/7] Get beamline limits from filesystem --- src/dlstbx/services/strategy.py | 126 ++++++++++++++++++++++++++++---- 1 file changed, 112 insertions(+), 14 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index 2f32e4553..e99d89a84 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -10,19 +10,27 @@ from dlstbx.util import ChainMapWithReplacement -def apply_limit(parameter: float, limits: tuple[float, float]) -> float: - return max(limits[0], min(limits[1], parameter)) - - def scale_parameter( value: float, scale_factor: float, limits: tuple[float, float] | None = None ) -> tuple[float, float]: + def apply_limit(parameter: float, limits: tuple[float, float]) -> float: + lower_limit, upper_limit = limits + if lower_limit is not None: + parameter = max(lower_limit, parameter) + if upper_limit is not None: + parameter = min(upper_limit, parameter) + return parameter + ref_value = value * scale_factor - scaled_value = apply_limit(ref_value, limits) if limits else ref_value + if limits is not None: + scaled_value = apply_limit(ref_value, limits) + else: + scaled_value = ref_value if scaled_value == 0: raise ValueError("Scaled value cannot be zero") - inverse_scale_factor = ref_value / scaled_value - return (scaled_value, inverse_scale_factor) + # Scale factor to apply to opposite parameter to achieve the desired scaling effect, accounting for limits + corrective_scale_factor = ref_value / scaled_value + return scaled_value, corrective_scale_factor def get_resolution_scale(resolution: float) -> float: @@ -39,6 +47,59 @@ def parse_agamemnon_recipe(recipe_path: Path) -> list[dict]: return recipe +def parse_config_file(config_file: Path) -> dict: + config = {} + + for record in open(config_file, errors="ignore"): + if "#" in record: + record = record.split("#")[0] + record = record.strip() + if not record: + continue + if "=" not in record: + continue + + key, value = record.split("=") + key = key.strip() + value = value.strip() + + if key == "include": + if value.startswith(".."): + include = config_file.parent / value + name = Path(value).name.split(".")[0] + included = parse_config_file(include) + for k in included: + config[f"{name}.{k}"] = included[k] + continue + + config[key] = value + # Resolve references to other variables + for key, val in config.items(): + if isinstance(val, str) and val[:2] == "${" and val[-1] == "}": + try: + config[key] = config[val[2:-1]] + except KeyError: + continue + return config + + +LimitMapping = tuple[str, tuple[str, str]] +LIMITS_MAPPINGS_LIST: list[LimitMapping] = [ + ( + "exposure_time", + ("gda.exptTableModel.minImageTime", "gda.exptTableModel.maxImageTime"), + ), + ( + "exposure_time", + ("gda.mx.udc.minImageTime", "gda.mx.udc.maxImageTime"), + ), + ( + "transmission", + ("gda.mx.udc.minTransmission", "gda.mx.udc.maxTransmission"), + ), +] + + class AgamemnonParameters(BaseModel): chi: float comment: str @@ -109,15 +170,51 @@ def generate_strategy( else float(parameters["resolution"]) ) resolution = max((resolution_estimate) - 0.5, 0.9) - # TODO get limits from filesystem - tranmission_limits = (0.0001, 1.0) - exposure_time_limits = (0.001, 1.0) + + # beamline_limits = { + # "exposure_time": (None, None), + # "transmission": (0.0001, 1.0) + # } + + beamline_config = parse_config_file( + Path( + f"/dls_sw/{beamline}/software/daq_configuration/domain/domain.properties" + ) + ) + # TODO - Refactor these monstrocities + transmission_limits = ( + float(beamline_config.get("gda.mx.udc.minTransmission", 0.0)), + float(beamline_config.get("gda.mx.udc.maxTransmission", 1.0)), + ) + exposure_time_limits = ( + float( + beamline_config.get( + "gda.mx.udc.minImageTime", + beamline_config.get("gda.exptTableModel.minImageTime", 0.0), + ) + ), + float( + beamline_config.get( + "gda.mx.udc.maxImageTime", + beamline_config.get( + "gda.exptTableModel.maxImageTime", float("inf") + ), + ) + ), + ) + + # for mapping in LIMITS_MAPPINGS_LIST: + # parameter_name, keys = mapping + # for index, key in enumerate(keys): + # if key in beamline_config: + # beamline_limits[parameter_name][index] = float(beamline_config[key]) + # self.log.debug(f"Limits for {parameter_name} set to {beamline_limits[parameter_name]} from keys {keys}") recipes = ("OSC.yaml", "Ligand binding.yaml") recipe_aliases = {"OSC.yaml": "OSC", "Ligand binding.yaml": "Ligand"} + ispyb_command_list = [] for recipe in recipes: - ispyb_command_list = [] recipe_path = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes/{recipe}") recipe_steps = parse_agamemnon_recipe(recipe_path) @@ -168,17 +265,17 @@ def generate_strategy( for _ in range(2): if scale > 1.0: transmission, scale = scale_parameter( - transmission, scale, limits=tranmission_limits + transmission, scale, limits=transmission_limits ) exposure_time, scale = scale_parameter( exposure_time, scale, limits=exposure_time_limits ) else: exposure_time, scale = scale_parameter( - exposure_time, scale, exposure_time_limits + exposure_time, scale, limits=exposure_time_limits ) transmission, scale = scale_parameter( - transmission, scale, limits=tranmission_limits + transmission, scale, limits=transmission_limits ) self.log.debug( f"Exposure time scaled to {exposure_time:.3f} s, transmission scaled to {transmission:.3f}, scale factor now {scale:.3f}" @@ -194,6 +291,7 @@ def generate_strategy( "kappa": recipe_step.kappa, "wavelength": wavelength, "dosetotal": dose, + "comments": recipe_aliases[recipe], "ispyb_command": "insert_screening_strategy_wedge", "screening_strategy_id": "$ispyb_screening_strategy_id", "store_result": f"ispyb_screening_strategy_wedge_id_{n_step}", From 12827639f894056207521d7a867d1a09bee59092 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Tue, 24 Feb 2026 13:50:03 +0000 Subject: [PATCH 4/7] Get resolution from query --- src/dlstbx/services/trigger.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 77611ba3c..70e9ef411 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -21,13 +21,14 @@ AutoProcProgramAttachment, AutoProcScaling, AutoProcScalingHasInt, + AutoProcScalingStatistics, BLSession, DataCollection, ProcessingJob, Proposal, Protein, ) -from sqlalchemy import or_ +from sqlalchemy import func, or_ from sqlalchemy.orm import Load, contains_eager, joinedload from workflows.recipe.wrapper import RecipeWrapper from workflows.services.common_service import CommonService @@ -277,6 +278,7 @@ class StrategyParameters(pydantic.BaseModel): beamline: str comment: Optional[str] = None experiment_type: str + program_id: int = pydantic.Field(gt=0) wavelength: float = pydantic.Field(gt=0) @@ -2804,9 +2806,28 @@ def trigger_strategy( ) return {"success": True} - # TODO Add check to see if UDC strategy has already run for this data collection. + # Get resolution estimate from ispyb records for upstream pipeline - returns None if not found. + resolution = ( + session.query(func.min(AutoProcScalingStatistics.resolutionLimitHigh)) + .join( + AutoProcScaling, + AutoProcScaling.autoProcScalingId + == AutoProcScalingStatistics.autoProcScalingId, + ) + .join(AutoProc, AutoProc.autoProcId == AutoProcScaling.autoProcId) + .join( + AutoProcProgram, + AutoProcProgram.autoProcProgramId == AutoProc.autoProcProgramId, + ) + .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) + .scalar() + ) - resolution = 2.2 # TODO - get resolution from parameters or from ispyb + if not resolution: + self.log.info( + f"Skipping strategy trigger: no resolution estimate found for dcid={parameters.dcid} auto_proc_program_id={parameters.program_id}" + ) + return {"success": True} jp = self.ispyb.mx_processing.get_job_params() jp["comments"] = parameters.comment From bc865fe925129f03bf1ba0145b955ca5cab75aa6 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Wed, 25 Feb 2026 12:02:10 +0000 Subject: [PATCH 5/7] Handle errors --- src/dlstbx/services/strategy.py | 112 ++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 50 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index e99d89a84..dc522960f 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -41,10 +41,29 @@ def get_wavelength_scale(wavelength: float, default_wavelength: float) -> float: return (default_wavelength / wavelength) ** 2 -def parse_agamemnon_recipe(recipe_path: Path) -> list[dict]: +class AgamemnonParameters(BaseModel): + chi: float + comment: str + exposure_time: float = Field(gt=0) + dose: float = Field(gt=0) + kappa: float + number_of_images: int = Field(gt=0) + omega_increment: float = Field(gt=0) + omega_overlap: float + omega_start: float + phi_increment: float + phi_overlap: float + phi_start: float + scan_axis: str + transmission: float = Field(gt=0) + two_theta: float + wavelength: float = Field(gt=0) + + +def parse_agamemnon_recipe(recipe_path: Path) -> list[AgamemnonParameters]: with open(recipe_path, "r") as f: recipe = yaml.safe_load(f) - return recipe + return [AgamemnonParameters(**step) for step in recipe] def parse_config_file(config_file: Path) -> dict: @@ -100,25 +119,6 @@ def parse_config_file(config_file: Path) -> dict: ] -class AgamemnonParameters(BaseModel): - chi: float - comment: str - exposure_time: float = Field(gt=0) - dose: float = Field(gt=0) - kappa: float - number_of_images: int = Field(gt=0) - omega_increment: float = Field(gt=0) - omega_overlap: float - omega_start: float - phi_increment: float - phi_overlap: float - phi_start: float - scan_axis: str - transmission: float = Field(gt=0) - two_theta: float - wavelength: float = Field(gt=0) - - class DLSStrategy(CommonService): """Service for creating data collection strategies.""" @@ -139,11 +139,25 @@ def initializing(self): log_extender=self.extend_log, ) + def failure( + self, rw: workflows.recipe.RecipeWrapper, message: str, transaction: int + ): + """Handle failure by sending a message to ISPyB via 'failure' output to log the failure.""" + rw.send_to( + "failure", + { + "message": f"{message}", + }, + transaction=transaction, + ) + self._transport.transaction_commit(transaction) + def generate_strategy( self, rw: workflows.recipe.RecipeWrapper, header: dict, message: dict ): """Generate a strategy from the results of an upstream pipeline""" self.log.info("Received strategy request, generating strategy") + parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, rw.recipe_step["parameters"].get("ispyb_parameters", {}), @@ -171,16 +185,17 @@ def generate_strategy( ) resolution = max((resolution_estimate) - 0.5, 0.9) - # beamline_limits = { - # "exposure_time": (None, None), - # "transmission": (0.0001, 1.0) - # } - - beamline_config = parse_config_file( - Path( - f"/dls_sw/{beamline}/software/daq_configuration/domain/domain.properties" - ) + beamline_config_file = Path( + f"/dls_sw/{beamline}/software/daq_configuration/domain/domain.properties" ) + if not beamline_config_file.is_file(): + self.log.error( + f"Beamline configuration file {beamline_config_file} not found, terminating strategy generation" + ) + self.failure(rw, "Beamline configuration file not found", txn) + return + beamline_config = parse_config_file(beamline_config_file) + # TODO - Refactor these monstrocities transmission_limits = ( float(beamline_config.get("gda.mx.udc.minTransmission", 0.0)), @@ -203,20 +218,23 @@ def generate_strategy( ), ) - # for mapping in LIMITS_MAPPINGS_LIST: - # parameter_name, keys = mapping - # for index, key in enumerate(keys): - # if key in beamline_config: - # beamline_limits[parameter_name][index] = float(beamline_config[key]) - # self.log.debug(f"Limits for {parameter_name} set to {beamline_limits[parameter_name]} from keys {keys}") - - recipes = ("OSC.yaml", "Ligand binding.yaml") - recipe_aliases = {"OSC.yaml": "OSC", "Ligand binding.yaml": "Ligand"} + recipes = {"OSC.yaml": "OSC", "Ligand binding.yaml": "Ligand"} ispyb_command_list = [] - for recipe in recipes: - recipe_path = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes/{recipe}") - recipe_steps = parse_agamemnon_recipe(recipe_path) + for recipe, recipe_alias in recipes.items(): + recipe_path = Path(f"/dls/tmp/dwe15129/agamemnon-recipes/{recipe}") + if not recipe_path.is_file(): + self.log.error( + f"Recipe file {recipe_path} not found, terminating strategy generation" + ) + self.failure(rw, f"Recipe file for '{recipe_alias}' not found", txn) + return + try: + recipe_steps = parse_agamemnon_recipe(recipe_path) + except ValidationError as e: + self.log.error(f"Invalid recipe step in {recipe_path}: {e}") + self.failure(rw, f"Invalid recipe step in '{recipe_alias}'", txn) + return # Step 1: Create screeningOutput record for recipe, linked to the screeningId # Keep the screeningOutputId @@ -232,7 +250,7 @@ def generate_strategy( # Step 2: Store screeningStrategy results, linked to the screeningOutputId # Keep the screeningStrategyId d = { - "program": f"udc-strategy: {recipe_aliases[recipe]}", + "program": f"udc-strategy: {recipe_alias}", "ispyb_command": "insert_screening_strategy", "screening_output_id": "$ispyb_screening_output_id", "store_result": "ispyb_screening_strategy_id", @@ -240,12 +258,6 @@ def generate_strategy( ispyb_command_list.append(d) for n_step, recipe_step in enumerate(recipe_steps, start=1): - try: - recipe_step = AgamemnonParameters(**recipe_step) - except ValidationError as e: - self.log.error(f"Invalid recipe step in {recipe_path}: {e}") - # TODO handle this error - Send a message to ISPyB to log the failure and skip the rest of the recipe steps. - break scale = 1.0 default_wavelength = recipe_step.wavelength scale *= get_wavelength_scale(wavelength, default_wavelength) @@ -291,7 +303,7 @@ def generate_strategy( "kappa": recipe_step.kappa, "wavelength": wavelength, "dosetotal": dose, - "comments": recipe_aliases[recipe], + "comments": recipe_alias, "ispyb_command": "insert_screening_strategy_wedge", "screening_strategy_id": "$ispyb_screening_strategy_id", "store_result": f"ispyb_screening_strategy_wedge_id_{n_step}", From ae639ff83f5fec1c004d66c4b9bc90c439805e35 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Wed, 25 Feb 2026 12:17:56 +0000 Subject: [PATCH 6/7] Add check for beamlines --- src/dlstbx/services/trigger.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 70e9ef411..a10a04bf1 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -2806,6 +2806,12 @@ def trigger_strategy( ) return {"success": True} + if parameters.beamline not in ["i03", "i04"]: + self.log.info( + f"Skipping strategy trigger: beamline {parameters.beamline} not supported" + ) + return {"success": True} + # Get resolution estimate from ispyb records for upstream pipeline - returns None if not found. resolution = ( session.query(func.min(AutoProcScalingStatistics.resolutionLimitHigh)) From 8350807edd74f39a9d8c03ad47d697d93e32bff7 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Wed, 25 Feb 2026 12:29:37 +0000 Subject: [PATCH 7/7] Refactoring --- src/dlstbx/services/strategy.py | 50 +++++++++++++-------------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index dc522960f..f53414c58 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -102,21 +102,16 @@ def parse_config_file(config_file: Path) -> dict: return config -LimitMapping = tuple[str, tuple[str, str]] -LIMITS_MAPPINGS_LIST: list[LimitMapping] = [ - ( - "exposure_time", - ("gda.exptTableModel.minImageTime", "gda.exptTableModel.maxImageTime"), - ), - ( - "exposure_time", - ("gda.mx.udc.minImageTime", "gda.mx.udc.maxImageTime"), - ), - ( - "transmission", - ("gda.mx.udc.minTransmission", "gda.mx.udc.maxTransmission"), - ), -] +def get_beamline_param( + config: dict, param_names: tuple[str, ...], default: float +) -> float: + """ + Get a beamline parameter from the config, trying multiple possible parameter names and returning the first one found, or a default value if none are found. + """ + for param_name in param_names: + if param_name in config: + return float(config[param_name]) + return default class DLSStrategy(CommonService): @@ -196,25 +191,20 @@ def generate_strategy( return beamline_config = parse_config_file(beamline_config_file) - # TODO - Refactor these monstrocities transmission_limits = ( - float(beamline_config.get("gda.mx.udc.minTransmission", 0.0)), - float(beamline_config.get("gda.mx.udc.maxTransmission", 1.0)), + get_beamline_param(beamline_config, ("gda.mx.udc.minTransmission",), 0.0), + get_beamline_param(beamline_config, ("gda.mx.udc.maxTransmission",), 1.0), ) exposure_time_limits = ( - float( - beamline_config.get( - "gda.mx.udc.minImageTime", - beamline_config.get("gda.exptTableModel.minImageTime", 0.0), - ) + get_beamline_param( + beamline_config, + ("gda.mx.udc.minExposureTime", "gda.exptTableModel.minExposureTime"), + 0.0, ), - float( - beamline_config.get( - "gda.mx.udc.maxImageTime", - beamline_config.get( - "gda.exptTableModel.maxImageTime", float("inf") - ), - ) + get_beamline_param( + beamline_config, + ("gda.mx.udc.maxExposureTime", "gda.exptTableModel.maxExposureTime"), + float("inf"), ), )