Source code for alea.simulators

from copy import deepcopy
from itertools import product
import logging

import numpy as np
import scipy.stats as sps
import multihist as mh
from blueice.likelihood import BinnedLogLikelihood, UnbinnedLogLikelihood

logging.basicConfig(level=logging.INFO)


[docs]class BlueiceDataGenerator: """A class for generating data from a blueice likelihood term. Attributes: ll: The blueice likelihood term. binned (bool): True if the likelihood term is binned. bincs (list): The bin centers of the likelihood term. direction_names (list): The names of the directions of the likelihood term. source_histograms (list): The histograms of the sources of the likelihood term. data_lengths (list): The number of bins of each component of the likelihood term. dtype (list): The data type of the likelihood term. last_kwargs (dict): The last kwargs used to generate data. mus: The expected number of events of each source of the likelihood term. parameters (list): The parameters of the likelihood term. ll_term (BinnedLogLikelihood or UnbinnedLogLikelihood): A blueice likelihood term. """
[docs] def __init__(self, ll_term): """Initialize the BlueiceDataGenerator.""" if isinstance(ll_term, BinnedLogLikelihood): self.binned = True elif isinstance(ll_term, UnbinnedLogLikelihood): self.binned = False else: raise NotImplementedError ll = deepcopy(ll_term) bincs = [] # bin centers of each component direction_names = [] dtype = [] data_lengths = [] # list of number of bins of each component analysis_space = ll_term.base_model.config["analysis_space"] source_histograms = {} for n in ll.source_name_list: source_histograms[n] = mh.Histdd(dimensions=analysis_space) for name, bin_edges in analysis_space: bincs.append(0.5 * (bin_edges[1:] + bin_edges[:-1])) dtype.append((name, float)) data_lengths.append(len(bin_edges) - 1) direction_names.append(name) # IDEA: make source a string, not an int dtype.append(("source", int)) data_binc = np.zeros(np.prod(data_lengths), dtype=dtype) for i, l in enumerate(product(*bincs)): for n, v in zip(direction_names, l): data_binc[n][i] = v ll.set_data(data_binc) self.ll = ll self.direction_names = direction_names self.source_histograms = source_histograms self.data_lengths = data_lengths self.dtype = dtype self.last_kwargs = {} self.first_call = True self.mus = ll.base_model.expected_events() self.parameters = list(ll.shape_parameters.keys()) self.parameters += [n + "_rate_multiplier" for n in ll.rate_parameters.keys()]
[docs] def simulate(self, filter_kwargs=True, n_toys=None, sample_n_toys=False, **kwargs): """Simulate toys for each source. Args: filter_kwargs (bool, optional (default=True)): If True, only parameters of the ll object are accepted as kwargs. Defaults to True. n_toys (int, optional (default=None)): If not None, a fixed number n_toys of toys is generated for each source component. Defaults to None. sample_n_toys (bool, optional (default=False)): If True, the number of toys is sampled from a Poisson distribution with mean n_toys. Defaults to False. Only works if n_toys is not None. Keyword Args: kwargs: The parameters pasted to the likelihood function. Returns: numpy.array: Array of simulated data for all sources in the given analysis space. The index "source" indicates the corresponding source of an entry. The dtype follows self.dtype. """ self.compute_pdfs_and_mus(filter_kwargs=filter_kwargs, **kwargs) if n_toys is not None: if sample_n_toys: self.n_toys_rv = sps.poisson(n_toys).rvs() n_sources = np.full(len(self.ll.base_model.sources), self.n_toys_rv) else: n_sources = np.full(len(self.ll.base_model.sources), n_toys) else: # Generate a number n_source (according to the expectation value) # of toys for each source component: n_sources = sps.poisson(self.mus).rvs() if len(self.ll.base_model.sources) == 1: n_sources = np.array([n_sources]) r_data = np.zeros(np.sum(n_sources), dtype=self.dtype) i_write = 0 for i, n_source in enumerate(n_sources): if n_source > 0: # dont generate if 0 source_name = self.ll.source_name_list[i] rvs = self.source_histograms[source_name].get_random(n_source) for j, n in enumerate(self.direction_names): r_data[n][i_write : i_write + n_source] = rvs[:, j] r_data["source"][i_write : i_write + n_source] = i i_write += n_source return r_data
[docs] def compute_pdfs_and_mus(self, filter_kwargs=True, **kwargs) -> None: """Compute PDFs of the sources for the given parameters. Args: filter_kwargs (bool, optional (default=True)): If True, only parameters of the ll object are accepted as kwargs. Defaults to True. kwargs: The parameters pasted to the likelihood function. """ if filter_kwargs: kwargs = {k: v for k, v in kwargs.items() if k in self.parameters + ["livetime_days"]} # check if the cached generator may be used: unmatched_item = set(self.last_kwargs.items()) ^ set(kwargs.items()) kwargs_changed = len(unmatched_item) != 0 if self.first_call or kwargs_changed: ret = self.ll(full_output=True, **kwargs) if isinstance(ret, float): logging.warning("ERROR, generator kwarg outside range?") logging.warning(kwargs) mus = self.ll.base_model.expected_events() logging.warning("\nExpected events for each source:") for source_name, mu in zip(self.ll.source_name_list, mus): logging.warning(f"Source {source_name}: expected events = {mu}") _, mus, ps_array = ret for n, p in zip(self.ll.source_name_list, ps_array): self.source_histograms[n].histogram = p.reshape(self.data_lengths) if not self.binned: self.source_histograms[n] *= self.source_histograms[n].bin_volumes() self.mus = mus self.last_kwargs = kwargs self.first_call = False