Source code for gillespy2.solvers.cpp.tau_hybrid_c_solver

import numpy as np
import gillespy2
from gillespy2.solvers.cpp.c_decoder import IterativeSimDecoder
from gillespy2.solvers.utilities import solverutils as cutils
from gillespy2.core import GillesPySolver, Model, Event, RateRule
from gillespy2.core.gillespyError import *
from typing import Union
from typing import Set
from enum import IntEnum
from gillespy2.core import Results

from .c_solver import CSolver, SimulationReturnCode
from gillespy2.solvers.cpp.build.template_gen import SanitizedModel

[docs]class TauHybridCSolver(GillesPySolver, CSolver): """ This solver uses a root-finding interpretation of the direct SSA method, along with ODE solvers to simulate ODE and Stochastic systems interchangeably or simultaneously. """ name = "TauHybridCSolver" target = "hybrid" def __init__(self, model = None, output_directory = None, delete_directory = True, resume=None, variable = False, constant_tau_stepsize=None): self.constant_tau_stepsize = constant_tau_stepsize super().__init__(model=model, output_directory=output_directory, delete_directory=delete_directory, resume=resume, variable=variable)
[docs] class ErrorStatus(IntEnum): UNKNOWN = 1 LOOP_OVER_INTEGRATE = 2 INTEGRATOR_FAILED = 3 INVALID_AFTER_SSA = 4 NEGATIVE_STATE_NO_SSA_REACTION = 5 NEGATIVE_STATE_AT_BEGINING_OF_STEP = 6
def __create_options(self, sanitized_model: "SanitizedModel") -> "SanitizedModel": """ Populate the given list of species modes into a set of template macro definitions. Generated options are specific to the Tau Hybrid solver, and get passed as custom definitions to the build engine. :param sanitized_model: Sanitized model containing sanitized species definitions. The GPY_HYBRID_SPECIES_MODES option will be set as an option for the model. :type sanitized_model: SanitizedModel :returns: Pass-through of sanitized model object. :rtype: SanitizedModel """ species_mode_map = { "continuous": "CONTINUOUS_MODE", "discrete": "DISCRETE_MODE", "dynamic": "DYNAMIC_MODE", } boundary_condition_types = [ # When species.boundary_condition == False "STANDARD", # When species.boundary_condition == True "BOUNDARY", ] trigger_mode_types = [ # When event.use_values_from_trigger_time == False "USE_EVAL", # When event.use_values_from_trigger_time == True "USE_TRIGGER", ] persist_types = [ # When event.trigger.persistent == False "IRREGULAR", # When event.trigger.persistent == True "PERSISTENT", ] initial_value_types = [ # When event.trigger.initial_value == False "INIT_FALSE", # When event.trigger.initial_value == True "INIT_TRUE", ] species_mode_list = [] for spec_id, species in enumerate(sanitized_model.species.values()): mode_keyword = species_mode_map.get(species.mode, species_mode_map["dynamic"]) # Casting a bool to an int evaluates: False -> 0, and True -> 1 # Explicit cast to bool for safety, in case boundary_condition is given weird values boundary_keyword = boundary_condition_types[int(bool(species.boundary_condition))] # Example: SPECIES_MODE(2, 10, CONTINUOUS_MODE, BOUNDARY) entry = f"SPECIES_MODE({spec_id},{species.switch_min},{species.switch_tol},{mode_keyword},{boundary_keyword})" species_mode_list.append(entry) # EVENT(event_id, {targets}, trigger, delay, priority, use_trigger, use_persist) event_list = [] # [SPECIES/VARIABLE]_ASSIGNMENT(assign_id, target_id, expr) event_assignment_list = [] assign_id = 0 for event_id, event in enumerate(sanitized_model.model.listOfEvents.values()): trigger = sanitized_model.expr.getexpr_cpp(event.trigger.expression) delay = sanitized_model.expr.getexpr_cpp(event.delay) \ if event.delay is not None else "0" priority = sanitized_model.expr.getexpr_cpp(event.priority) \ if event.priority is not None else "0" use_trigger = trigger_mode_types[int(bool(event.use_values_from_trigger_time))] use_persist = persist_types[int(bool(event.trigger.persistent))] initial_value = initial_value_types[int(bool(event.trigger.value or False))] assignments: "list[str]" = [] for assign in event.assignments: variable = assign.variable expression = sanitized_model.expr.getexpr_cpp(assign.expression) if isinstance(variable, str): if variable in sanitized_model.model.listOfSpecies: variable = sanitized_model.model.listOfSpecies.get(variable) elif variable in sanitized_model.model.listOfParameters: variable = sanitized_model.model.listOfParameters.get(variable) else: raise ValueError(f"Error in event={event} " f"Invalid event assignment {assign}: received name {variable} " f"Must match the name of a valid Species or Parameter.") if isinstance(variable, gillespy2.Species): assign_str = f"SPECIES_ASSIGNMENT(" \ f"{assign_id},{sanitized_model.species_id.get(variable.name)},{expression})" elif isinstance(variable, gillespy2.Parameter): assign_str = f"VARIABLE_ASSIGNMENT(" \ f"{assign_id},{sanitized_model.parameter_id.get(variable.name)},{expression})" else: raise ValueError(f"Invalid event assignment {assign}: received variable of type {type(variable)} " f"Must be of type str, Species, or Parameter") assignments.append(str(assign_id)) event_assignment_list.append(assign_str) assign_id += 1 # Check for "None"s for a in assignments: if a is None: raise Exception(f"assignment={a} is None in event={event}") if event_id is None: raise Exception(f"event_id is None in event={event}") if trigger is None: raise Exception(f"trigger is None in event={event}") if delay is None: raise Exception(f"delay is None in event={event}") if priority is None: raise Exception(f"priority is None in event={event}") if use_trigger is None: raise Exception(f"use_trigger is None in event={event}") if use_persist is None: raise Exception(f"use_persist is None in event={event}") if initial_value is None: raise Exception(f"initial_value is None in event={event}") assignments: "str" = " AND ".join(assignments) event_list.append( f"EVENT(" f"{event_id}," f"{{{assignments}}}," f"{trigger}," f"{delay}," f"{priority}," f"{use_trigger}," f"{use_persist}," f"{initial_value}" f")" ) sanitized_model.options["GPY_HYBRID_SPECIES_MODES"] = " ".join(species_mode_list) sanitized_model.options["GPY_HYBRID_EVENTS"] = " ".join(event_list) sanitized_model.options["GPY_HYBRID_NUM_EVENTS"] = str(len(event_list)) sanitized_model.options["GPY_HYBRID_EVENT_ASSIGNMENTS"] = " ".join(event_assignment_list) sanitized_model.options["GPY_HYBRID_NUM_EVENT_ASSIGNMENTS"] = str(len(event_assignment_list)) return sanitized_model
[docs] @classmethod def get_supported_features(cls): return { Event, RateRule, }
[docs] @classmethod def get_supported_integrator_options(cls) -> "Set[str]": return { "rtol", "atol", "max_step" }
def _build(self, model: "Union[Model, SanitizedModel]", simulation_name: str, variable: bool, debug: bool = False, custom_definitions=None) -> str: variable = variable or len(model.listOfEvents) > 0 sanitized_model = self.__create_options(SanitizedModel(model, variable=variable)) for rate_rule in model.listOfRateRules.values(): sanitized_model.use_rate_rule(rate_rule) # determine if a constant stepsize has been requested if self.constant_tau_stepsize is not None: sanitized_model.options['GPY_CONSTANT_TAU_STEPSIZE'] = str(float(self.constant_tau_stepsize)) else: sanitized_model.options['GPY_CONSTANT_TAU_STEPSIZE'] = '0' return super()._build(sanitized_model, simulation_name, variable, debug) def _handle_return_code(self, return_code: "int") -> "int": if return_code == TauHybridCSolver.ErrorStatus.UNKNOWN: raise SimulationError("C++ solver failed (no error code given).") if return_code == TauHybridCSolver.ErrorStatus.LOOP_OVER_INTEGRATE: raise SimulationError("Loop over integrate exceeded, problem space is too stiff") if return_code == TauHybridCSolver.ErrorStatus.INTEGRATOR_FAILED: raise SimulationError("Sundials ODE solver failed with 'BAD_STEP_SIZE'") if return_code == TauHybridCSolver.ErrorStatus.INVALID_AFTER_SSA: raise SimulationError("Invalid state after single SSA step") if return_code == TauHybridCSolver.ErrorStatus.NEGATIVE_STATE_NO_SSA_REACTION: raise SimulationError("Negative State detected in step, and no reaction found to fire.") if return_code == TauHybridCSolver.ErrorStatus.NEGATIVE_STATE_AT_BEGINING_OF_STEP: raise SimulationError("Negative State detected at beginning of step. Species involved in reactions can not be negative.") return super()._handle_return_code(return_code)
[docs] @classmethod def get_solver_settings(cls): """ Returns a list of arguments supported by tau_hybrid_c_solver.run. :return: Tuple of strings, denoting all keyword argument for this solvers run() method. :rtype: tuple """ return ('model', 't', 'number_of_trajectories', 'timeout', 'increment', 'seed', 'debug', 'profile')
[docs] def run(self=None, model: Model = None, t: int = None, number_of_trajectories: int = 1, timeout: int = 0, increment: int = None, seed: int = None, debug: bool = False, profile: bool = False, variables={}, resume=None, live_output: str = None, live_output_options: dict = {}, tau_step: int = .03, tau_tol=0.03, integrator_options: "dict[str, float]" = None, use_root_finding=False, **kwargs): """ :param model: The model on which the solver will operate. (Deprecated) :type model: gillespy2.Model :param t: End time of simulation. :type t: int :param number_of_trajectories: Number of trajectories to simulate. By default number_of_trajectories = 1. :type number_of_trajectories: int :param timeout: If set, if simulation takes longer than timeout, will exit. :type timeout: int :param increment: Time step increment for plotting. :type increment: float :param seed: The random seed for the simulation. Optional, defaults to None. :type seed: int :param variables: Dictionary of species and their data that will override existing species data. :type variables: dict :param resume: Result of a previously run simulation, to be resumed. :type resume: gillespy2.Results :param live_output: The type of output to be displayed by solver. Can be "progress", "text", or "graph". :type live_output: str :param live_output_options: dictionary contains options for live_output. By default {"interval":1}. "interval" specifies seconds between displaying. "clear_output" specifies if display should be refreshed with each display. :type live_output_options: dict :param tau_tol: Tolerance level for Tau leaping algorithm. Larger tolerance values will result in larger tau steps. Default value is 0.03. :type tau_tol: float :returns: A result object containing the results of the simulation. :rtype: gillespy2.Results """ from gillespy2 import log if self is None: # Post deprecation block # raise SimulationError("TauHybridCSolver must be instantiated to run the simulation") # Pre deprecation block log.warning( """ `gillespy2.Model.run(solver=TauHybridCSolver)` is deprecated. You should use `gillespy2.Model.run(solver=TauHybridCSolver(model=gillespy2.Model)) Future releases of GillesPy2 may not support this feature. """ ) self = TauHybridCSolver(model, resume=resume) if model is not None: log.warning('model = gillespy2.model is deprecated. Future releases ' 'of GillesPy2 may not support this feature.') if self.model is None: if model is None: raise SimulationError("A model is required to run the simulation.") self._set_model(model=model) self.model.compile_prep() self.validate_model(self.model, model) self.validate_sbml_features(model=self.model) self.validate_tspan(increment=increment, t=t) if increment is None: increment = self.model.tspan[-1] - self.model.tspan[-2] if t is None: t = self.model.tspan[-1] # Validate parameters prior to running the model. self._validate_type(variables, dict, "'variables' argument must be a dictionary.") self._validate_variables_in_set(variables, self.species + self.parameters) self._validate_resume(t, resume) self._validate_kwargs(**kwargs) if resume is not None: t = abs(t - int(resume["time"][-1])) number_timesteps = int(round(t / increment + 1)) args = { "trajectories": number_of_trajectories, "timesteps": number_timesteps, "tau_tol": tau_tol, "end": t, "interval": str(number_timesteps), } if self.variable: populations = cutils.update_species_init_values(self.model.listOfSpecies, self.species, variables, resume) parameter_values = cutils.change_param_values(self.model.listOfParameters, self.parameters, self.model.volume, variables) args.update({ "init_pop": populations, "parameters": parameter_values }) elif variables: log.warning("'variables' argument ignored, because solver has variable=False.") if integrator_options is not None: integrator_options = TauHybridCSolver.validate_integrator_options(integrator_options) args.update(integrator_options) seed = self._validate_seed(seed) if seed is not None: args.update({ "seed": seed }) if live_output is not None: live_output_options['type'] = live_output display_args = { "model": self.model, "number_of_trajectories": number_of_trajectories, "timeline": np.linspace(0, t, number_timesteps), "live_output_options": live_output_options, "resume": bool(resume) } else: display_args = None args = self._make_args(args) if debug: args.append("--verbose") if use_root_finding: args.append("--use_root_finding") decoder = IterativeSimDecoder.create_default(number_of_trajectories, number_timesteps, len(self.model.listOfSpecies)) sim_exec = self._build(self.model, self.target, self.variable, False) sim_status = self._run(sim_exec, args, decoder, timeout, display_args) trajectories, time_stopped = decoder.get_output() simulation_data = self._format_output(trajectories) if sim_status == SimulationReturnCode.PAUSED: simulation_data = self._make_resume_data(time_stopped, simulation_data, t) if resume is not None: simulation_data = self._update_resume_data(resume, simulation_data, time_stopped) self.result = simulation_data self.rc = int(sim_status) return Results.build_from_solver_results(self, live_output_options)