Source code for ompy.unfolder

import numpy as np
import pandas
import logging
import warnings
from typing import Optional, Tuple, Callable
import termtables as tt
from scipy.ndimage import gaussian_filter1d
from copy import copy
from .gauss_smoothing import gauss_smoothing_matrix_1D
from .library import div0, i_from_E
from .matrix import Matrix
from .matrixstate import MatrixState
from .rebin import rebin_1D


LOG = logging.getLogger(__name__)
logging.captureWarnings(True)


[docs]class Unfolder: """Performs Guttormsen unfolding The algorithm is roughly as follows: Define matrix Rij as the response in channel i when the detector is hit by γ-rays with energy corresponding to channel j. Each response function is normalized by Σi Rij = 1. The folding is then:: f = Ru where f is the folded spectrum and u the unfolded. If we obtain better and better trial spectra u, we can fold and compare them to the observed spectrum r. As an initial guess, the trial function is u⁰ = r, and the subsequent being:: uⁱ = uⁱ⁻¹ + (r - fⁱ⁻¹) until fⁱ≈r. Note that no actions are performed on the matrices; they must already be cut into the desired size. Attributes: num_iter (int, optional): The number of iterations to perform. defaults to 200. The best iteration is then selected based on the `score` method R (Matrix): The response matrix weight_fluctuation (float, optional): A attempt to penalize fluctuations. Defaults to 1e-3 minimum_iterations (int, optional): Minimum number of iterations. Defaults to 5. use_compton_subtraction (bool, optional): Set usage of Compton subtraction method. Defaults to `True`. response_tab (DataFrame, optional): If `use_compton_subtraction=True` a table with information ('E', 'eff_tot', ...) must be provided. FWHM_tweak_multiplier (Dict, optional): See compton subtraction method Necessary keys: `["fe", "se", "de", "511"]`. Magne's suggestion:: FWHM_tweak_multiplier = {"fe": 1., "se": 1.1, "de": 1.3, "511": 0.9} iscores (np.ndarray, optional): Selected iteration number in the unfolding. The unfolding and selection is performed independently for each `Ex` row, thus this is an array with `len(raw.Ex)`. TODO: - Unfolding for a single spectrum (currently needs to be mocked as a Matrix). """ def __init__(self, num_iter: int = 200, response: Optional[Matrix] = None): """Unfolds the gamma-detector response of a spectrum Args: num_iter: see above reponse: see above """ self.num_iter = num_iter self.weight_fluctuation: float = 1e-3 self.minimum_iterations: int = 5 self._R: Optional[Matrix] = response self.raw: Optional[Matrix] = None self.r: Optional[np.ndarray] = None self.use_compton_subtraction: bool = True self.response_tab: Optional[pandas.DataFrame] = None self.FWHM_tweak_multiplier: Optional[dict[str, float]] = None self.iscores: Optional[np.ndarray] = None
[docs] def __call__(self, matrix: Matrix) -> Matrix: """ Wrapper for `self.apply()` """ return self.apply(matrix)
[docs] def update_values(self): """Verify internal consistency and set default values Raises: ValueError: If the raw matrix and response matrix have different calibrations. """ # Ensure that the given matrix is in fact raw if self.raw.state != MatrixState.RAW: warnings.warn("Trying to unfold matrix that is not raw") assert self.R is not None, "Response R must be set" assert self.R.shape[0] == self.R.shape[1],\ "Response R must be a square matrix" LOG.debug("Comparing calibration of raw against response") if len(self.raw.Eg) != len(self.R.Eg): raise ValueError("Must have equal number of energy bins.") if not np.allclose(self.raw.Eg, self.R.Eg): raise ValueError("Must have equal energy binning.") LOG.debug("Check for negative counts.") if np.any(self.raw.values < 0) or np.any(self.R.values < 0): raise ValueError("Raw and response cannot have negative counts." "Consider using fill_negatives and " "remove_negatives on the input matixes.") self.r = self.raw.values
[docs] def apply(self, raw: Matrix, response: Optional[Matrix] = None) -> Matrix: """ Run unfolding. Selected iteration for each `Ex` row can be obtained via `self.iscores`, see above. Args: raw (Matrix): Raw matrix to unfold response (Matrix, optional): Response matrix. Defaults to the matrix provided in initialization. Returns: Matrix: Unfolded matrix TODO: - Use better criteria for terminating """ if response is not None: self.R = response self.raw = copy(raw) # Set up the arrays self.update_values() unfolded_cube = np.zeros((self.num_iter, *self.r.shape)) chisquare = np.zeros((self.num_iter, self.r.shape[0])) fluctuations = np.ones((self.num_iter, self.r.shape[0])) fluctuations /= self.fluctuations(self.r) folded = np.zeros_like(self.r) # Use u⁰ = r as initial guess unfolded = self.r for i in range(self.num_iter): unfolded, folded = self.step(unfolded, folded, i) unfolded_cube[i, :, :] = unfolded chisquare[i, :] = self.chi_square(folded) fluctuations[i, :] *= self.fluctuations(unfolded) if LOG.level >= logging.DEBUG: chisq = np.mean(chisquare[i, :]) fluct = np.nanmean(fluctuations[i, :]) LOG.debug(f"Iteration {i}: χ² {chisq:.2e}; Fluct: {fluct:.2e}") # Score the solutions based on χ² value for each Ex bin # and select the best one. iscores = self.score(chisquare, fluctuations) self.iscores = iscores # keep if interesting for later unfolded = np.zeros_like(self.r) for iEx in range(self.r.shape[0]): unfolded[iEx, :] = unfolded_cube[iscores[iEx], iEx, :] if LOG.level >= logging.DEBUG: print_array = np.column_stack((np.arange(len(self.raw.Ex)), self.raw.Ex.astype(int), iscores)) LOG.debug("Selecting following iterations: \n%s", tt.to_string(print_array, header=('i', 'Ex', 'iteration')) ) if self.use_compton_subtraction: unfolded = self.compton_subtraction(unfolded) unfolded = Matrix(unfolded, Eg=self.raw.Eg, Ex=self.raw.Ex) unfolded.state = "unfolded" self.remove_negative(unfolded) return unfolded
[docs] def step(self, unfolded: np.ndarray, folded: np.ndarray, step: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Perform a single step of Guttormsen unfolding .. parsed-literal:: Performs the steps u = u + (r - f), for step > 0 f = uR Args: unfolded (np.ndarray): unfolded spectrum of iteration `step-1` folded (np.ndarray): folded spectrum of of iteration `step-1` step (np.ndarray): Iteration number Returns: Arrays with counts for the unfolded and folded matrix (unfolded, folded) """ if step > 0: # since the initial guess is the raw spectrum unfolded = unfolded + (self.r - folded) folded = unfolded@self.R.values return unfolded, folded
[docs] def chi_square(self, folded: np.ndarray) -> np.ndarray: """ Compute Χ² of the folded spectrum Uses the familiar Χ² = Σᵢ (fᵢ-rᵢ)²/rᵢ """ chi2 = div0(np.power(folded - self.r, 2), np.where(self.r > 0, self.r, 0)).sum(axis=1) return chi2
[docs] def fluctuations(self, counts: np.ndarray, sigma: float = 50) -> np.ndarray: """ Calculates fluctuations in each Ex bin gamma spectrum S by summing the relative diff between the spectrum and a smoothed version of it. ∑ | (S - ⟨S⟩) / ⟨S⟩ | Args: counts (np.ndarray): counts of spectrum to act on (calibration from `self.raw`). sigma (float, optional): width of the gaussian for the smoothing. Defaults to 20 Returns: np.ndarray: column vector of fluctuations in each Ex bin """ a1 = self.raw.Eg[1] - self.raw.Eg[0] couns_smoothed = gaussian_filter1d(counts, sigma=sigma / a1, axis=1) fluctuations = div0(couns_smoothed - counts, couns_smoothed) fluctuations = np.sum(np.abs(fluctuations), axis=1) return fluctuations
[docs] def score(self, chisquare: np.ndarray, fluctuations: np.ndarray) -> np.ndarray: """ Calculates the score of each unfolding iteration for each Ex bin based on a weighting of chisquare and fluctuations. Args: chisquare (np.ndarray): chisquare between raw and unfolded fluctuations (np.ndarray): measure of fluctuations (to be penalized) Returns: score (the lower the better) """ # Check that it's consistent with chosen max number of iterations: if self.minimum_iterations > self.num_iter: self.minimum_iterations = self.num_iter score_matrix = ((1 - self.weight_fluctuation) * chisquare + self.weight_fluctuation * fluctuations) # Get index of best (lowest) score for each Ex bin: best_iteration = np.argmin(score_matrix, axis=0) # Enforce minimum_iterations: best_iteration = np.where( self.minimum_iterations > best_iteration, self.minimum_iterations * np.ones(len(best_iteration), dtype=int), best_iteration) return best_iteration
[docs] def fold(self, matrix: Matrix) -> Matrix: """ Folds a matrix with the loaded response Args: matrix (Matrix): Input matrix Returns: folded (Matrix): Folded matrix """ assert self.R is not None, "Need to load response first" folded = matrix.copy() folded.values = matrix.values@self.R.values return folded
@property def R(self) -> Matrix: return self._R @R.setter def R(self, response: Matrix) -> None: # TODO Make setable self._R = response
[docs] def compton_subtraction(self, unfolded: np.ndarray) -> np.ndarray: """ Compton Subtraction Method in Unfolding of Guttormsen et al (NIM 1996) Args: unfolded (ndarray): unfolded spectrum Returns: unfolded spectrum, with compton subtraction applied We follow the notation of Guttormsen et al (NIM 1996) in what follows. `u0` is the unfolded spectrum from above, `r` is the raw spectrum, .. parsed-literal:: w = us + ud + ua is the folding contributions from everything except Compton, i.e. us = single escape, ua = double escape, ua = annihilation (511). `v = pf*u0 + w == uf + w` is the estimated "raw minus Compton" spectrum `c` is the estimated Compton spectrum. Note: - The tweaking of the FWHM ("facFWHM" in Mama) has been delegated to the creation of the response matrix. If one wants to unfold with, say, 1/10 of the "real" FWHM, this this should be provided as input here already. - We apply smoothing to the different peak structures as described in the article. However, you may also "tweak" the FWHMs per peak for something Magne thinks is a better result. TODO: - When we unfolding with a reduced FWHM, should the compton method still work with the actual fhwm? """ LOG.debug("Applying Compton subtraction method") if self.response_tab is None: raise ValueError("`response_tab` needs to be set for this method") tab = self.response_tab assert (tab.E == self.R.Eg).all(), \ "Energies of response table have to match the Eg's"\ "of the response matrix." FWHM = tab.fwhm_abs.values eff = tab.eff_tot.values pf = tab.pFE.values ps = tab.pSE.values pd = tab.pDE.values pa = tab.p511.values keys_needed = ["fe", "se", "de", "511"] if self.FWHM_tweak_multiplier is None: FWHM_tweak = dict() FWHM_tweak["fe"] = 1 FWHM_tweak["se"] = 1 FWHM_tweak["de"] = 1 FWHM_tweak["511"] = 1 else: if all(key in self.FWHM_tweak_multiplier for key in keys_needed): FWHM_tweak = self.FWHM_tweak_multiplier else: raise ValueError("FWHM_tweak_multiplier needs to contain each" "of this keys: {}".format(keys_needed)) r = self.raw.values u0 = unfolded Eg = tab.E.values # Full-energy, smoothing but no shift: uf = pf * u0 uf = gauss_smoothing_matrix_1D(uf, Eg, 0.5*FWHM*FWHM_tweak["fe"]) # Single escape, smoothing and shift: us = ps * u0 us = gauss_smoothing_matrix_1D(us, Eg, 0.5*FWHM*FWHM_tweak["se"]) us = shift_matrix(us, Eg, energy_shift=-511) # Double escape, smoothing and shift: ud = pd * u0 ud = gauss_smoothing_matrix_1D(ud, Eg, 0.5*FWHM*FWHM_tweak["de"]) ud = shift_matrix(ud, Eg, energy_shift=-1024) # 511, smoothing, but no shift: ua = np.zeros(u0.shape) i511 = i_from_E(511, Eg) ua[:, i511] = np.sum(pa * u0, axis=1) ua = gauss_smoothing_matrix_1D(ua, Eg, 1.0*FWHM*FWHM_tweak["511"]) # Put it all together: w = us + ud + ua v = uf + w c = r - v # Smoothe the Compton part, which is the main trick: c = gauss_smoothing_matrix_1D(c, Eg, 1.0*FWHM) # Channel 0 is missing from resp.dat # Add Ex channel to array, also correcting for efficiency. # u = div0((r - c - w), np.append(0, pf)) u = div0((r - c - w), pf) unfolded = div0(u, eff) return unfolded
[docs] def remove_negative(self, matrix: Matrix): """ Wrapper for Matrix.remove_negative() Put in as an extra method to facilitate replacing this by eg. `fill_and_remove_negatve` Args: matrix: Input matrix """ matrix.remove_negative()
def shift(counts_in, E_array_in, energy_shift): """ Shift the counts_in array by amount energy_shift. The function is actually a wrapper for the rebin() function that "fakes" the input energy calibration to give a shift. It is similar to the rebin_and_shift() function defined above, but even simpler. Args: counts_in (numpy array, float): Array of counts E_array_in (numpy array, float): Energies of input counts energy_shift (float): Amount to shift the counts by. Negative means shift to lower energies. Default is 0. """ E_array_in_shifted = E_array_in + energy_shift counts_out = rebin_1D(counts_in, E_array_in_shifted, E_array_in) return counts_out def shift_matrix(counts_in_matrix, E_array_in, energy_shift): """ Function which takes a matrix of counts and shifts it along axis 1. """ counts_out_matrix = np.zeros(counts_in_matrix.shape) for i in range(counts_in_matrix.shape[0]): counts_out_matrix[i, :] = shift(counts_in_matrix[i, :], E_array_in, energy_shift=energy_shift) return counts_out_matrix