Source code for ewoksid14.tasks.hdf5_to_spec

import logging
import os
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union

from blissdata.h5api import dynamic_hdf5
from ewokscore.task import Task

from ..io.spec import save_as_spec
from ..io.spec import spectimeformat

logger = logging.getLogger(__name__)


[docs] class Hdf5ToSpec( Task, input_names=["filename", "output_filename"], optional_input_names=[ "scan_numbers", "retry_timeout", "retry_period", "mca_counter", "mca_calibration", ], output_names=["output_filenames"], ): """Convert Bliss HDF5 scans to SPEC files, one file per scan and only MCA data."""
[docs] def run(self): filename: str = self.inputs.filename scan_numbers: Optional[Sequence[int]] = self.get_input_value( "scan_numbers", None ) mca_counter: str = self.get_input_value("mca_counter", "Counts") retry_timeout: Optional[float] = self.get_input_value("retry_timeout", None) retry_period: Optional[float] = self.get_input_value("retry_period", 1) mca_calibration: Optional[float] = self.get_input_value("mca_calibration", None) output_filename: str = self.inputs.output_filename dirname = os.path.dirname(output_filename) if dirname: os.makedirs(dirname, exist_ok=True) output_filename, ext = os.path.splitext(output_filename) output_filenames = list() failed_scans = list() with dynamic_hdf5.File( filename, retry_timeout=retry_timeout, retry_period=retry_period ) as nxroot: if scan_numbers: scan_names = [f"{scannr}.1" for scannr in scan_numbers] else: scan_names = nxroot for scan_name in scan_names: scan_number = int(float(scan_name)) # Required data try: title = _asstr(nxroot[f"/{scan_name}/title"][()]) start_time = _asstr(nxroot[f"/{scan_name}/start_time"][()]) end_time = _asstr(nxroot[f"/{scan_name}/end_time"][()]) mca = nxroot[f"/{scan_name}/measurement/{mca_counter}"][-1] except Exception as e: failed_scans.append(scan_number) logger.error( "Processing scan %s::/%s failed (%s)", filename, scan_name, e ) continue # Optional calibration if not mca_calibration: names = ["a", "b", "c"] mca_calibration = _read_mca_parameters( nxroot, f"/{scan_name}/instrument/calibration", names, scan_name, filename, defaults=[0, 1, 0], label="calibration coefficient", ) # Optional count times names = ["preset_time", "live_time", "real_time"] mca_count_times = _read_mca_parameters( nxroot, f"/{scan_name}/instrument/acqtime", names, scan_name, filename, defaults=[float("nan")] * 3, label="count time", ) # Save SPEC file scan_output_filename = f"{output_filename}_{scan_number:02d}{ext}" metadata = {"Finished": spectimeformat(end_time)} save_as_spec( scan_output_filename, mca, title=f"{scan_number} {title}", date=start_time, detector_name=mca_counter, metadata=metadata, calibration=mca_calibration, count_times=mca_count_times, ) output_filenames.append(scan_output_filename) if failed_scans: raise RuntimeError(f"Failed scans (see logs why): {failed_scans}") self.outputs.output_filenames = output_filenames
def _asstr(data: Union[str, bytes]) -> Any: if isinstance(data, bytes): return data.decode() return data def _read_mca_parameters( nxroot: Any, base_path: str, names: List[str], scan_name: str, filename: str, defaults: List[Any], label: str, ) -> List[float]: if len(names) != len(defaults): raise ValueError("The length of 'names' and 'defaults' must be the same") values = defaults for i, name in enumerate(names): try: values[i] = nxroot[f"{base_path}/{name}"][()] except Exception as e: logger.warning( "Cannot read MCA %s %r from scan %s::/%s (%s)", label, name, filename, scan_name, e, ) return values