import logging
import os
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union
from blissdata.h5api import dynamic_hdf5
from ewokscore.task import Task
from ..io.spec import save_as_spec
from ..io.spec import spectimeformat
logger = logging.getLogger(__name__)
[docs]
class Hdf5ToSpec(
Task,
input_names=["filename", "output_filename"],
optional_input_names=[
"scan_numbers",
"retry_timeout",
"retry_period",
"mca_counter",
"mca_calibration",
],
output_names=["output_filenames"],
):
"""Convert Bliss HDF5 scans to SPEC files, one file per scan and only MCA data."""
[docs]
def run(self):
filename: str = self.inputs.filename
scan_numbers: Optional[Sequence[int]] = self.get_input_value(
"scan_numbers", None
)
mca_counter: str = self.get_input_value("mca_counter", "Counts")
retry_timeout: Optional[float] = self.get_input_value("retry_timeout", None)
retry_period: Optional[float] = self.get_input_value("retry_period", 1)
mca_calibration: Optional[float] = self.get_input_value("mca_calibration", None)
output_filename: str = self.inputs.output_filename
dirname = os.path.dirname(output_filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
output_filename, ext = os.path.splitext(output_filename)
output_filenames = list()
failed_scans = list()
with dynamic_hdf5.File(
filename, retry_timeout=retry_timeout, retry_period=retry_period
) as nxroot:
if scan_numbers:
scan_names = [f"{scannr}.1" for scannr in scan_numbers]
else:
scan_names = nxroot
for scan_name in scan_names:
scan_number = int(float(scan_name))
# Required data
try:
title = _asstr(nxroot[f"/{scan_name}/title"][()])
start_time = _asstr(nxroot[f"/{scan_name}/start_time"][()])
end_time = _asstr(nxroot[f"/{scan_name}/end_time"][()])
mca = nxroot[f"/{scan_name}/measurement/{mca_counter}"][-1]
except Exception as e:
failed_scans.append(scan_number)
logger.error(
"Processing scan %s::/%s failed (%s)", filename, scan_name, e
)
continue
# Optional calibration
if not mca_calibration:
names = ["a", "b", "c"]
mca_calibration = _read_mca_parameters(
nxroot,
f"/{scan_name}/instrument/calibration",
names,
scan_name,
filename,
defaults=[0, 1, 0],
label="calibration coefficient",
)
# Optional count times
names = ["preset_time", "live_time", "real_time"]
mca_count_times = _read_mca_parameters(
nxroot,
f"/{scan_name}/instrument/acqtime",
names,
scan_name,
filename,
defaults=[float("nan")] * 3,
label="count time",
)
# Save SPEC file
scan_output_filename = f"{output_filename}_{scan_number:02d}{ext}"
metadata = {"Finished": spectimeformat(end_time)}
save_as_spec(
scan_output_filename,
mca,
title=f"{scan_number} {title}",
date=start_time,
detector_name=mca_counter,
metadata=metadata,
calibration=mca_calibration,
count_times=mca_count_times,
)
output_filenames.append(scan_output_filename)
if failed_scans:
raise RuntimeError(f"Failed scans (see logs why): {failed_scans}")
self.outputs.output_filenames = output_filenames
def _asstr(data: Union[str, bytes]) -> Any:
if isinstance(data, bytes):
return data.decode()
return data
def _read_mca_parameters(
nxroot: Any,
base_path: str,
names: List[str],
scan_name: str,
filename: str,
defaults: List[Any],
label: str,
) -> List[float]:
if len(names) != len(defaults):
raise ValueError("The length of 'names' and 'defaults' must be the same")
values = defaults
for i, name in enumerate(names):
try:
values[i] = nxroot[f"{base_path}/{name}"][()]
except Exception as e:
logger.warning(
"Cannot read MCA %s %r from scan %s::/%s (%s)",
label,
name,
filename,
scan_name,
e,
)
return values