Source code for ewoksid14.timepix4.t4_hist

from typing import Optional
from typing import Tuple
from typing import Union

import numpy
import polars

from . import t4_camera
from . import t4_plutils
from . import t4_profile


[docs] def hist_2d(events: Union[polars.DataFrame, polars.LazyFrame]): """ Returns array (n_rows, n_cols). """ # Count hits per [y, x] pixel counts = t4_plutils.collect( events.filter(t4_camera.is_photon_pixels) .group_by(["y", "x"]) .agg(count=polars.count()) ) # Add counts to histogram pixels h2d = numpy.zeros((t4_camera.n_rows, t4_camera.n_cols), dtype=int) y = counts["y"].cast(polars.Int32).to_numpy() x = counts["x"].cast(polars.Int32).to_numpy() h2d[y, x] = counts["count"].to_numpy() return h2d
[docs] def hist_3d( events: Union[polars.DataFrame, polars.LazyFrame], n_toa: Optional[int] = None, toa_min: Optional[float] = None, toa_max: Optional[float] = None, toa_bin_size: Optional[float] = None, relative_toa: bool = True, ) -> Tuple[numpy.ndarray, numpy.ndarray]: """ Returns array (n_rows, n_cols, n_toa) and TOA edges. """ if not ((toa_bin_size is None) ^ (n_toa is None)): raise ValueError("Provide either n_toa or toa_bin_size") toa_col = "rel_toa_ns" if relative_toa else "toa48_ns" # Compute TOA range if toa_min is None: toa_min = events.select(polars.col(toa_col).min()) toa_min = t4_plutils.collect(toa_min).item() if toa_max is None: toa_max = events.select(polars.col(toa_col).max()) toa_max = t4_plutils.collect(toa_max).item() if toa_min is None or toa_max is None or toa_min >= toa_max: raise ValueError(f"Invalid TOA range [{toa_min}, {toa_max}]") # Compute TOA bin edges if toa_bin_size is None: toa_edges = numpy.linspace(toa_min, toa_max, n_toa + 1) n_toa = len(toa_edges) - 1 toa_bin_size = toa_edges[1] - toa_edges[0] else: n_toa = int(numpy.floor((toa_max - toa_min) / toa_bin_size)) toa_edges = toa_min + numpy.arange(n_toa + 1) * toa_bin_size toa_max = toa_edges[-1] print( f"3D histogram with {toa_col!r} {toa_min} -> {toa_max} ns in {n_toa} bins of {toa_bin_size:.2f} ns" ) # Add TOA bin index for each event # TODO: sometimes there are a few very high values so Int32 is not enough for casting. toa_bin = ( ((polars.col(toa_col) - toa_min) / toa_bin_size).floor().cast(polars.Int64) ) within_toa_range = (polars.col("toa_bin") >= 0) & (polars.col("toa_bin") < n_toa) events = events.with_columns(toa_bin=toa_bin).filter(within_toa_range) # Count hits per [y, x, TOA] voxel counts = t4_plutils.collect( events.filter(t4_camera.is_photon_pixels) .group_by(["y", "x", "toa_bin"]) .agg(count=polars.count()) ) # Add counts to histogram voxels h3d = numpy.zeros((t4_camera.n_rows, t4_camera.n_cols, n_toa), dtype=int) y = counts["y"].cast(polars.Int32).to_numpy() x = counts["x"].cast(polars.Int32).to_numpy() toa = counts["toa_bin"].cast(polars.Int32).to_numpy() h3d[y, x, toa] = counts["count"].to_numpy() t4_profile.print_ndarray_stats(h3d) return h3d, toa_edges