# Copyright 2017 Prashant Singh, Fredrik Wrede and Andreas Hellander
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Approximate Bayesian Computation
"""

# Imports
from sciope.inference.inference_base import InferenceBase
from sciope.utilities.distancefunctions import euclidean as euc
from sciope.utilities.summarystats import burstiness as bs
from sciope.core import core
from sciope.utilities.housekeeping import sciope_logger as ml
from toolz import partition_all
import numpy as np
from dask.distributed import futures_of, as_completed, get_client
import dask

# Stores the normalized distance values accumulated so far: one row per trial,
# one column per summary statistic
normalized_distances = None

# Class definition: ABC rejection sampling
class ABC(InferenceBase):
    """
    Approximate Bayesian Computation Rejection Sampler

    Properties/variables:
    * data (observed / fixed data)
    * sim (simulator function handle)
    * prior_function (prior over the simulator parameters)
    * epsilon (acceptance tolerance bound)
    * summaries_function (summary statistics calculation function)
    * distance_function (function calculating deviation between simulated statistics and observed statistics)
    * summaries_divisor (numpy array of maxima - used for normalizing summary statistic values)
    * use_logger (whether logging is enabled or disabled)

    Methods:
    * infer (perform parameter inference)
    """

    def __init__(self, data, sim, prior_function, epsilon=0.1, summaries_function=bs.Burstiness(),
                 distance_function=euc.EuclideanDistance(), summaries_divisor=None, use_logger=False):
        """
        ABC class for rejection sampling

        Parameters
        ----------
        data : nd-array
            the observed / fixed dataset
        sim : callable
            the simulator function
        prior_function : sciope.utilities.priors object
            the prior function generating candidate samples
        epsilon : float, optional
            tolerance bound, by default 0.1
        summaries_function : sciope.utilities.summarystats object, optional
            function calculating summary stats over simulated results, by default bs.Burstiness()
        distance_function : sciope.utilities.distancefunctions object, optional
            distance function operating over summary statistics - calculates the deviation between
            observed and simulated data, by default euc.EuclideanDistance()
        summaries_divisor : 1D numpy array, optional
            instead of normalizing by division with the current known maximum of each statistic,
            use the supplied division factors. These may come from prior knowledge, pre-studies, etc.
        use_logger : bool, optional
            enable/disable logging, by default False
        """
        self.name = 'ABC'
        self.epsilon = epsilon
        self.summaries_function = summaries_function
        self.prior_function = prior_function.draw
        self.distance_function = distance_function.compute
        self.historical_distances = []
        self.summaries_divisor = summaries_divisor
        self.use_logger = use_logger
        super(ABC, self).__init__(self.name, data, sim, self.use_logger)
        self.sim = sim
        if self.use_logger:
            self.logger = ml.SciopeLogger().get_logger()
            self.logger.info("Approximate Bayesian Computation initialized")
    def scale_distance(self, dist):
        """
        Performs scaling in [0,1] of a given distance vector/value with respect to historical distances

        Parameters
        ----------
        dist : ndarray, float
            distance

        Returns
        -------
        ndarray
            scaled distance
        """
        dist = np.asarray(dist)
        global normalized_distances
        self.historical_distances.append(dist.ravel())
        all_distances = np.array(self.historical_distances)
        if self.summaries_divisor is not None:
            divisor = self.summaries_divisor
        else:
            divisor = np.asarray(np.nanmax(all_distances, axis=0))
        normalized_distances = all_distances
        for j in range(len(divisor)):
            if divisor[j] > 0:
                normalized_distances[:, j] = normalized_distances[:, j] / divisor[j]

        return normalized_distances[-1, :]
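
    # Worked example of scale_distance (illustrative, derived from the code
    # above): if the history holds the rows [2., 10.] and then [1., 5.], with
    # no summaries_divisor given, each column is divided by its running maximum
    # [2., 10.], so the call that appended [1., 5.] returns [0.5, 0.5].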
    def compute_fixed_mean(self, chunk_size):
        """
        Computes the mean over summary statistics on the fixed data and stores it
        in self.fixed_mean

        Parameters
        ----------
        chunk_size : int
            the partition size when splitting the fixed data. For avoiding many
            individual tasks in dask if the data is large
        """
        stats_mean = core.get_fixed_mean(self.data, self.summaries_function, chunk_size)
        self.fixed_mean = stats_mean.compute()
        del stats_mean
# @sciope_profiler.profile
    def rejection_sampling(self, num_samples, batch_size, chunk_size, ensemble_size, normalize):
        """
        Perform ABC rejection sampling according to the initialized configuration.

        Parameters
        ----------
        num_samples : int
            The number of required accepted samples
        batch_size : int
            The batch size of samples for performing rejection sampling
        chunk_size : int
            the partition size when splitting the fixed data. For avoiding many
            individual tasks in dask if the data is large.
        ensemble_size : int
            In case we have an ensemble of responses
        normalize : bool
            Whether summary statistics should be normalized and epsilon be
            interpreted as a percentage

        Returns
        -------
        dict
            Keys:
            'accepted_samples': the accepted parameter values,
            'distances': accepted distance values,
            'accepted_count': number of accepted samples,
            'trial_count': the number of total trials performed in order to converge,
            'inferred_parameters': the mean of accepted parameter samples
        """
        accepted_count = 0
        trial_count = 0
        accepted_samples = []
        distances = []

        # fixed_mean must have been computed before inference can run
        assert hasattr(self, "fixed_mean"), "Please call compute_fixed_mean before infer"

        # Get the dask graph
        graph_dict = core.get_graph_chunked(self.prior_function, self.sim, self.summaries_function,
                                            batch_size, chunk_size)

        dist_func = lambda x: self.distance_function(self.fixed_mean, x)
        graph_dict["distances"] = core.get_distance(dist_func, graph_dict["summarystats"], chunked=True)

        cluster_mode = core._cluster_mode()

        # Do rejection sampling.
        # If a dask cluster is used, use persist and futures, and scale each result as it completes
        if cluster_mode:
            if self.use_logger:
                self.logger.info("running in cluster mode")

            res_param, res_dist = dask.persist(graph_dict["parameters"], graph_dict["distances"])
            futures_dist = core.get_futures(res_dist)
            futures_params = core.get_futures(res_param)

            keep_idx = {f.key: idx for idx, f in enumerate(futures_dist)}

            while accepted_count < num_samples:
                for f, dist in as_completed(futures_dist, with_results=True):
                    sim_dist_scaled = []
                    params = []
                    dists = []
                    for d in dist:
                        dists.append(d)
                        trial_count += 1
                        if normalize:
                            # Normalize distances to [0, 1]
                            sim_dist_scaled.append(self.scale_distance(d))

                    idx = keep_idx[f.key]
                    param = futures_params[idx]
                    params_res = param.result()
                    for p in params_res:
                        params.append(p)

                    accepted_samples, distances, accepted_count = self._scale_reject(
                        sim_dist_scaled, dists, accepted_samples, distances, params,
                        accepted_count, normalize)

                    del dist, param  # TODO: remove all futures including simulation and summarystats

                    if accepted_count < num_samples:
                        # Submit a new chunk of work to replace the one just consumed
                        new_chunk = core.get_graph_chunked(self.prior_function, self.sim,
                                                           self.summaries_function, chunk_size, chunk_size)
                        new_chunk["distances"] = core.get_distance(dist_func, new_chunk["summarystats"],
                                                                   chunked=True)
                        c_param, c_dist = dask.persist(new_chunk["parameters"], new_chunk["distances"])
                        f_dist = core.get_futures(c_dist)[0]
                        f_param = core.get_futures(c_param)[0]
                        futures_dist.append(f_dist)
                        futures_params.append(f_param)
                        keep_idx[f_dist.key] = len(keep_idx)
                    else:
                        del futures_dist, futures_params, res_param, res_dist
                        self.results = {'accepted_samples': accepted_samples, 'distances': distances,
                                        'accepted_count': accepted_count, 'trial_count': trial_count,
                                        'inferred_parameters': np.mean(accepted_samples, axis=0)}
                        return self.results

        # else use multiprocessing mode
        else:
            if self.use_logger:
                self.logger.info("running in parallel mode")
            while accepted_count < num_samples:
                sim_dist_scaled = []

                params, dists = dask.compute(graph_dict["parameters"], graph_dict["distances"])
                params = core._reshape_chunks(params)
                dists = core._reshape_chunks(dists)
                if normalize:
                    for d in dists:
                        sim_dist_scaled.append(self.scale_distance(d))

                accepted_samples, distances, accepted_count = self._scale_reject(
                    sim_dist_scaled, dists, accepted_samples, distances, params,
                    accepted_count, normalize)

                trial_count += batch_size

            self.results = {'accepted_samples': accepted_samples, 'distances': distances,
                            'accepted_count': accepted_count, 'trial_count': trial_count,
                            'inferred_parameters': np.mean(accepted_samples, axis=0)}
            return self.results
    def _scale_reject(self, sim_dist_scaled, dists, accepted_samples, distances, params,
                      accepted_count, normalize):
        if normalize:
            sim_dist_scaled = np.asarray(sim_dist_scaled)
        else:
            sim_dist_scaled = np.asarray(dists)

        # Take the norm to combine the distances, if more than one summary statistic is used
        if sim_dist_scaled.shape[1] > 1:
            combined_distance = [dask.delayed(np.linalg.norm)(scaled.reshape(1, scaled.size), axis=1)
                                 for scaled in sim_dist_scaled]
            result, = dask.compute(combined_distance)
        else:
            result = sim_dist_scaled.ravel()

        # Accept/Reject
        for e, res in enumerate(result):
            if self.use_logger:
                self.logger.debug("ABC Rejection Sampling: trial parameter(s) = {}".format(params[e]))
                self.logger.debug("ABC Rejection Sampling: trial distance(s) = {}".format(sim_dist_scaled[e]))
            if res <= self.epsilon:
                accepted_samples.append(params[e])
                distances.append(dists[e])
                accepted_count += 1
                if self.use_logger:
                    self.logger.info("ABC Rejection Sampling: accepted a new sample, "
                                     "total accepted samples = {0}".format(accepted_count))

        return accepted_samples, distances, accepted_count
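
    # Illustrative arithmetic for _scale_reject: with two summary statistics and
    # scaled distances [0.3, 0.4], the combined distance is the Euclidean norm
    # sqrt(0.3**2 + 0.4**2) = 0.5, and the sample is accepted iff 0.5 <= epsilon.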
    def infer(self, num_samples, batch_size, chunk_size=10, ensemble_size=1, normalize=True):
        """
        Wrapper for rejection sampling. Performs ABC rejection sampling.

        Parameters
        ----------
        num_samples : int
            The number of required accepted samples
        batch_size : int
            The batch size of samples for performing rejection sampling
        chunk_size : int, optional
            The partition size when splitting the fixed data. For avoiding many
            individual tasks in dask if the data is large. Default 10.
        ensemble_size : int, optional
            In case we have an ensemble of responses, by default 1
        normalize : bool, optional
            Whether summary statistics should be normalized and epsilon be
            interpreted as a percentage, by default True

        Returns
        -------
        dict
            Keys:
            'accepted_samples': the accepted parameter values,
            'distances': accepted distance values,
            'accepted_count': number of accepted samples,
            'trial_count': the number of total trials performed in order to converge,
            'inferred_parameters': the mean of accepted parameter samples
        """
        return self.rejection_sampling(num_samples, batch_size, chunk_size, ensemble_size, normalize)
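
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the module above).
# The toy simulator, prior bounds, and data shapes are hypothetical, and the
# UniformPrior import assumes sciope's sciope.utilities.priors.uniform_prior
# module; adapt all of these to your own model before use.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from sciope.utilities.priors import uniform_prior

    def toy_simulator(param):
        # Hypothetical simulator: one noisy time series whose mean is param[0]
        return np.random.normal(loc=param[0], scale=1.0, size=(1, 1, 100))

    # "Observed" data generated at an assumed true parameter value of 2.0
    observed_data = toy_simulator(np.array([2.0]))

    prior = uniform_prior.UniformPrior(np.array([0.0]), np.array([5.0]))
    abc_instance = ABC(observed_data, toy_simulator, prior, epsilon=0.1)
    abc_instance.compute_fixed_mean(chunk_size=2)
    results = abc_instance.infer(num_samples=30, batch_size=10)
    print("Inferred parameters:", results['inferred_parameters'])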