import logging
import numpy as np
import copy
import json
import termtables as tt
from numpy import ndarray
from pathlib import Path
from typing import Optional, Union, Tuple, Any, Callable, Dict, Iterable, List
import pymultinest
import matplotlib.pyplot as plt
from contextlib import redirect_stdout
from .abstract_normalizer import AbstractNormalizer
from .extractor import Extractor
from .library import log_interp1d, self_if_none
from .models import Model, ResultsNormalized, ExtrapolationModelLow,\
ExtrapolationModelHigh, NormalizationParameters
from .normalizer_nld import NormalizerNLD
from .normalizer_gsf import NormalizerGSF
from .spinfunctions import SpinFunctions
from .vector import Vector
from .stats import truncnorm_ppf
class NormalizerSimultan(AbstractNormalizer):
    """Simultaneous normalization of nld and gsf. Composed of `NormalizerNLD`
    and `NormalizerGSF`, so read more on the normalization there.

Attributes:
gsf (Optional[Vector], optional): gsf to normalize
multinest_path (Path, optional): Default path where multinest
saves files
multinest_kwargs (dict): Additional keywords to multinest. Defaults to
`{"seed": 65498, "resume": False}`
nld (Optional[Vector], optional): nld to normalize
        normalizer_nld (NormalizerNLD): `NormalizerNLD` instance to get the
            normalization parameters
        normalizer_gsf (NormalizerGSF): `NormalizerGSF` instance to get the
            normalization parameters
res (ResultsNormalized): Results
std_fake_gsf (bool): Whether the std. deviation is faked
(see `normalize`)
std_fake_nld (bool): Whether the std. deviation is faked
(see `normalize`)
        path (Path): The path where the results are saved.
TODO:
Work with more general models, too, not just CT for nld
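
    Example:
        A minimal usage sketch (variable names are hypothetical; assumes
        unnormalized `nld` and `gsf` vectors and configured sub-normalizers
        already exist)::

            simnorm = NormalizerSimultan(nld=nld, gsf=gsf,
                                         normalizer_nld=normalizer_nld,
                                         normalizer_gsf=normalizer_gsf)
            simnorm.normalize()
            fig, ax = simnorm.plot()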
"""
LOG = logging.getLogger(__name__)
logging.captureWarnings(True)
def __init__(self, *,
gsf: Optional[Vector] = None,
nld: Optional[Vector] = None,
normalizer_nld: Optional[NormalizerNLD] = None,
normalizer_gsf: Optional[NormalizerGSF] = None,
path: Optional[Union[str, Path]] = 'saved_run/normalizers',
regenerate: bool = False):
"""
TODO:
            - currently the arguments have to be set here and cannot be set
              in `normalize`
Args:
gsf (optional): see above
nld (optional): see above
normalizer_nld (optional): see above
normalizer_gsf (optional): see above
"""
super().__init__(regenerate)
        self.normalizer_nld = None if normalizer_nld is None \
            else copy.deepcopy(normalizer_nld)
        self.normalizer_gsf = None if normalizer_gsf is None \
            else copy.deepcopy(normalizer_gsf)
self.gsf = None if gsf is None else gsf.copy()
self.nld = None if nld is None else nld.copy()
self.std_fake_nld: Optional[bool] = None # See `normalize`
self.std_fake_gsf: Optional[bool] = None # See `normalize`
self.res: Optional[ResultsNormalized] = None
self.multinest_path: Optional[Path] = Path('multinest')
self.multinest_kwargs: dict = {"seed": 65498, "resume": False}
if path is None:
self.path = None
else:
self.path = Path(path)
self.path.mkdir(exist_ok=True, parents=True)
    def normalize(self, *, num: int = 0,
gsf: Optional[Vector] = None,
nld: Optional[Vector] = None,
normalizer_nld: Optional[NormalizerNLD] = None,
normalizer_gsf: Optional[NormalizerGSF] = None) -> None:
"""Perform normalization and saves results to `self.res`
Args:
num (int, optional): Loop number
gsf (Optional[Vector], optional): gsf before normalization
nld (Optional[Vector], optional): nld before normalization
normalizer_nld (Optional[NormalizerNLD], optional): NormalizerNLD
instance
normalizer_gsf (Optional[NormalizerGSF], optional): NormalizerGSF
instance
"""
if not self.regenerate:
try:
self.load()
return
except FileNotFoundError:
pass
# reset internal state
self.res = ResultsNormalized(name="Results NLD")
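        # Work on deep copies so the user's normalizer instances are not
        # mutated; the sub-normalizers must rerun here and must not save
        # their own instances, as the combined result is saved below.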
self.normalizer_nld = copy.deepcopy(self.self_if_none(normalizer_nld))
self.normalizer_gsf = copy.deepcopy(self.self_if_none(normalizer_gsf))
for norm in [self.normalizer_nld, self.normalizer_gsf]:
norm._save_instance = False
norm.regenerate = True
gsf = self.self_if_none(gsf)
nld = self.self_if_none(nld)
nld = nld.copy()
gsf = gsf.copy()
nld.to_MeV()
gsf.to_MeV()
# Need to give some sort of standard deviation for sensible results
# Otherwise deviations at higher level density will have an
        # unreasonably high weight.
if self.std_fake_nld is None:
self.std_fake_nld = False
if self.std_fake_nld or nld.std is None:
self.std_fake_nld = True
            nld.std = nld.values * 0.3  # 30% is an arbitrary choice
        if self.std_fake_gsf is None:
            self.std_fake_gsf = False
        if self.std_fake_gsf or gsf.std is None:
            self.std_fake_gsf = True
            gsf.std = gsf.values * 0.3  # 30% is an arbitrary choice
# update
self.normalizer_nld.nld = nld # update before initial guess
self.normalizer_gsf.gsf_in = gsf # update before initial guess
        # Use DE to get an initial guess before optimizing
args_nld, guess = self.initial_guess()
# Optimize using multinest
popt, samples = self.optimize(num, args_nld, guess)
self.res.pars = popt
self.res.samples = samples
# reset
if self.std_fake_nld is True:
self.std_fake_nld = None
nld.std = None
if self.std_fake_gsf is True:
self.std_fake_gsf = None
gsf.std = None
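        # Transform the raw nld and gsf with the optimized (median)
        # parameters A, B and alpha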
self.res.nld = nld.transform(self.res.pars["A"][0],
self.res.pars["alpha"][0], inplace=False)
self.res.gsf = gsf.transform(self.res.pars["B"][0],
self.res.pars["alpha"][0], inplace=False)
self.normalizer_gsf.model_low.autorange(self.res.gsf)
self.normalizer_gsf.model_high.autorange(self.res.gsf)
self.normalizer_gsf.extrapolate(self.res.gsf)
self.res.gsf_model_low = self.normalizer_gsf.model_low
self.res.gsf_model_high = self.normalizer_gsf.model_high
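        # record the shift obtained after normalization for later reference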
for model in [self.res.gsf_model_low, self.res.gsf_model_high]:
model.shift_after = model.shift
self.save() # save instance
    def initial_guess(self) -> Tuple[Iterable, Dict[str, float]]:
        """ Find an initial guess for the normalization parameters
Uses guess of normalizer_nld and corresponding normalization of gsf
        Returns:
            The arguments used for the nld chi^2 minimization and the
            initial guess of the parameters.
"""
normalizer_nld = self.normalizer_nld
normalizer_gsf = self.normalizer_gsf
args_nld, guess = normalizer_nld.initial_guess()
[A, alpha, T, Eshift] = [guess["A"], guess["alpha"],
guess["T"], guess["Eshift"]]
nld = normalizer_nld.nld.transform(A, alpha, inplace=False)
nld_model = lambda E: normalizer_nld.model(E, T=T, Eshift=Eshift) # noqa
normalizer_gsf.normalize(nld=nld, nld_model=nld_model, alpha=alpha)
guess["B"] = normalizer_gsf.res.pars["B"][0]
guess_print = copy.deepcopy(guess)
self.LOG.info("DE results/initial guess:\n%s",
tt.to_string([list(guess_print.values())],
header=['A', 'α [MeV⁻¹]', 'T [MeV]',
'Eshift [MeV]', 'B']))
return args_nld, guess
    def optimize(self, num: int,
args_nld: Iterable,
guess: Dict[str, float]) -> Tuple[Dict[str, Tuple[float, float]], Dict[str, List[float]]]: # noqa
"""Find parameters given model constraints and an initial guess
Employs Multinest.
Args:
num (int): Loop number
args_nld (Iterable): Additional arguments for the nld lnlike
guess (Dict[str, float]): The initial guess of the parameters
Returns:
Tuple:
- popt (Dict[str, Tuple[float, float]]): Median and 1sigma of the
parameters
            - samples (Dict[str, List[float]]): Multinest samples,
                drawn from the equal-weighted posterior
                (`Analyzer.get_equal_weighted_posterior`).
        Raises:
            NotImplementedError: Prior selection not implemented for α < 0
            ValueError: Invalid parameters for the automatic prior
                (e.g. T < 0)
Note:
You might want to adjust the priors for your specific case! Here
we just propose a general solution that might often work out of
the box.
"""
if guess['alpha'] < 0:
raise NotImplementedError("Prior selection not implemented for "
"α < 0")
alpha_exponent = np.log10(guess['alpha'])
if guess['T'] < 0:
raise ValueError("Prior selection not implemented for T < 0; "
"negative temperature is unphysical")
T_exponent = np.log10(guess['T'])
A = guess['A']
B = guess["B"]
# truncations from absolute values
lower_A, upper_A = 0., np.inf
mu_A, sigma_A = A, 10*A
a_A = (lower_A - mu_A) / sigma_A
b_A = (upper_A - mu_A) / sigma_A
lower_Eshift, upper_Eshift = -5., 5
mu_Eshift, sigma_Eshift = 0, 5
a_Eshift = (lower_Eshift - mu_Eshift) / sigma_Eshift
b_Eshift = (upper_Eshift - mu_Eshift) / sigma_Eshift
lower_B, upper_B = 0., np.inf
mu_B, sigma_B = B, 10*B
a_B = (lower_B - mu_B) / sigma_B
b_B = (upper_B - mu_B) / sigma_B
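        # a_* and b_* above are the truncation bounds expressed in standard
        # deviations from the mean, the form expected by truncnorm_ppf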
def prior(cube, ndim, nparams):
# NOTE: You may want to adjust this for your case!
# truncated normal prior
cube[0] = truncnorm_ppf(cube[0], a_A, b_A)*sigma_A + mu_A
# log-uniform prior
# if alpha = 1e2, it's between 1e1 and 1e3
cube[1] = 10**(cube[1]*2 + (alpha_exponent-1))
# log-uniform prior
# if T = 1e2, it's between 1e1 and 1e3
cube[2] = 10**(cube[2]*2 + (T_exponent-1))
# truncated normal prior
cube[3] = truncnorm_ppf(cube[3], a_Eshift,
b_Eshift)*sigma_Eshift + mu_Eshift
# truncated normal prior
cube[4] = truncnorm_ppf(cube[4], a_B, b_B)*sigma_B + mu_B
if np.isinf(cube[3]):
self.LOG.debug("Encountered inf in cube[3]:\n%s", cube[3])
def loglike(cube, ndim, nparams):
return self.lnlike(cube, args_nld=args_nld)
# parameters are changed in the lnlike
norm_pars_org = copy.deepcopy(self.normalizer_gsf.norm_pars)
self.multinest_path.mkdir(exist_ok=True)
path = self.multinest_path / f"sim_norm_{num}_"
assert len(str(path)) < 60, "Total path length too long for multinest"
self.LOG.info("Starting multinest: ")
self.LOG.debug("with following keywords %s:", self.multinest_kwargs)
# Hack where stdout from Multinest is redirected as info messages
self.LOG.write = lambda msg: (self.LOG.info(msg) if msg != '\n'
else None)
with redirect_stdout(self.LOG):
pymultinest.run(loglike, prior, len(guess),
outputfiles_basename=str(path),
**self.multinest_kwargs)
# Save parameters for analyzer
names = list(guess.keys())
        with open(str(path) + 'params.json', 'w') as fp:
            json.dump(names, fp)
analyzer = pymultinest.Analyzer(len(guess),
outputfiles_basename=str(path))
stats = analyzer.get_stats()
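        # the last column of the equal-weighted posterior is the
        # log-likelihood; keep only the parameter columns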
samples = analyzer.get_equal_weighted_posterior()[:, :-1]
samples = dict(zip(names, samples.T))
# Format the output
popt = dict()
vals = []
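        # report median ± sigma, with sigma taken as half the width of the
        # 1σ credible interval; decimals are matched to the size of sigma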
for name, m in zip(names, stats['marginals']):
lo, hi = m['1sigma']
med = m['median']
sigma = (hi - lo) / 2
popt[name] = (med, sigma)
            i = max(0, int(-np.floor(np.log10(sigma))) + 1)
            fmt = '%%.%df' % i
            fmts = fmt + " ± " + fmt
            vals.append(fmts % (med, sigma))
self.LOG.info("Multinest results:\n%s", tt.to_string([vals],
header=['A', 'α [MeV⁻¹]', 'T [MeV]',
'Eshift [MeV]', 'B']))
# reset state
self.normalizer_gsf.norm_pars = norm_pars_org
return popt, samples
    def lnlike(self, x: Tuple[float, float, float, float, float],
args_nld: Iterable) -> float:
"""Compute log likelihood of the normalization fitting
This is the result up to the constant, which is irrelevant for the
maximization
Args:
            x (Tuple[float, float, float, float, float]): The arguments
                ordered as A, alpha, T, Eshift and B
            args_nld (Iterable): Additional arguments for the nld lnlike
Returns:
lnlike: log likelihood
"""
A, alpha, T, Eshift, B = x[:5] # slicing needed for multinest?
normalizer_gsf = self.normalizer_gsf
normalizer_nld = self.normalizer_nld
err_nld = normalizer_nld.lnlike(x[:4], *args_nld)
nld = normalizer_nld.nld.transform(A, alpha, inplace=False)
nld_model = lambda E: normalizer_nld.model(E, T=T, Eshift=Eshift) # noqa
normalizer_gsf.nld_model = nld_model
normalizer_gsf.nld = nld
# calculate the D0-equivalent of T and Eshift used
D0 = normalizer_nld.D0_from_nldSn(nld_model,
**normalizer_nld.norm_pars.asdict())
normalizer_gsf.norm_pars.D0 = [D0, np.nan] # dummy uncertainty
normalizer_gsf._gsf = normalizer_gsf.gsf_in.transform(B, alpha,
inplace=False)
normalizer_gsf._gsf_low, normalizer_gsf._gsf_high = \
normalizer_gsf.extrapolate()
err_gsf = normalizer_gsf.lnlike()
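        # the nld and gsf contributions are assumed independent, so the
        # joint log-likelihood is their sum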
return err_nld + err_gsf
    def plot(self, ax: Optional[Any] = None, add_label: bool = True,
add_figlegend: bool = True,
**kwargs) -> Tuple[Any, Any]:
"""Plots nld and gsf
Args:
            ax (optional): The matplotlib axis to plot onto. Creates axis
                if not provided.
            add_label (bool, optional): Defaults to `True`.
            add_figlegend (bool, optional): Defaults to `True`.
            results (ResultsNormalized, optional): If provided, gsf and model
                are taken from here instead.
**kwargs: kwargs for plot
Returns:
fig, ax
"""
if ax is None:
fig, ax = plt.subplots(1, 2, constrained_layout=True)
else:
fig = ax[0].figure
self.normalizer_nld.plot(ax=ax[0], add_label=True, results=self.res,
add_figlegend=False, **kwargs)
self.normalizer_gsf.plot(ax=ax[1], add_label=False, results=self.res,
add_figlegend=False, **kwargs)
ax[0].set_title("Level density")
ax[1].set_title(r"$\gamma$SF")
if add_figlegend:
fig.legend(loc=9, ncol=4, frameon=True)
fig.subplots_adjust(left=0.1, right=0.9, top=0.8, bottom=0.1)
return fig, ax
    def self_if_none(self, *args, **kwargs):
""" wrapper for lib.self_if_none """
return self_if_none(self, *args, **kwargs)