Source code for ompy.normalizer_nld

import numpy as np
import copy
import logging
import termtables as tt
import json
import pymultinest
import matplotlib.pyplot as plt
import warnings
from contextlib import redirect_stdout
from numpy import ndarray
from scipy.optimize import differential_evolution
from typing import Optional, Tuple, Any, Union, Callable, Dict
from pathlib import Path

from .stats import truncnorm_ppf
from .vector import Vector
from .library import self_if_none
from .spinfunctions import SpinFunctions
from .filehandling import load_discrete
from .models import ResultsNormalized, NormalizationParameters
from .abstract_normalizer import AbstractNormalizer

TupleDict = Dict[str, Tuple[float, float]]


class NormalizerNLD(AbstractNormalizer):
    """ Normalizes NLD to empirical data

    Normalizes nld/gsf according to::

        nld' = nld * A * np.exp(alpha * Ex)

    This is the transformation eq (3), Schiller2000

    Takes empirical data in form of an array of discrete levels,
    neutron separation energy Sn, a model to estimate what the
    NLD is at Sn, and several parameters for the model as well
    as bounds on normalization parameters.

    As a consequence of a complex problem, this class has a complex
    interface. Much of the book-keeping associated with normalization
    has been automated, but there are still many settings and
    parameters for the user to take care of. Some default values
    have been set, but the user must be _EXTREMELY_ careful when
    evaluating the output.

    Attributes:
        discrete (Vector): The discrete NLD at lower energies. [MeV]
        nld (Vector): The NLD to normalize. Gets converted to [MeV] from
            [keV].
        norm_pars (NormalizationParameters): Normalization parameters like
            the experimental D₀ and the spin(-cut) model.
        bounds (Dict[str, Tuple[float, float]]): The bounds on each of
            the parameters. Its keys are 'A', 'alpha', 'T', and 'Eshift'.
            The values are on the form (min, max).
        model (Callable[..., ndarray]): The model to use at high energies
            to estimate the NLD. Defaults to the constant temperature model.
        de_kwargs (dict): Additional keywords to differential evolution.
            Defaults to `{"seed": 65424}`. Note that `bounds` has been
            taken out as a separate attribute, but is a keyword of de.
        multinest_path (Path): Where to save the multinest output.
            Defaults to 'multinest'.
        multinest_kwargs (dict): Additional keywords to multinest.
            Defaults to `{"seed": 65498, "resume": False}`
        res (ResultsNormalized): Results of the normalization
        smooth_levels_fwhm (float): FWHM with which the discrete levels
            shall be smoothed when loading from file. Defaults to 0.1 MeV.
        path (Path): The path to save the results.
    """
    LOG = logging.getLogger(__name__)  # overwrite parent variable
    logging.captureWarnings(True)

    def __init__(self, *,
                 nld: Optional[Vector] = None,
                 discrete: Optional[Union[str, Vector]] = None,
                 path: Optional[Union[str, Path]] = 'saved_run/normalizers',
                 regenerate: bool = False,
                 norm_pars: Optional[NormalizationParameters] = None) -> None:
        """ Normalizes nld and gSF.

        Note:
            The preferred syntax is `NormalizerNLD(nld=...)`.
            If no nld is given, it (and other parameters) can be set
            explicitly later by::

                normalizer.normalize(..., nld=...)

            or::

                normalizer.nld = ...

            In the latter case you *might* have to send in a copy if it's
            a mutable to ensure it is not changed.

        Args:
            nld: see above
            discrete: see above
            path: see above
            norm_pars: see above

        TODO:
            - parameter to limit the number of multinest samples to store.
              Note that the samples should be shuffled to retain some
              "random" samples from the pdf (not the importance weighted)
        """
        super().__init__(regenerate)

        # Create the private variables
        self._discrete = None
        self._discrete_path = None
        self._D0 = None
        self._smooth_levels_fwhm = None
        self.norm_pars = norm_pars
        self.bounds = {'A': [0.1, 1e3], 'alpha': [1e-1, 20], 'T': [0.1, 1],
                       'Eshift': [-5, 5]}  # D0 bounds set later
        self.model: Optional[Callable[..., ndarray]] = self.const_temperature
        # self.curried_model = lambda *arg: None
        self.de_kwargs = {"seed": 65424}
        self.multinest_path = Path('multinest')
        self.multinest_kwargs: dict = {"seed": 65498, "resume": False}

        # Handle the method parameters
        self.smooth_levels_fwhm = 0.1
        self.nld = None if nld is None else nld.copy()
        self.discrete = discrete

        self.res = ResultsNormalized(name="Results NLD")

        self.limit_low = None
        self.limit_high = None
        self.std_fake = None  # See `normalize`

        if path is None:
            self.path = None
        else:
            self.path = Path(path)
            self.path.mkdir(exist_ok=True, parents=True)
    def __call__(self, *args, **kwargs) -> None:
        """ Wrapper around normalize """
        self.normalize(*args, **kwargs)
    def normalize(self, *, limit_low: Optional[Tuple[float, float]] = None,
                  limit_high: Optional[Tuple[float, float]] = None,
                  nld: Optional[Vector] = None,
                  discrete: Optional[Vector] = None,
                  bounds: Optional[TupleDict] = None,
                  norm_pars: Optional[NormalizationParameters] = None,
                  num: int = 0) -> None:
        """ Normalize NLD to a low and high energy region

        Args:
            limit_low: The limits (start, stop) where to normalize
                to discrete levels.
            limit_high: The limits (start, stop) where to normalize to
                a theoretical model and neutron separation energy at high
                energies.
            nld: The nuclear level density vector to normalize.
            discrete: The discrete level density at low energies to
                normalize to.
            bounds: The bounds of the parameters
            norm_pars (NormalizationParameters): Normalization parameters like
                the experimental D₀ and the spin(-cut) model.
            num (optional): Loop number, defaults to 0
        """
        if not self.regenerate:
            try:
                self.load()
                return
            except FileNotFoundError:
                pass

        # Update internal state
        self.limit_low = self.self_if_none(limit_low)
        self.limit_high = self.self_if_none(limit_high)
        limit_low = self.limit_low
        limit_high = self.limit_high

        discrete = self.self_if_none(discrete)
        discrete.to_MeV()
        nld = self.self_if_none(nld)

        self.norm_pars = self.self_if_none(norm_pars)
        self.norm_pars.is_changed(include=["D0", "Sn", "spincutModel",
                                           "spincutPars"])  # check that set

        self.bounds = self.self_if_none(bounds)

        # ensure that it's updated if running again
        self.res = ResultsNormalized(name="Results NLD")

        self.LOG.info(f"\n\n---------\nNormalizing nld #{num}")
        nld = nld.copy()
        self.LOG.debug("Setting NLD, convert to MeV")
        nld.to_MeV()

        # Need to give some sort of standard deviation for sensible results.
        # Otherwise deviations at higher level density will have an
        # unreasonably high weight.
        if self.std_fake is None:
            self.std_fake = False
        if self.std_fake or nld.std is None:
            self.std_fake = True
            nld.std = nld.values * 0.3  # x% is an arb. choice
        self.nld = nld

        # Use DE to get an initial guess before optimizing
        args, guess = self.initial_guess(limit_low, limit_high)
        # Optimize using multinest
        popt, samples = self.optimize(num, args, guess)

        transformed = nld.transform(popt['A'][0], popt['alpha'][0],
                                    inplace=False)
        if self.std_fake:
            nld.std = None
            transformed.std = None

        self.res.nld = transformed
        self.res.pars = popt
        self.res.samples = samples
        ext_model = lambda E: self.model(E, T=popt['T'][0],
                                         Eshift=popt['Eshift'][0])
        self.res.nld_model = ext_model

        self.save()  # save instance
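    # Typical call (sketch; the limits below are illustrative values in MeV,
    # not recommendations, and `nld`/`discrete` must be given here or set on
    # the instance beforehand):
    #   normalizer.normalize(limit_low=(0.5, 2.0), limit_high=(5.0, 7.0))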
    def initial_guess(self, limit_low: Optional[Tuple[float, float]] = None,
                      limit_high: Optional[Tuple[float, float]] = None
                      ) -> Tuple[Tuple[float, float, float, float],
                                 Dict[str, float]]:
        """ Find an initial guess for the constant, α, T and D₀

        Uses differential evolution to perform the guessing.

        Args:
            limit_low: The limits (start, stop) where to normalize
                to discrete levels.
            limit_high: The limits (start, stop) where to normalize to
                a theoretical model and neutron separation energy at high
                energies.

        Returns:
            The arguments used for chi^2 minimization and the
            minimizer.
        """
        limit_low = self.self_if_none(limit_low)
        limit_high = self.self_if_none(limit_high)

        bounds = list(self.bounds.values())
        spinParsstring = json.dumps(self.norm_pars.spincutPars, indent=4,
                                    sort_keys=True)

        self.LOG.debug("Using bounds %s", bounds)
        self.LOG.debug("Using spincutModel %s", self.norm_pars.spincutModel)
        self.LOG.debug("Using spincutPars %s", spinParsstring)

        nld_low = self.nld.cut(*limit_low, inplace=False)
        discrete = self.discrete.cut(*limit_low, inplace=False)
        nld_high = self.nld.cut(*limit_high, inplace=False)

        nldSn = self.nldSn_from_D0(**self.norm_pars.asdict())[1]
        rel_uncertainty = self.norm_pars.D0[1] / self.norm_pars.D0[0]
        nldSn = np.array([nldSn, nldSn * rel_uncertainty])

        def neglnlike(*args, **kwargs):
            return - self.lnlike(*args, **kwargs)

        args = (nld_low, nld_high, discrete, self.model,
                self.norm_pars.Sn[0], nldSn)
        res = differential_evolution(neglnlike, bounds=bounds, args=args,
                                     **self.de_kwargs)

        self.LOG.info("DE results:\n%s", tt.to_string([res.x.tolist()],
                      header=['A', 'α [MeV⁻¹]', 'T [MeV]', 'Eshift [MeV]']))

        p0 = dict(zip(["A", "alpha", "T", "Eshift"], (res.x).T))
        for key, pval in p0.items():
            if pval in self.bounds[key]:
                self.LOG.warning(f"DE result for {key} is at the edge of its "
                                 f"bound: {self.bounds[key]}. This will "
                                 f"probably lead to wrong estimates in "
                                 f"multinest, too.")

        return args, p0
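    # Note (descriptive): `bounds` is built from self.bounds.values(), so the
    # dict insertion order must match the parameter order (A, alpha, T,
    # Eshift) assumed by `lnlike` and by the zip over res.x above.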
    def optimize(self, num: int, args,
                 guess: Dict[str, float]
                 ) -> Tuple[Dict[str, float], Dict[str, float]]:
        """Find parameters given model constraints and an initial guess

        Employs Multinest.

        Args:
            num (int): Loop number
            args (Iterable): Additional arguments for the nld lnlike
            guess (Dict[str, float]): The initial guess of the parameters

        Returns:
            Tuple:
            - popt (Dict[str, Tuple[float, float]]): Median and 1sigma of the
                parameters
            - samples (Dict[str, List[float]]): Multinest samples.
                Note: They are still importance weighted, not random draws
                from the posterior.

        Raises:
            ValueError: Invalid parameters for automatic prior

        Note:
            You might want to adjust the priors for your specific case! Here
            we just propose a general solution that might often work out of
            the box.
        """
        if guess['alpha'] < 0:
            raise NotImplementedError("Prior selection not implemented for "
                                      "α < 0")
        alpha_exponent = np.log10(guess['alpha'])

        if guess['T'] < 0:
            raise ValueError("Prior selection not implemented for T < 0; "
                             "negative temperature is unphysical")
        T_exponent = np.log10(guess['T'])

        A = guess['A']

        # truncations from absolute values
        lower_A, upper_A = 0., np.inf
        mu_A, sigma_A = A, 10 * A
        a_A = (lower_A - mu_A) / sigma_A
        b_A = (upper_A - mu_A) / sigma_A

        lower_Eshift, upper_Eshift = -5., 5
        mu_Eshift, sigma_Eshift = 0, 5
        a_Eshift = (lower_Eshift - mu_Eshift) / sigma_Eshift
        b_Eshift = (upper_Eshift - mu_Eshift) / sigma_Eshift

        def prior(cube, ndim, nparams):
            # NOTE: You may want to adjust this for your case!
            # truncated normal prior
            cube[0] = truncnorm_ppf(cube[0], a_A, b_A) * sigma_A + mu_A
            # log-uniform prior
            # if alpha = 1e2, it's between 1e1 and 1e3
            cube[1] = 10**(cube[1] * 2 + (alpha_exponent - 1))
            # log-uniform prior
            # if T = 1e2, it's between 1e1 and 1e3
            cube[2] = 10**(cube[2] * 2 + (T_exponent - 1))
            # truncated normal prior
            cube[3] = truncnorm_ppf(cube[3], a_Eshift,
                                    b_Eshift) * sigma_Eshift + mu_Eshift

            if np.isinf(cube[3]):
                self.LOG.debug("Encountered inf in cube[3]:\n%s", cube[3])

        def loglike(cube, ndim, nparams):
            return self.lnlike(cube, *args)

        self.multinest_path.mkdir(exist_ok=True)
        path = self.multinest_path / f"nld_norm_{num}_"
        assert len(str(path)) < 60, "Total path length too long for multinest"

        self.LOG.info("Starting multinest")
        self.LOG.debug("with following keywords %s:", self.multinest_kwargs)
        # Hack where stdout from Multinest is redirected as info messages
        self.LOG.write = lambda msg: (self.LOG.info(msg) if msg != '\n'
                                      else None)
        with redirect_stdout(self.LOG):
            pymultinest.run(loglike, prior, len(guess),
                            outputfiles_basename=str(path),
                            **self.multinest_kwargs)

        # Save parameters for analyzer
        names = list(guess.keys())
        json.dump(names, open(str(path) + 'params.json', 'w'))
        analyzer = pymultinest.Analyzer(len(guess),
                                        outputfiles_basename=str(path))

        stats = analyzer.get_stats()

        samples = analyzer.get_equal_weighted_posterior()[:, :-1]
        samples = dict(zip(names, samples.T))

        # Format the output
        popt = dict()
        vals = []
        for name, m in zip(names, stats['marginals']):
            lo, hi = m['1sigma']
            med = m['median']
            sigma = (hi - lo) / 2
            popt[name] = (med, sigma)
            i = max(0, int(-np.floor(np.log10(sigma))) + 1)
            fmt = '%%.%df' % i
            fmts = '\t'.join([fmt + " ± " + fmt])
            vals.append(fmts % (med, sigma))

        self.LOG.info("Multinest results:\n%s", tt.to_string([vals],
                      header=['A', 'α [MeV⁻¹]', 'T [MeV]', 'Eshift [MeV]']))

        return popt, samples
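    # Prior mapping, worked example (illustrative numbers): the log-uniform
    # transform cube[1] = 10**(cube[1]*2 + (alpha_exponent - 1)) spans one
    # decade on either side of the DE guess. For a guess alpha = 2
    # (alpha_exponent ≈ 0.30), u in [0, 1] maps to 10**(2u - 0.70),
    # i.e. roughly 0.2 to 20 MeV⁻¹.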
    def plot(self, *, ax: Any = None,
             add_label: bool = True,
             results: Optional[ResultsNormalized] = None,
             add_figlegend: bool = True,
             plot_fitregion: bool = True,
             reset_color_cycle: bool = True,
             **kwargs) -> Tuple[Any, Any]:
        """Plot the NLD, discrete levels and result of normalization

        Args:
            ax (optional): The matplotlib axis to plot onto. Created if not
                provided.
            add_label (bool, optional): Defaults to `True`.
            add_figlegend (bool, optional): Defaults to `True`.
            results (ResultsNormalized, optional): If provided, nld and model
                are taken from here instead.
            plot_fitregion (Optional[bool], optional): Defaults to `True`.
            reset_color_cycle (Optional[bool], optional): Defaults to `True`
            **kwargs: Additional keyword arguments passed to the plot calls.

        Returns:
            fig, ax
        """
        if ax is None:
            fig, ax = plt.subplots()
        else:
            fig = ax.figure

        if reset_color_cycle:
            ax.set_prop_cycle(None)

        res = self.res if results is None else results
        pars = res.pars
        nld = res.nld

        labelNld = '_exp.'
        labelNldSn = None
        labelModel = "_model"
        labelDiscrete = "_known levels"
        if add_label:
            labelNld = 'exp.'
            labelNldSn = r'$\rho(S_n)$'
            labelModel = 'model'
            labelDiscrete = "known levels"
        nld.plot(ax=ax, label=labelNld, **kwargs)

        self.discrete.plot(ax=ax, kind='step', c='k', label=labelDiscrete)

        nldSn = self.nldSn_from_D0(**self.norm_pars.asdict())[1]
        rel_uncertainty = self.norm_pars.D0[1] / self.norm_pars.D0[0]
        nldSn = np.array([nldSn, nldSn * rel_uncertainty])

        x = np.linspace(self.limit_high[0], self.norm_pars.Sn[0])
        model = Vector(E=x, values=self.model(E=x,
                                              T=pars['T'][0],
                                              Eshift=pars['Eshift'][0]))

        ax.errorbar(self.norm_pars.Sn[0], nldSn[0], yerr=nldSn[1],
                    label=labelNldSn, fmt="ks", markerfacecolor='none')
        # workaround for ensemble Normalizer; always keep these labels
        for i in range(3):
            ax.lines[-(i + 1)]._label = "_nld(Sn)"

        ax.plot(model.E, model.values, "--", label=labelModel, markersize=0,
                c='g', **kwargs)

        if plot_fitregion:
            ax.axvspan(self.limit_low[0], self.limit_low[1], color='grey',
                       alpha=0.1, label="fit limits")
            ax.axvspan(self.limit_high[0], self.limit_high[1], color='grey',
                       alpha=0.1)

        ax.set_yscale('log')
        ax.set_ylabel(r"Level density $\rho(E_x)~[\mathrm{MeV}^{-1}]$")
        ax.set_xlabel(r"Excitation energy $E_x~[\mathrm{MeV}]$")
        ax.set_ylim(bottom=0.5 / (nld.E[1] - nld.E[0]))

        if fig is not None and add_figlegend:
            fig.legend(loc=9, ncol=3, frameon=False)

        return fig, ax
    @staticmethod
    def lnlike(x: Tuple[float, float, float, float], nld_low: Vector,
               nld_high: Vector, discrete: Vector,
               model: Callable[..., ndarray],
               Sn, nldSn) -> float:
        """ Compute log likelihood of the normalization fitting

        This is the result up to a constant, which is irrelevant for the
        maximization.

        Args:
            x: The arguments ordered as A, alpha, T and Eshift
            nld_low: The lower region where discrete levels will be fitted.
            nld_high: The upper region to fit to model.
            discrete: The discrete levels to be used in fitting the lower
                region.
            model: The model to use when fitting the upper region.
                Must support the keyword arguments
                ``model(E=..., T=..., Eshift=...) -> ndarray``
            Sn: Neutron separation energy [MeV].
            nldSn: Level density at Sn and its uncertainty,
                ``[value, std]``.

        Returns:
            lnlike: log likelihood
        """
        A, alpha, T, Eshift = x[:4]  # slicing needed for multinest?
        transformed_low = nld_low.transform(A, alpha, inplace=False)
        transformed_high = nld_high.transform(A, alpha, inplace=False)

        err_low = transformed_low.error(discrete)
        expected = Vector(E=transformed_high.E,
                          values=model(E=transformed_high.E,
                                       T=T, Eshift=Eshift))
        err_high = transformed_high.error(expected)

        nldSn_model = model(E=Sn, T=T, Eshift=Eshift)
        err_nldSn = ((nldSn[0] - nldSn_model) / nldSn[1])**2

        ln_stds = (np.log(transformed_low.std).sum()
                   + np.log(transformed_high.std).sum())

        return -0.5 * (err_low + err_high + err_nldSn + ln_stds)
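    # Objective note (descriptive, not normative): the value returned above is
    # -0.5 * (chi^2 of the discrete-level region + chi^2 of the model region
    # + chi^2 of the nld(Sn) anchor + sum of log standard deviations). The
    # log-std terms penalize transformations that inflate the uncertainties,
    # e.g. through a large scaling constant A.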
    @staticmethod
    def const_temperature(E: ndarray, T: float, Eshift: float) -> ndarray:
        """ Constant Temperature NLD """
        ct = np.exp((E - Eshift) / T) / T
        return ct
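    # Worked example (illustrative numbers): for T = 0.6 MeV and
    # Eshift = -0.5 MeV, const_temperature(4.0, 0.6, -0.5)
    # = exp(4.5 / 0.6) / 0.6 ≈ 3.0e3 MeV⁻¹.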
    @staticmethod
    def nldSn_from_D0(D0: Union[float, Tuple[float, float]],
                      Sn: Union[float, Tuple[float, float]], Jtarget: float,
                      spincutModel: str, spincutPars: Dict[str, Any],
                      **kwargs) -> Tuple[float, float]:
        """Calculate nld(Sn) from D0

        Assuming s-wave capture::

            1/D0 = nld(Sn) * (g(Jtarget+1/2, pi_target)
                              + g(Jtarget-1/2, pi_target))

        Here we assume equal parity, g(J, pi) = g(J)/2, such that::

            nld(Sn) = 1/D0 * 2 / (g(Jtarget+1/2) + g(Jtarget-1/2))

        For the case Jtarget = 0, g(Jtarget-1/2) = 0.

        Parameters:
            D0 (float or [float, float]):
                Average resonance spacing from s waves [eV]. If a tuple,
                it is assumed that it is of the form `[value, uncertainty]`.
            Sn (float or [float, float]):
                Separation energy [MeV]. If a tuple, it is assumed that it
                is of the form `[value, uncertainty]`.
            Jtarget (float):
                Target spin
            spincutModel (str):
                Model for the spin cut
            spincutPars (Dict[str, Any]):
                Additional parameters necessary for the spin cut model
            **kwargs: Ignored; allows passing a full parameter dict.

        Returns:
            nld: Ex=Sn and nld at Sn [MeV, 1/MeV]
        """
        D0 = np.atleast_1d(D0)[0]
        Sn = np.atleast_1d(Sn)[0]

        def g(J):
            return SpinFunctions(Ex=Sn, J=J,
                                 model=spincutModel,
                                 pars=spincutPars).distribution()

        if Jtarget == 0:
            summe = 1 / 2 * g(Jtarget + 1 / 2)
        else:
            summe = 1 / 2 * (g(Jtarget - 1 / 2) + g(Jtarget + 1 / 2))

        nld = 1 / (summe * D0 * 1e-6)
        return [Sn, nld]
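    # Worked example (illustrative numbers): for D0 = 2.2 eV and a spin
    # distribution with g(Jtarget - 1/2) + g(Jtarget + 1/2) ≈ 0.25, one gets
    # summe = 0.125 and nld(Sn) = 1 / (0.125 * 2.2 * 1e-6) ≈ 3.6e6 MeV⁻¹,
    # where the factor 1e-6 converts D0 from eV to MeV.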
    @staticmethod
    def D0_from_nldSn(nld_model: Callable[..., Any],
                      Sn: Union[float, Tuple[float, float]], Jtarget: float,
                      spincutModel: str, spincutPars: Dict[str, Any],
                      **kwargs) -> Tuple[float, float]:
        """Calculate D0 from nld(Sn), assuming equiparity.

        This is the inverse of `nldSn_from_D0`.

        Parameters:
            nld_model (Callable[..., Any]): Model for nld above data of the
                form `y = nld_model(E)` in 1/MeV.
            Sn (float or [float, float]):
                Separation energy [MeV]. If a tuple, it is assumed that it
                is of the form `[value, uncertainty]`.
            Jtarget (float):
                Target spin
            spincutModel (str):
                Model for the spin cut
            spincutPars (Dict[str, Any]):
                Additional parameters necessary for the spin cut model
            **kwargs: Ignored; allows passing a full parameter dict.

        Returns:
            D0: D0 in eV
        """
        Sn = np.atleast_1d(Sn)[0]
        nld = nld_model(Sn)

        def g(J):
            return SpinFunctions(Ex=Sn, J=J,
                                 model=spincutModel,
                                 pars=spincutPars).distribution()

        if Jtarget == 0:
            summe = 1 / 2 * g(Jtarget + 1 / 2)
        else:
            summe = 1 / 2 * (g(Jtarget - 1 / 2) + g(Jtarget + 1 / 2))

        D0 = 1 / (summe * nld * 1e-6)
        return D0
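    # Usage sketch (hypothetical `normalizer` and `norm_pars` instances): the
    # fitted high-energy model can be fed back in to recover D0, mirroring the
    # nldSn_from_D0(**norm_pars.asdict()) call used elsewhere in this class;
    # extra dict entries are swallowed by **kwargs:
    #   D0 = NormalizerNLD.D0_from_nldSn(normalizer.res.nld_model,
    #                                    **norm_pars.asdict())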
    @property
    def discrete(self) -> Optional[Vector]:
        return self._discrete

    @discrete.setter
    def discrete(self, value: Optional[Union[Path, str, Vector]]) -> None:
        if value is None:
            self._discrete = None
            self.LOG.debug("Set `discrete` to None")
        elif isinstance(value, (str, Path)):
            if self.nld is None:
                raise ValueError("`nld` must be set before loading levels")
            nld = self.nld.copy()
            nld.to_MeV()
            self.LOG.debug("Set `discrete` levels from file with FWHM %s",
                           self.smooth_levels_fwhm)
            self._discrete = load_levels_smooth(value, nld.E,
                                                self.smooth_levels_fwhm)
            self._discrete.units = "MeV"
            self._discrete_path = value
        elif isinstance(value, Vector):
            if self.nld is not None and np.any(self.nld.E != value.E):
                raise ValueError("`nld` and `discrete` must"
                                 " have same energy binning")
            self._discrete = value
            self.LOG.debug("Set `discrete` by Vector")
        else:
            raise ValueError(f"Value {value} is not supported"
                             " for discrete levels")

    @property
    def smooth_levels_fwhm(self) -> Optional[float]:
        return self._smooth_levels_fwhm

    @smooth_levels_fwhm.setter
    def smooth_levels_fwhm(self, value: float) -> None:
        self._smooth_levels_fwhm = value
        if self._discrete_path is not None:
            self.discrete = self._discrete_path
    def self_if_none(self, *args, **kwargs):
        """ Wrapper for lib.self_if_none """
        return self_if_none(self, *args, **kwargs)
def load_levels_discrete(path: Union[str, Path], energy: ndarray) -> Vector:
    """ Load discrete levels without smoothing

    Assumes linear equidistant binning.

    Args:
        path: The file to load
        energy: The binning to use

    Returns:
        A vector describing the levels
    """
    histogram, _ = load_discrete(path, energy, 0.1)
    return Vector(values=histogram, E=energy, units='MeV')
def load_levels_smooth(path: Union[str, Path], energy: ndarray,
                       resolution: float = 0.1) -> Vector:
    """ Load discrete levels with smoothing

    Assumes linear equidistant binning.

    Args:
        path: The file to load
        energy: The binning to use in MeV
        resolution: The resolution (FWHM) of the smoothing to use in MeV

    Returns:
        A vector describing the smoothed levels
    """
    histogram, smoothed = load_discrete(path, energy, resolution)
    return Vector(values=smoothed if resolution > 0 else histogram,
                  E=energy, units='MeV')
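
# Minimal end-to-end sketch, guarded so that importing the module stays free
# of side effects. All file names, fit limits and parameter values below are
# hypothetical placeholders, not recommended defaults; adjust to your case.
if __name__ == "__main__":
    norm_pars = NormalizationParameters(name="example")
    norm_pars.D0 = [2.2, 0.2]        # [value, uncertainty] in eV
    norm_pars.Sn = [8.0, 0.001]      # [value, uncertainty] in MeV
    norm_pars.Jtarget = 0            # spin of the (n, gamma) target
    norm_pars.spincutModel = 'EB05'  # must name a SpinFunctions model
    norm_pars.spincutPars = {"mass": 56, "NLDa": 6.0, "Eshift": 0.5}

    # `nld` must be set before `discrete` can be loaded from file
    normalizer = NormalizerNLD(nld=Vector(path="nld_trial.npy"),
                               discrete="discrete_levels.txt",
                               norm_pars=norm_pars)
    normalizer.normalize(limit_low=(0.5, 2.0), limit_high=(5.0, 7.0))
    fig, ax = normalizer.plot()
    plt.show()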