Source code for ewoksid14.tasks.hdf5_to_spec
import os
import logging
from typing import Optional, Sequence, Any, Union
from ewokscore.task import Task
from blissdata.h5api import dynamic_hdf5
from .io import save_as_spec
logger = logging.getLogger(__name__)
[docs]
class Hdf5ToSpec(
Task,
input_names=["filename", "output_filename"],
optional_input_names=[
"scan_numbers",
"retry_timeout",
"retry_period",
"mca_counter",
"mca_calibration",
],
output_names=["output_filenames"],
):
[docs]
def run(self):
filename: str = self.inputs.filename
scan_numbers: Optional[Sequence[int]] = self.get_input_value(
"scan_numbers", None
)
mca_counter: str = self.get_input_value("mca_counter", "Counts")
retry_timeout: Optional[float] = self.get_input_value("retry_timeout", None)
retry_period: Optional[float] = self.get_input_value("retry_period", 1)
mca_calibration: Optional[float] = self.get_input_value("mca_calibration", None)
output_filename: str = self.inputs.output_filename
dirname = os.path.dirname(output_filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
output_filename, ext = os.path.splitext(output_filename)
output_filenames = list()
failed_scans = list()
with dynamic_hdf5.File(
filename, retry_timeout=retry_timeout, retry_period=retry_period
) as nxroot:
if scan_numbers:
scan_names = [f"{scannr}.1" for scannr in scan_numbers]
else:
scan_names = nxroot
for scan_name in scan_names:
scan_number = int(float(scan_name))
try:
title = _asstr(nxroot[f"/{scan_name}/title"][()])
start_time = _asstr(nxroot[f"/{scan_name}/start_time"][()])
end_time = _asstr(nxroot[f"/{scan_name}/end_time"][()])
mca = nxroot[f"/{scan_name}/measurement/{mca_counter}"][-1]
except Exception as e:
failed_scans.append(scan_number)
logger.error(
"Processing scan %s::/%s failed (%s)", filename, scan_name, e
)
continue
scan_output_filename = f"{output_filename}_{scan_number:02d}{ext}"
metadata = {"Finished": end_time}
save_as_spec(
scan_output_filename,
mca,
title=f"{scan_number} {title}",
date=start_time,
detector_name=mca_counter,
metadata=metadata,
calibration=mca_calibration,
)
output_filenames.append(scan_output_filename)
if failed_scans:
raise RuntimeError(f"Failed scans (see logs why): {failed_scans}")
self.outputs.output_filenames = output_filenames
def _asstr(data: Union[str, bytes]) -> Any:
if isinstance(data, bytes):
return data.decode()
return data