Source code for ewoksid14.timepix4.t4_hist
from typing import Optional
from typing import Tuple
from typing import Union
import numpy
import polars
from . import t4_camera
from . import t4_plutils
from . import t4_profile
[docs]
def hist_2d(events: Union[polars.DataFrame, polars.LazyFrame]):
"""
Returns array (n_rows, n_cols).
"""
# Count hits per [y, x] pixel
counts = t4_plutils.collect(
events.filter(t4_camera.is_photon_pixels)
.group_by(["y", "x"])
.agg(count=polars.count())
)
# Add counts to histogram pixels
h2d = numpy.zeros((t4_camera.n_rows, t4_camera.n_cols), dtype=int)
y = counts["y"].cast(polars.Int32).to_numpy()
x = counts["x"].cast(polars.Int32).to_numpy()
h2d[y, x] = counts["count"].to_numpy()
return h2d
[docs]
def hist_3d(
events: Union[polars.DataFrame, polars.LazyFrame],
n_toa: Optional[int] = None,
toa_min: Optional[float] = None,
toa_max: Optional[float] = None,
toa_bin_size: Optional[float] = None,
relative_toa: bool = True,
) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""
Returns array (n_rows, n_cols, n_toa) and TOA edges.
"""
if not ((toa_bin_size is None) ^ (n_toa is None)):
raise ValueError("Provide either n_toa or toa_bin_size")
toa_col = "rel_toa_ns" if relative_toa else "toa48_ns"
# Compute TOA range
if toa_min is None:
toa_min = events.select(polars.col(toa_col).min())
toa_min = t4_plutils.collect(toa_min).item()
if toa_max is None:
toa_max = events.select(polars.col(toa_col).max())
toa_max = t4_plutils.collect(toa_max).item()
if toa_min is None or toa_max is None or toa_min >= toa_max:
raise ValueError(f"Invalid TOA range [{toa_min}, {toa_max}]")
# Compute TOA bin edges
if toa_bin_size is None:
toa_edges = numpy.linspace(toa_min, toa_max, n_toa + 1)
n_toa = len(toa_edges) - 1
toa_bin_size = toa_edges[1] - toa_edges[0]
else:
n_toa = int(numpy.floor((toa_max - toa_min) / toa_bin_size))
toa_edges = toa_min + numpy.arange(n_toa + 1) * toa_bin_size
toa_max = toa_edges[-1]
print(
f"3D histogram with {toa_col!r} {toa_min} -> {toa_max} ns in {n_toa} bins of {toa_bin_size:.2f} ns"
)
# Add TOA bin index for each event
# TODO: sometimes there are a few very high values so Int32 is not enough for casting.
toa_bin = (
((polars.col(toa_col) - toa_min) / toa_bin_size).floor().cast(polars.Int64)
)
within_toa_range = (polars.col("toa_bin") >= 0) & (polars.col("toa_bin") < n_toa)
events = events.with_columns(toa_bin=toa_bin).filter(within_toa_range)
# Count hits per [y, x, TOA] voxel
counts = t4_plutils.collect(
events.filter(t4_camera.is_photon_pixels)
.group_by(["y", "x", "toa_bin"])
.agg(count=polars.count())
)
# Add counts to histogram voxels
h3d = numpy.zeros((t4_camera.n_rows, t4_camera.n_cols, n_toa), dtype=int)
y = counts["y"].cast(polars.Int32).to_numpy()
x = counts["x"].cast(polars.Int32).to_numpy()
toa = counts["toa_bin"].cast(polars.Int32).to_numpy()
h3d[y, x, toa] = counts["count"].to_numpy()
t4_profile.print_ndarray_stats(h3d)
return h3d, toa_edges