import numpy as np
import logging
import matplotlib.pyplot as plt
import os
from pathos.multiprocessing import ProcessPool
from pathos.helpers import cpu_count
from itertools import repeat
from typing import Callable, Union, List, Optional, Any, Tuple
from pathlib import Path
from numpy import ndarray
from .matrix import Matrix
from .rebin import rebin_2D
from .action import Action
if 'JPY_PARENT_PID' in os.environ:
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
LOG = logging.getLogger(__name__)
logging.captureWarnings(True)
[docs]class Ensemble:
"""Generates perturbated matrices to estimate uncertainty
Note:
When adding any functionality that runs withing the parallelized loop
make sure to use the random generator exposed via the arguments. If
one uses np.random instead, this will be the same an exact copy for
each process.
Attributes:
raw (Matrix): The raw matrix used as model for the ensemble. If a
background is provided, this will initially be the the
"prompt+bg" matrix.
bg: Background matrix to subtract.
bg_ratio: Prompt is obtainied by `raw - bg_ratio * bg`. Defaults to 1.
path (str): The path used for saving the generated matrices.
unfolder (Unfolder): An instance of Unfolder which is used in
the unfolding of the generated matrices. Only has to support
obj.unfold(raw: Matrix).
first_generation_method (FirstGeneration): An instance of
FirstGeneration for applying the first generation method
to unfolded matrices. Only has to be callable
with (unfolded: Matrix)
regenerate (bool): If true, regenerate the matrices instead of
loading from `path`.
std_raw (Matrix): The computed standard deviation of the raw ensemble.
std_unfolded (Matrix): The computed standard deviation of the unfolded
ensemble.
std_firstgen (Matrix): The computed standard deviation of the
first generation method matrices.
firstgen_ensemble (np.ndarray): The entire firstgen ensemble.
action_raw (Action[Matrix]): An arbitrary action to apply to each
generated raw matrix. Defaults to NOP.
action_prompt_w_bg (Action[Matrix]): An arbitrary action to apply to
each generated prompt_w_bg matrix. Defaults to NOP.
action_bg (Action[Matrix]): An arbitrary action to apply to
each generated bg matrix. Defaults to NOP.
action_unfolded (Action[Matrix]): An arbitrary action to apply to each
generated unfolded matrix. Defaults to NOP.
action_firstgen (Action[Matrix]): An arbitrary action to apply to each
generated firstgen matrix. Defaults to NOP.
nprocesses (int): Number of processes for multiprocessing.
Defaults to number of available cpus-1 (with mimimum 1).
seed (int): Random seed for reproducibility of results
TODO:
- Separate each step - generation, unfolded, firstgening?
- (Re)generation with book keeping is a recurring pattern.
Try to abstract it away.
"""
def __init__(self, raw: Optional[Matrix] = None,
bg: Optional[Matrix] = None,
bg_ratio: float = 1,
path: Optional[Union[str, Path]] = 'saved_run/ensemble'):
""" Sets up attributes and loads a saved ensemble if provided.
Args:
raw: The model matrix to peturbate. If a background is provided,
this is the "prompt+bg" matrix.
bg: Background matrix to subtract.
bg_ratio: Prompt is obtainied by `raw - bg_ratio * bg`. Defaults to
1. This is the case for equal time gate length of `prompt+bg`
and `bg`.
path: The path where to save the ensemble. If set,
the ensemble will try to load from the path, but will
fail *silently* if it is unable to. It is recommended to call
load([path]) explicitly.
"""
self.raw: Optional[Matrix] = raw
self.bg: Optional[Matrix] = bg
self.bg_ratio: Optional[float] = bg_ratio
self.prompt_w_bg: Optional[Matrix] = raw
self.unfolder: Optional[Callable[[Matrix], Matrix]] = None
self.first_generation_method: \
Optional[Callable[[Matrix], Matrix]] = None
self.size = 0
self.regenerate = False
self.action_prompt_w_bg = Action('matrix')
self.action_bg = Action('matrix')
self.action_raw = Action('matrix')
self.action_unfolded = Action('matrix')
self.action_firstgen = Action('matrix')
self.std_raw: Optional[Matrix] = None
self.std_unfolded: Optional[Matrix] = None
self.std_firstgen: Optional[Matrix] = None
self.raw_ensemble: Optional[Matrix] = None
self.unfolded_ensemble: Optional[Matrix] = None
self.firstgen_ensemble: Optional[Matrix] = None
self.seed: int = 987654
self.nprocesses: int = cpu_count()-1 if cpu_count() > 1 else 1
if path is None:
self.path = None
else:
self.path = Path(path)
self.path.mkdir(exist_ok=True, parents=True)
self.raw.state = "raw"
[docs] def load(self, path: Optional[Union[str, Path]] = None):
""" Loads a saved ensamble. Alternative to `regenerate`.
Currently only supports '.npy' format.
Args:
path: The path to the ensemble directory. If the
value is None, 'self.path' will be used.
"""
path = Path(path) if path is not None else Path(self.path)
LOG.info(f"loading from {path}")
self.raw = Matrix(path=path / 'raw.npy')
self.firstgen = Matrix(path=path / 'firstgen.npy')
raws = list(path.glob("raw_[0-9]*.*"))
unfolds = list(path.glob("unfolded_[0-9]*.*"))
firsts = list(path.glob("firstgen_[0-9]*.*"))
assert len(raws) == len(unfolds) == len(firsts), "Corrupt ensemble"
assert len(raws) > 0, "Found no saved ensemble members"
self.size = len(raws)
self.raw_ensemble = np.empty((self.size, *self.raw.shape))
self.unfolded_ensemble = np.empty_like(self.raw_ensemble)
self.firstgen_ensemble = np.empty_like(self.raw_ensemble)
for i, raw in enumerate(raws):
self.raw_ensemble[i, :, :] = Matrix(path=raw).values
for i, unfolded in enumerate(unfolds):
self.unfolded_ensemble[i, :, :] = Matrix(path=unfolded).values
for i, firstgen in enumerate(firsts):
self.firstgen_ensemble[i, :, :] = Matrix(path=firstgen).values
self.std_raw = Matrix(path=path / 'raw_std.npy')
self.std_unfolded = Matrix(path=path / 'unfolded_std.npy')
self.std_firstgen = Matrix(path=path / 'firstgen_std.npy')
[docs] def generate(self, number: int, method: str = 'poisson',
regenerate: bool = False) -> None:
"""Generates an ensemble of matrices and estimates standard deviation
Perturbs the initial raw matrix using either a Gaussian or Poisson
process, unfolds them and applies the first generation method to them.
Uses the variation to estimate standard deviation of each step.
Args:
number: The number of perturbed matrices to generate.
method: The stochastic method to use to generate the perturbations
Can be 'gaussian' or 'poisson'.
regenerate: Whether to use already generated files (False) or
generate them all anew (True).
"""
assert self.raw is not None, "Set the raw matrix"
assert self.unfolder is not None, "Set unfolder"
assert self.first_generation_method is not None, \
"Set first generation method"
self.size = number
self.regenerate = regenerate
LOG.info(f"Start normalization with {self.nprocesses} cpus")
pool = ProcessPool(nodes=self.nprocesses)
ss = np.random.SeedSequence(self.seed)
iterator = pool.imap(self.step, range(number), ss.spawn(number),
repeat(method))
ensembles = np.array(list(tqdm(iterator, total=number)))
pool.close()
pool.join()
pool.clear()
raw_ensemble = ensembles[:, 0, :, :]
unfolded_ensemble = ensembles[:, 1, :, :]
firstgen_ensemble = ensembles[:, 2, :, :]
# TODO Move this to a save step
self.raw.save(self.path / 'raw.npy')
# saving for firstgen is in step due to pickling
self.firstgen = Matrix(path=self.path / 'firstgen.npy')
# Calculate standard deviation
raw_ensemble_std = np.std(raw_ensemble, axis=0)
raw_std = Matrix(raw_ensemble_std, self.raw.Eg, self.raw.Ex,
state='std')
raw_std.save(self.path / "raw_std.npy")
unfolded_ensemble_std = np.std(unfolded_ensemble, axis=0)
unfolded_std = Matrix(unfolded_ensemble_std, self.raw.Eg,
self.raw.Ex, state='std')
unfolded_std.save(self.path / "unfolded_std.npy")
firstgen_ensemble_std = np.std(firstgen_ensemble, axis=0)
firstgen_std = Matrix(firstgen_ensemble_std, self.firstgen.Eg,
self.firstgen.Ex, state='std')
firstgen_std.save(self.path / "firstgen_std.npy")
self.std_raw = raw_std
self.std_unfolded = unfolded_std
self.std_firstgen = firstgen_std
self.raw_ensemble = raw_ensemble
self.unfolded_ensemble = unfolded_ensemble
self.firstgen_ensemble = firstgen_ensemble
[docs] def step(self, step: int, rsequence: np.random.SeedSequence,
method: str):
"""Single step in `self.generate`
Args:
step (int): step (loop) number
rsequence (np.random.SeedSequence): SeedSequence for random seed
method (str): see `self.generate`
Returns:
raw, unfolded, firstgen
"""
LOG.info(f"Generating/loading {step}")
rstate = np.random.default_rng(rsequence)
if self.bg is not None:
prompt_w_bg = self.generate_perturbed(step, method,
state="prompt+bg",
rstate=rstate)
bg = self.generate_perturbed(step, method, state="bg",
rstate=rstate)
raw = self.subtract_bg(step, prompt_w_bg, bg)
else:
raw = self.generate_perturbed(step, method, state="raw",
rstate=rstate)
unfolded = self.unfold(step, raw)
firstgen = self.first_generation(step, unfolded)
if step == 0: # workaround
firstgen.save(self.path / 'firstgen.npy')
assert(raw.shape == unfolded.shape and raw.shape == firstgen.shape), \
("For now, all matrices have to have the same shape. Currently, "
f"shapes: raw: {raw.shape}, unfolded: {unfolded.shape} and "
f"firstgen: {firstgen.shape}")
return raw.values, unfolded.values, firstgen.values
[docs] def generate_perturbed(self, step: int, method: str, state: str,
rstate: np.random.Generator) -> Matrix:
"""Generate a perturbated matrix
Looks for an already generated file before generating the matrix.
Args:
step: The identifier of the matrix to generate
method: The name of the method to use. Can be either
"gaussian" or "poisson"
state: Either "raw", "prompt+bg" or "bg".
rstate: numpy random generator (random state)
Returns:
The generated matrix
"""
allowed = ["raw", "prompt+bg", "bg"]
if state not in allowed:
raise NotImplementedError(f"Matrix must be a state in {allowed}")
LOG.debug(f"Working on {state} ensemble {step}")
path = self.path / f"{state}_{step}.npy"
mat = self.load_matrix(path)
if mat is None:
LOG.debug(f"(Re)generating {path} using {method} process")
if method == 'gaussian':
values = self.generate_gaussian(state, rstate)
elif method == 'poisson':
values = self.generate_poisson(state, rstate)
else:
raise ValueError(f"Method {method} is not supported")
base_mat = self.matrix_from_state(state)
mat = Matrix(values, Eg=base_mat.Eg, Ex=base_mat.Ex, state=state)
mat.save(path)
self.action_from_state(state).act_on(mat)
return mat
[docs] def subtract_bg(self, step: int, matrix: Matrix, bg: Matrix) -> Matrix:
""" Subtract bg from "raw" (prompt+bg) matrix
Looks for an already generated file before generating the matrix.
Args:
step: The identifier of the matrix to act on.
matrix: The matrix to subtract from (usually, prompt+bg)
bg: The bg matrix.
Returns:
raw: The prompts only matrix ("raw" matrix).
"""
LOG.debug(f"Subtracting bg from prompt+bg {step}")
path = self.path / f"raw_{step}.npy"
raw = self.load_matrix(path)
if raw is None:
LOG.debug("Raw matrix")
raw = matrix - self.bg_ratio * bg
raw.remove_negative()
raw.save(path)
self.action_raw.act_on(raw)
return raw
[docs] def unfold(self, step: int, raw: Matrix) -> Matrix:
"""Unfolds the raw matrix
Looks for an already generated file before generating the matrix.
Args:
step: The identifier of the matrix to unfold.
raw: The raw matrix to unfold
Returns:
The unfolded matrix
"""
LOG.debug(f"Unfolding raw {step}")
path = self.path / f"unfolded_{step}.npy"
unfolded = self.load_matrix(path)
if unfolded is None:
LOG.debug("Unfolding matrix")
unfolded = self.unfolder(raw)
unfolded.save(path)
self.action_unfolded.act_on(unfolded)
return unfolded
[docs] def first_generation(self, step: int, unfolded: Matrix) -> Matrix:
"""Applies first generation method to an unfolded matrix
Looks for an already generated file before applying first generation.
Args:
step: The identifier of the matrix to unfold.
unfolded: The matrix to apply first generation method to.
Returns:
The resulting matrix
"""
LOG.debug(f"Performing first generation on unfolded {step}")
path = self.path / f"firstgen_{step}.npy"
firstgen = self.load_matrix(path)
if firstgen is None:
LOG.debug("Calculating first generation matrix")
firstgen = self.first_generation_method(unfolded)
firstgen.save(path)
self.action_firstgen.act_on(firstgen)
return firstgen
[docs] def rebin(self, out_array: np.ndarray, member: str) -> None:
""" Rebins the first generations matrixes and recals std
Args:
out_array: mid-bin energy calibration of output
matrix along rebin axis
member: which member to rebin, currently only FG available
"""
if member != 'firstgen':
raise NotImplementedError("Not implemented for raw and unfolding "
"yet. if done, need to redo unfolding "
"and/or first gen method")
ensemble = self.firstgen_ensemble
matrix = self.firstgen
do_Ex = not np.array_equal(out_array, matrix.Ex)
do_Eg = not np.array_equal(out_array, matrix.Eg)
if (not do_Ex) and (not do_Eg):
return
rebinned = np.zeros((self.size, out_array.size, out_array.size))
for i in tqdm(range(self.size)):
values = ensemble[i]
if do_Ex:
values = rebin_2D(values, mids_in=matrix.Ex,
mids_out=out_array, axis=0)
if do_Eg:
values = rebin_2D(values, mids_in=matrix.Eg,
mids_out=out_array, axis=1)
rebinned[i] = values
# correct fg matrix (different attribute) and axis
values = matrix.values
if do_Ex:
values = rebin_2D(values, mids_in=matrix.Ex,
mids_out=out_array, axis=0)
if do_Eg:
values = rebin_2D(values, mids_in=matrix.Eg,
mids_out=out_array, axis=1)
matrix = Matrix(values, out_array, out_array)
# recalculate std
firstgen_ensemble_std = np.std(rebinned, axis=0)
firstgen_std = Matrix(firstgen_ensemble_std, matrix.Eg,
matrix.Ex, state='std')
firstgen_std.save(self.path / "firstgen_std.npy")
self.firstgen = matrix
self.firstgen_ensemble = rebinned
self.std_firstgen = firstgen_std
[docs] def generate_gaussian(self, state: str,
rstate: Optional[np.random.Generator] = np.random.default_rng) -> np.ndarray: # noqa
"""Generates an array with Gaussian perturbations of a matrix.
Note that entries are truncated at 0 (only positive).
Args:
state: State of the matrx/which matrix should be taken as a base
rstate (optional): numpy random generator (random state)
Returns:
The resulting array
"""
mat = self.matrix_from_state(state)
std = np.sqrt(np.where(mat.values > 0, mat.values, 0))
perturbed = rstate.normal(size=mat.shape, loc=mat.values,
scale=std)
perturbed[perturbed < 0] = 0
return perturbed
[docs] def generate_poisson(self, state: str,
rstate: Optional[np.random.Generator] = np.random.default_rng) -> np.ndarray: # noqa
"""Generates an array with Poisson perturbations of a matrix
Args:
state: State of the matrx/which matrix should be taken as a base
rstate (optional): numpy random generator (random state)
Returns:
The resulting array
"""
mat = self.matrix_from_state(state)
std = np.where(mat.values > 0, mat.values, 0)
perturbed = rstate.poisson(std)
return perturbed
[docs] def load_matrix(self, path: Union[Path, str]) -> Union[Matrix, None]:
"""Check if file exists and should not be regenerated
Args:
path: Path to file to load
Returns:
Matrix if file exists and can be loaded, None otherwise.
"""
path = Path(path)
if path.exists() and not self.regenerate:
LOG.debug(f"Loading {path}")
return Matrix(path=path)
[docs] def get_raw(self, index: Union[int, List[int]]) -> Union[Matrix,
List[Matrix]]:
"""Get the raw matrix(ces) created in ensemble generation.
Args:
index: The index of the ensemble. If an iterable,
a list of matrices will be returned.
Returns:
The matrix(ces) corresponding to index(ces)
"""
try:
matrices = []
for i in index:
matrices.append(Matrix(self.raw_ensemble[i], self.raw.Eg,
self.raw.Ex))
return matrices
except TypeError:
pass
return Matrix(self.raw_ensemble[index], self.raw.Eg,
self.raw.Ex)
[docs] def get_unfolded(self,
index: Union[int, List[int]]) -> Union[Matrix,
List[Matrix]]:
"""Get the unfolded matrix(ces) created in ensemble generation.
Args:
index: The index of the ensemble. If an iterable,
a list of matrices will be returned.
Returns:
The matrix(ces) corresponding to index(ces)
"""
try:
matrices = []
for i in index:
matrices.append(Matrix(self.unfolded_ensemble[i], self.raw.Eg,
self.raw.Ex))
return matrices
except TypeError:
pass
return Matrix(self.unfolded_ensemble[index], self.raw.Eg,
self.raw.Ex, state='unfolded')
[docs] def get_firstgen(self,
index: Union[int, List[int]]) -> Union[Matrix,
List[Matrix]]:
"""Get the first generation matrix(ces) created in ensemble generation.
Args:
index: The index of the ensemble. If an iterable,
a list of matrices will be returned.
Returns:
The matrix(ces) corresponding to index(ces)
"""
try:
matrices = []
for i in index:
matrices.append(Matrix(self.firstgen_ensemble[i],
self.firstgen.Eg,
self.firstgen.Ex))
return matrices
except TypeError:
pass
return Matrix(self.firstgen_ensemble[index],
self.firstgen.Eg,
self.firstgen.Ex, state='firstgen')
[docs] def action_from_state(self, state: str) -> Action:
""" Return the action corresponding to a given state
Args:
state: The state
Returns:
action: The corresponding action
"""
if state == "raw":
action = self.action_raw
elif state == "prompt+bg":
action = self.action_prompt_w_bg
elif state == "bg":
action = self.action_bg
elif state == "unfolded":
action = self.action_unfolded
elif state == "firstgen":
action = self.action_firstgen
else:
raise NotImplementedError(f"State {state} is not a known state")
return action
[docs] def matrix_from_state(self, state: str) -> Action:
""" Return the matrix corresponding to a given state
Args:
state: The state
Returns:
Matrix: The corresponding matrix
"""
if state == "raw":
matrix = self.raw
elif state == "prompt+bg":
matrix = self.prompt_w_bg
elif state == "bg":
matrix = self.bg
# elif state == "unfolded":
# matrix = self.unfolded
# elif state == "firstgen":
# matrix = self.firstgen
else:
raise NotImplementedError(f"State {state} is not a known state")
return matrix
[docs] def plot(self, *, ax: Any = None,
vmin: Optional[float] = None,
vmax: Optional[float] = None,
add_cbar: bool = True,
scale_by: str = 'all',
**kwargs) -> Tuple[Any, ndarray]:
""" Plot the computed standard deviations
Args:
ax (optional): A matplotlib axis to plot onto.
vmin (optional, float): The lower cutoff for colors.
vmax (optional, float): The upper cutoff for colors.
add_cbar (optional, bool): Whether to add a colorbar.
Defaults to True.
scale_by (optional, str): Which std matrix to set color
limits by. Can be "raw", "unfolded", "firstgen" or "all".
Defaults to "all".
Returns:
The matplotlib figure and axis
"""
if ax is not None:
if len(ax) < 3:
raise ValueError("Three axes must be provided")
fig = ax.figure
else:
fig, ax = plt.subplots(ncols=3, sharey=True,
constrained_layout=True)
extrema = lambda x: (np.min(x), np.max(x)) # noqa
choices = {"raw": extrema(self.std_raw.values),
"unfolded": extrema(self.std_unfolded.values),
"firstgen": extrema(self.std_firstgen.values)}
choices["all"] = extrema([v for v in choices.values()])
if scale_by not in choices:
raise ValueError(f"`scale_by` can not be {scale_by}")
vminset = True
if vmin is None:
vminset = False
vmin = choices[scale_by][0]
vmin = 1 if vmin <= 0 else vmin
vmaxset = True
if vmax is None:
vmaxset = False
vmax = choices[scale_by][1]
# Actual plotting
self.std_raw.plot(ax=ax[0], title='Raw', add_cbar=False,
vmin=vmin, vmax=vmax, **kwargs)
self.std_unfolded.plot(ax=ax[1], title='Unfolded', add_cbar=False,
vmin=vmin, vmax=vmax, **kwargs)
im, _, _ = self.std_firstgen.plot(ax=ax[2], title='First Generation',
vmin=vmin, vmax=vmax,
add_cbar=False, **kwargs)
# Y labels only clutter
ax[1].set_ylabel(None)
ax[2].set_ylabel(None)
# Handle the colorbar
if add_cbar:
if vminset and vmaxset:
fig.colorbar(im, extend='both')
elif vminset:
fig.colorbar(im, extend='min')
elif vmaxset:
fig.colorbar(im, extend='max')
else:
fig.colorbar(im)
fig.suptitle("Standard Deviation")
return fig, ax