Source code for gillespy2.solvers.numpy.CLE_solver

# GillesPy2 is a modeling toolkit for biochemical simulation.
# Copyright (C) 2019-2024 GillesPy2 developers.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Class and methods for the CLE Solver"""
import copy
import random
import math
from threading import Thread, Event
import numpy as np
from gillespy2.solvers.utilities import Tau
from gillespy2.solvers.utilities import solverutils as nputils
from gillespy2.core import GillesPySolver, log, liveGraphing, SimulationError
from gillespy2.core import ModelError, ExecutionError
from gillespy2.core.results import Results


[docs]class CLESolver(GillesPySolver): """ A Chemical Langevin Equation Solver for GillesPy2 models. This solver uses an algorithm that calculates multiple reactions in a single step over a given tau step size. The change in propensities over this step are bounded by bounding the relative change in state, yielding greatly improved run-time performance with very little trade-off in accuracy. :param model: The model on which the solver will operate. :type model: gillespy2.Model """ name = "CLESolver" rc = 0 stop_event = None pause_event = None result = None def __init__(self, model=None, debug=False): if model is None: raise SimulationError("A model is required to run the simulation.") name = "CLESolver" rc = 0 stop_event = None pause_event = None result = None self.model = model self.debug = debug self.is_instantiated = True def __get_reactions(self, step, curr_state, curr_time, save_time, propensities, reactions): r""" Helper Function to get reactions fired from t to t+tau. In the equation: X_i(t+\tau) = X_i(t) + \sum_{j=1}^M \nu_{ji} \left[ a_j(x_t)\tau + \left( a_j(x_t)\tau \right)^{1/2} \mathcal{N}(0,1) \right] This function calculates: \left[ a_j(x_t)\tau + \left( a_j(x_t)\tau \right)^{1/2} \mathcal{N}(0,1) \right] for each reaction channel. Reference: Gillespie 2000. https://doi.org/10.1063/1.481811 (eq 21) :returns: Three values: rxn_count - dict with key=Reaction channel value=number of times fired curr_state - dict containing all state variables for system at current time curr_time - float representing current time """ if curr_time + step > save_time: if self.debug: print("Step exceeds save_time, changing step size from ", step, " to ", save_time - curr_time) step = save_time - curr_time if self.debug: print("Curr Time: ", curr_time, " Save time: ", save_time, "step: ", step) rxn_count = {} for rxn in reactions: #rxn_count[rxn] = np.random.poisson(propensities[rxn] * step) rxn_count[rxn] = propensities[rxn]*step + \ math.sqrt(propensities[rxn]*step)*np.random.normal(0,1) if self.debug: print("Reactions Fired: ", rxn_count) curr_time = curr_time+step return rxn_count, curr_state, curr_time
[docs] @classmethod def get_solver_settings(cls): """ Returns a list of arguments supported by CLE_solver.run. :returns: Tuple of strings, denoting all keyword argument for this solvers run() method. :rtype: tuple """ return ('model', 't', 'number_of_trajectories', 'increment', 'seed', 'debug', 'profile','timeout', 'tau_tol')
[docs] def run(self=None, model=None, t=None, number_of_trajectories=1, increment=None, seed=None, debug=False, profile=False, live_output=None, live_output_options={}, timeout=None, resume=None, tau_tol=0.03, **kwargs): """ Function calling simulation of the model. This is typically called by the run function in GillesPy2 model objects and will inherit those parameters which are passed with the model as the arguments this run function. :param model: The model on which the solver will operate. (Deprecated) :type model: gillespy2.Model :param t: Simulation run time. :type t: int or float :param number_of_trajectories: The number of times to sample the chemical master equation. Each trajectory will be returned at the end of the simulation. By default number_of_trajectories = 1. :type number_of_trajectories: int :param increment: Save point increment for recording data. :type increment: float :param seed: The random seed for the simulation. Optional, defaults to None. :type seed: int :param debug: Set to True to provide additional debug information about the simulation. :type debug: bool :param profile: Set to True to provide information about step size (tau) taken at each step. :type profile: bool :param live_output: The type of output to be displayed by solver. Can be "progress", "text", or "graph". :type live_output: str :param live_output_options: Contains options for live_output. By default {"interval":1}. "interval" specifies seconds between displaying. "clear_output" specifies if display should be refreshed with each display. :type live_output_options: dict :param timeout: If set, if simulation takes longer than timeout, will exit. :type timeout: int :param resume: Result of a previously run simulation, to be resumed. :type resume: gillespy2.Results :param tau_tol: Tolerance level for Tau leaping algorithm. Larger tolerance values will result in larger tau steps. Default value is 0.03. :type tau_tol: float :returns: A result object containing the results of the simulation. :rtype: gillespy2.Results """ from gillespy2 import log if self is None: # Post deprecation block # raise SimulationError("CLESolver must be instantiated to run the simulation") # Pre deprecation block log.warning( """ `gillespy2.Model.run(solver=CLESolver)` is deprecated. You should use `gillespy2.Model.run(solver=CLESolver(model=gillespy2.Model)) Future releases of GillesPy2 may not support this feature. """ ) self = CLESolver(model=model, debug=debug, profile=profile) if model is not None: log.warning('model = gillespy2.model is deprecated. Future releases ' 'of GillesPy2 may not support this feature.') if self.model is None: if model is None: raise SimulationError("A model is required to run the simulation.") self.model = copy.deepcopy(model) self.model.compile_prep() self.validate_model(self.model, model) self.validate_sbml_features(model=self.model) self.validate_tspan(increment=increment, t=t) if increment is None: increment = self.model.tspan[-1] - self.model.tspan[-2] if t is None: t = self.model.tspan[-1] self.stop_event = Event() self.pause_event = Event() if timeout is not None and timeout <= 0: timeout = None if len(kwargs) > 0: for key in kwargs: log.warning('Unsupported keyword argument to {0} solver: {1}'.format(self.name, key)) # create numpy array for timeline if resume is not None: # start where we last left off if resuming a simulatio lastT = resume['time'][-1] step = lastT - resume['time'][-2] timeline = np.arange(lastT, t+step, step) else: timeline = np.linspace(0, t, int(round(t / increment + 1))) species = list(self.model._listOfSpecies.keys()) trajectory_base, tmpSpecies = nputils.numpy_trajectory_base_initialization(self.model, number_of_trajectories, timeline, species, resume=resume) # total_time and curr_state are list of len 1 so that __run receives reference if resume is not None: total_time = [resume['time'][-1]] else: total_time = [0] curr_state = [None] live_grapher = [None] sim_thread = Thread(target=self.___run, args=(curr_state, total_time, timeline, trajectory_base, tmpSpecies, live_grapher,), kwargs={'t': t, 'number_of_trajectories': number_of_trajectories, 'increment': increment, 'seed': seed, 'debug': debug, 'resume': resume, 'timeout': timeout, 'tau_tol': tau_tol }) try: time = 0 sim_thread.start() if live_output is not None: import gillespy2.core.liveGraphing live_output_options['type'] = live_output gillespy2.core.liveGraphing.valid_graph_params( live_output_options) if resume is not None: resumeTest = True # If resuming, relay this information to live_grapher else: resumeTest = False live_grapher[ 0] = gillespy2.core.liveGraphing.LiveDisplayer(self.model, timeline, number_of_trajectories, live_output_options, resume=resumeTest) display_timer = gillespy2.core.liveGraphing.RepeatTimer( live_output_options['interval'], live_grapher[0].display, args=(curr_state, total_time, trajectory_base, live_output ) ) display_timer.start() if timeout is not None: while sim_thread.is_alive(): sim_thread.join(.1) time += .1 if time >= timeout: break else: while sim_thread.is_alive(): sim_thread.join(.1) if live_grapher[0] is not None: display_timer.cancel() self.stop_event.set() while self.result is None: pass except KeyboardInterrupt: if live_output: display_timer.pause = True display_timer.cancel() self.pause_event.set() while self.result is None: pass if hasattr(self, 'has_raised_exception'): raise SimulationError( f"Error encountered while running simulation:\nReturn code: {int(self.rc)}.\n" ) from self.has_raised_exception return Results.build_from_solver_results(self, live_output_options)
def ___run(self, curr_state,total_time, timeline, trajectory_base, tmpSpecies, live_grapher, t=20, number_of_trajectories=1, increment=0.05, seed=None, debug=False, profile=False, timeout=None, resume=None, tau_tol=0.03, **kwargs): try: self.__run(curr_state, total_time, timeline, trajectory_base, tmpSpecies, live_grapher, t, number_of_trajectories, increment, seed, debug, profile, timeout, resume, tau_tol, **kwargs) except Exception as e: self.has_raised_exception = e self.result = [] return [], -1 def __run(self, curr_state, total_time, timeline, trajectory_base, tmpSpecies, live_grapher, t=20, number_of_trajectories=1, increment=0.05, seed=None, debug=False, profile=False, timeout=None, resume=None, tau_tol=0.03, **kwargs): # for use with resume, determines how much excess data to cut off due to # how species and time are initialized to 0 timeStopped = 0 if resume is not None: if resume[0].model != self.model: raise ModelError('When resuming, one must not alter the model being resumed.') if t < resume['time'][-1]: raise ExecutionError( "'t' must be greater than previous simulations end time, or set in the run() method as the " "simulations next end time") if debug: print("t = ", t) print("increment = ", increment) species_mappings, species, parameter_mappings, number_species = nputils.numpy_initialization(self.model) if seed is not None: if not isinstance(seed, int): seed = int(seed) if seed > 0: random.seed(seed) np.random.seed(seed) else: raise ModelError('seed must be a positive integer') simulation_data = [] for trajectory_num in range(number_of_trajectories): if self.stop_event.is_set(): self.rc = 33 break elif self.pause_event.is_set(): timeStopped = timeline[entry_count] # For multi trajectories, live_grapher needs to be informed of trajectory increment if live_grapher[0] is not None: live_grapher[0].increment_trajectory(trajectory_num) start_state = [0] * (len(self.model.listOfReactions) + len(self.model.listOfRateRules)) propensities = {} curr_state[0] = {} if resume is not None: save_time = total_time[0] curr_time = [save_time] else: save_time = 0 curr_time = [0] curr_state[0]['vol'] = self.model.volume data = {'time': timeline} steps_taken = [] steps_rejected = 0 entry_count = 0 trajectory = trajectory_base[trajectory_num] HOR, reactants, mu_i, sigma_i, g_i, epsilon_i, critical_threshold = Tau.initialize(self.model, tau_tol) if resume is not None: for spec in self.model.listOfSpecies: curr_state[0][spec] = tmpSpecies[spec] else: for spec in self.model.listOfSpecies: curr_state[0][spec] = self.model.listOfSpecies[spec].initial_value for param in self.model.listOfParameters: curr_state[0][param] = self.model.listOfParameters[param].value for i, rxn in enumerate(self.model.listOfReactions): # set reactions to uniform random number and add to start_state start_state[i] = (math.log(random.uniform(0, 1))) if debug: print("Setting Random number ", start_state[i], " for ", self.model.listOfReactions[rxn].name) compiled_propensities = {} for i, r in enumerate(self.model.listOfReactions): compiled_propensities[r] = compile(self.model.listOfReactions[r].ode_propensity_function, '<string>', 'eval') timestep = 0 # Each save step while entry_count < timeline.size: if self.stop_event.is_set(): self.rc = 33 break elif self.pause_event.is_set(): timeStopped = timeline[entry_count] break # Until save step reached while curr_time[0] < save_time: if self.stop_event.is_set(): self.rc = 33 break elif self.pause_event.is_set(): timeStopped = timeline[entry_count] break propensity_sum = 0 for i, r in enumerate(self.model.listOfReactions): propensities[r] = eval(compiled_propensities[r], curr_state[0]) propensity_sum += propensities[r] tau_args = [HOR, reactants, mu_i, sigma_i, g_i, epsilon_i, tau_tol, critical_threshold, self.model, propensities, curr_state[0], curr_time[0], save_time] tau_step = Tau.select(*tau_args) prev_start_state = start_state.copy() prev_curr_state = curr_state[0].copy() prev_curr_time = curr_time[0] total_time[0] = curr_time[0] loop_cnt = 0 while True: loop_cnt += 1 if loop_cnt > 100: raise Exception("Loop over __get_reactions() exceeded loop count") reactions, curr_state[0], curr_time[0] = self.__get_reactions( tau_step, curr_state[0], curr_time[0], save_time, propensities, self.model.listOfReactions) # Update curr_state with the result of the CLE step species_modified = {} for i, rxn in enumerate(self.model.listOfReactions): if reactions[rxn] > 0: for reactant in self.model.listOfReactions[rxn].reactants: species_modified[reactant.name] = True curr_state[0][reactant.name] -= self.model.listOfReactions[ rxn].reactants[reactant] * reactions[rxn] for product in self.model.listOfReactions[rxn].products: species_modified[product.name] = True curr_state[0][product.name] += self.model.listOfReactions[ rxn].products[product] * reactions[rxn] neg_state = False for spec in species_modified: if curr_state[0][spec] < 0: neg_state = True if debug: print("Negative state detected: curr_state[{0}]= {1}".format(spec, curr_state[0][spec])) if neg_state: if debug: print("\trxn={0}".format(reactions)) start_state = prev_start_state.copy() curr_state[0] = prev_curr_state.copy() curr_time[0] = prev_curr_time total_time[0] = prev_curr_time tau_step = tau_step / 2 steps_rejected += 1 if debug: print("Resetting curr_state[{0}]= {1}".format(spec, curr_state[0][spec])) if debug: print("\tRejecting step, taking step of half size, tau_step={0}".format(tau_step)) else: break # breakout of the while True # save step reached for i in range(number_species): trajectory[entry_count][i + 1] = curr_state[0][species[i]] save_time += increment timestep += 1 entry_count += 1 # end of trajectory for i in range(number_species): data[species[i]] = trajectory[:, i+1] simulation_data.append(data) if profile: print(steps_taken) print("Total Steps Taken: ", len(steps_taken)) print("Total Steps Rejected: ", steps_rejected) # If simulation has been paused, or tstopped !=0 if timeStopped != 0 or resume is not None: simulation_data = nputils.numpy_resume(timeStopped, simulation_data, resume=resume) self.result = simulation_data return self.result, self.rc