import datetime
import logging
import numbers
import os
from itertools import zip_longest
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import Union
import h5py
logger = logging.getLogger(__name__)
[docs]
def mca_data_to_spec_string(
mca: Sequence[float],
title: Optional[str] = None,
filename: Optional[str] = None,
date: Optional[Union[str, datetime.datetime, datetime.date]] = None,
calibration: Optional[Sequence[float]] = None,
detector_name: Optional[str] = None,
metadata: Optional[Dict] = None,
) -> str:
"""Official file format specs: https://certif.com/downloads/css_docs/spec_man.pdf
:param mca: MCA spectrum
:param title: scan title
:param filename: name by which the file will be created
:param date: start date of the scan
:param calibration: MCA calibration as the sequence [zero, gain, quad],
all of them optional so that
energy = zero + gain*channels + quad*channels^2
:param detector_name: name of the MCA detector
:param metadata: saved as comments
:returns: SPEC serialized string
"""
nchan_per_line = 16
nchan_tot = len(mca)
if title is None:
title = "ct"
if filename is None:
filename = "unspecified"
if detector_name is None:
detector_name = "MCA0"
date = spectimeformat(date)
calib = [0, 1, 0] # zero, gain, quad
if calibration:
if len(calibration) > 3:
raise ValueError("MCA calibration requires 3 only coefficients")
calib = [
p if p is not None else pdefault
for p, pdefault in zip_longest(calibration, calib)
]
header = [f"#F {filename}", f"#D {date}", "", f"#S {title}", f"#D {date}"]
if metadata:
for k, v in metadata.items():
header.append(f"#C {k} = {v}")
header.append("#N 1")
header.append(f"#@MCA {nchan_per_line}C")
header.append(f"#@CHANN {nchan_tot} 0 {nchan_tot-1} 1")
header.append(f"#@CALIB {' '.join(map(str, calib))}")
header.append("#@MCA_NB 1")
header.append(f"#L {detector_name}")
mcastring = "\n".join(header)
mcastring += "\n@A"
if isinstance(mca[0], numbers.Integral):
fmt = " %d"
else:
# fmt = " %.4f"
fmt = " %.8g"
for idx in range(0, nchan_tot, nchan_per_line):
if idx + nchan_per_line - 1 < nchan_tot:
for i in range(0, nchan_per_line):
mcastring += fmt % mca[idx + i]
if idx + nchan_per_line != nchan_tot:
mcastring += "\\"
else:
for i in range(idx, nchan_tot):
mcastring += fmt % mca[i]
mcastring += "\n"
return mcastring
[docs]
def save_as_spec(filename: str, mca: Sequence[float], **kwargs) -> None:
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
content = mca_data_to_spec_string(mca, filename=filename, **kwargs)
with open(filename, "w") as f:
f.write(content)
[docs]
class SpecScanData(NamedTuple):
scan_name: str
scan_title: str
start_time: str
scan_data: Dict[str, list]
n_points: int
@property
def scan_number(self) -> int:
return int(float(self.scan_name))
@property
def spec_title(self) -> str:
return f"#S {self.scan_number} {self.scan_title}"
[docs]
def scan_data_to_spec_string(scan: SpecScanData) -> str:
lines = []
lines.append(scan.spec_title)
lines.append(f"#D {scan.start_time}")
lines.append(f"#T {scan.scan_data['Seconds'][0]} (Seconds)")
keys = list(scan.scan_data)
lines.append("#L " + " ".join(keys))
for k in range(scan.n_points):
row = [str(scan.scan_data[name][k]) for name in keys]
lines.append(" ".join(row) + "")
lines.append("#C")
lines.append("")
lines.append("")
return "\n".join(lines)
[docs]
def save_as_spec2(filename: str, scans: List[SpecScanData]) -> str:
"""
Write the extracted scan data into an ASCII file in SPEC format.
"""
filename = os.path.abspath(filename)
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
existing_content = ""
if os.path.exists(filename):
with open(filename, "r") as f:
existing_content = f.read()
else:
with open(filename, "a") as f:
date = spectimeformat(None)
f.write(f"#F {filename}\n")
f.write(f"#D {date}\n")
f.write("\n")
with open(filename, "a") as f:
for scan in scans:
if scan.n_points == 0:
continue
if scan.spec_title in existing_content:
# Scan was already saved
continue
content = scan_data_to_spec_string(scan)
f.write(content)
return filename
[docs]
def asstr(data: Union[str, bytes]) -> Any:
if isinstance(data, bytes):
return data.decode()
return data