# GillesPy2 is a modeling toolkit for biochemical simulation.
# Copyright (C) 2019-2024 GillesPy2 developers.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import random, math, sys
from collections import OrderedDict
from scipy.integrate import ode, LSODA
import heapq
import numpy as np
import threading
import gillespy2
from gillespy2.solvers.utilities import Tau
from gillespy2.core import GillesPySolver, log, Event, RateRule, AssignmentRule, FunctionDefinition
from gillespy2.core.gillespyError import *
from gillespy2.core.results import Results
eval_globals = math.__dict__
def __piecewise(*args):
# Eval entry for piecewise functions
args = list(args)
solution = None
if len(args) % 2:
args.append(True)
for i, arg in enumerate(args):
if not i % 2:
continue
if arg:
solution = args[i - 1]
break
return solution
def __xor(*args):
# Eval entry for MathML xor function
from operator import ixor
from functools import reduce
args = list(args)
return reduce(ixor, args)
eval_globals['false'] = False
eval_globals['true'] = True
eval_globals['piecewise'] = __piecewise
eval_globals['xor'] = __xor
[docs]class TauHybridSolver(GillesPySolver):
"""
This solver uses a root-finding interpretation of the direct SSA method,
along with ODE solvers to simulate ODE and Stochastic systems
interchangeably or simultaneously.
Uses integrators from scipy.integrate.ode to perform calculations used to produce solutions.
:param model: The model on which the solver will operate.
:type model: gillespy2.Model
"""
name = "TauHybridSolver"
rc = 0
result = None
stop_event = None
def __init__(self, model=None, profile_reactions=False, constant_tau_stepsize=None):
if model is None:
raise SimulationError("A model is required to run the simulation.")
name = 'TauHybridSolver'
rc = 0
self.model = copy.deepcopy(model)
self.is_instantiated = True
self.profile_reactions = profile_reactions
self.profile_data = {}
if self.profile_reactions:
self.profile_data['time'] = []
for k in list(self.model.listOfSpecies)+list(self.model.listOfReactions):
self.profile_data[k] = []
self.non_negative_species = set()
for reaction, _ in model.listOfReactions.items():
for key, value in model.listOfReactions[reaction].reactants.items():
self.non_negative_species.add(key.name)
for key, value in model.listOfReactions[reaction].products.items():
self.non_negative_species.add(key.name)
self.constant_tau_stepsize = constant_tau_stepsize
# check if model should skip ODE integration step
self.pure_discrete = False
if len(self.model.listOfRateRules)==0:
self.pure_discrete = True
for species in self.model.listOfSpecies:
if self.model.listOfSpecies[species].mode != 'discrete':
self.pure_discrete = False
break
def __save_state_to_output(self, curr_time, save_index, curr_state, species,
trajectory, save_times):
"""
Helper function to save the curr_state to the trajectory output
"""
# Now update the step and trajectories for this step of the simulation.
# Here we make our final assignments for this step, and begin
# populating our results trajectory.
num_saves = 0
while len(save_times)>0 and save_times[0] <= curr_time:
time = save_times.pop(0) # remove the value from the front of the list
# if a solution is given for it
trajectory_index = save_index
assignment_state = copy.deepcopy(curr_state)
for s,sname in enumerate(species):
# Get ODE Solutions
trajectory[trajectory_index][s + 1] = curr_state[sname]
# Update Assignment Rules for all processed time points
if len(self.model.listOfAssignmentRules):
# Copy ODE state for assignments
assignment_state[sname] = curr_state[sname]
assignment_state['t'] = time
for ar in self.model.listOfAssignmentRules.values():
assignment_value = eval(ar.formula, {**eval_globals, **assignment_state})
assignment_state[ar.variable] = assignment_value
trajectory[trajectory_index][species.index(ar.variable.name) + 1] = assignment_value
num_saves += 1
save_index += 1
return save_index
def __toggle_reactions(self, all_compiled, deterministic_reactions, dependencies,
curr_state, det_spec, rr_sets):
"""
Helper method which is used to convert reaction channels into
rate rules, and rate rules into reaction channels, as they are switched
dynamically throughout the simulation based on user-supplied tolerance.
"""
# initialize variables
inactive_reactions = all_compiled['inactive_rxns']
rate_rules = all_compiled['rules']
rxns = all_compiled['rxns']
# If the set has changed, reactivate non-determinsitic reactions
reactivate = []
for r in inactive_reactions:
if not r in deterministic_reactions:
reactivate.append(r)
for r in reactivate:
rxns[r] = inactive_reactions.pop(r, None)
# Deactivate Determinsitic Reactions
for r in deterministic_reactions:
if not r in inactive_reactions:
inactive_reactions[r] = rxns.pop(r, None)
# floor non-det species
for s,d in det_spec.items():
if not d and isinstance(curr_state[s], float):
curr_state[s] = round(curr_state[s])
# Check if this reaction set is already compiled and in use:
if deterministic_reactions in rr_sets.keys():
return rr_sets[deterministic_reactions]
else:
# Otherwise, this is a new determinstic reaction set that must be compiled
tmp = self.__create_diff_eqs(deterministic_reactions,
dependencies, rr_sets, rate_rules)
return tmp
def __create_diff_eqs(self, comb, dependencies, rr_sets, rate_rules):
"""
Helper method used to convert stochastic reaction descriptions into
differential equations, used dynamically throught the simulation.
"""
diff_eqs = OrderedDict()
output_rules = copy.deepcopy(rate_rules)
# Initialize sample dict
rr_vars = {}
for n, rr in self.model.listOfRateRules.items():
rr_vars[rr.variable.name] = n
for spec in self.model.listOfSpecies:
if spec in rr_vars.keys():
diff_eqs[self.model.listOfSpecies[spec]] = self.model.listOfRateRules[rr_vars[spec]].formula
else:
diff_eqs[self.model.listOfSpecies[spec]] = '0'
# loop through each det reaction and concatenate it's diff eq for each species
for reaction in comb:
factor = {dep: 0 for dep in dependencies[reaction]}
for key, value in self.model.listOfReactions[reaction].reactants.items():
if not key.constant and not key.boundary_condition:
factor[key.name] -= value
for key, value in self.model.listOfReactions[reaction].products.items():
if not key.constant and not key.boundary_condition:
factor[key.name] += value
for dep in dependencies[reaction]:
if factor[dep] != 0:
if self.model.listOfSpecies[dep].mode == 'continuous':
diff_eqs[self.model.listOfSpecies[dep]] += ' + {0}*({1})'.format(factor[dep],
self.model.listOfReactions[reaction].ode_propensity_function)
else:
diff_eqs[self.model.listOfSpecies[dep]] += ' + {0}*({1})'.format(factor[dep],
self.model.listOfReactions[reaction].propensity_function)
for spec in self.model.listOfSpecies:
if diff_eqs[self.model.listOfSpecies[spec]] == '0':
del diff_eqs[self.model.listOfSpecies[spec]]
# create a dictionary of compiled gillespy2 rate rules
for spec, rate in diff_eqs.items():
output_rules[spec] = compile(gillespy2.RateRule(spec, rate).formula, '<string>', 'eval')
rr_sets[comb] = rate_rules # save values
return output_rules
def __flag_det_reactions(self, det_spec, det_rxn, dependencies):
"""
Helper method used to flag reactions that can be processed
deterministically without exceeding the user-supplied tolerance.
"""
# Determine if each rxn would be deterministic apart from other reactions
prev_state = det_rxn.copy()
for rxn in self.model.listOfReactions:
# assume it is deterministic
det_rxn[rxn] = True
# iterate through the dependent species of this reaction
for species in dependencies[rxn]:
# if any of the dependencies are discrete or (dynamic AND the
# species itself has not been flagged as deterministic)
# then allow it to be modelled discretely
if self.model.listOfSpecies[species].mode == 'discrete':
det_rxn[rxn] = False
break
if self.model.listOfSpecies[species].mode == 'dynamic' and det_spec[species] == False:
det_rxn[rxn] = False
break
# Create a hashable frozenset of all determinstic reactions
deterministic_reactions = set()
for rxn in det_rxn:
if det_rxn[rxn]:
deterministic_reactions.add(rxn)
deterministic_reactions = frozenset(deterministic_reactions)
return deterministic_reactions
def __calculate_statistics(self, curr_time, propensities, curr_state, tau_step, det_spec, cv_history={}):
"""
Calculates Mean, Standard Deviation, and Coefficient of Variance for each
dynamic species, then set if species can be represented determistically
NOTE: the argument cv_history should not be passed in, this is modified by the function
to keep a persistent data set.
"""
#move configuration to solver init (issue #905)
history_length = 12 #cite: "Statistical rules of thumb" G Van Belle
if curr_time==0.0: #re-set cv_history
for k in list(cv_history.keys()):
del cv_history[k]
CV = OrderedDict()
# calculate CV by estimating the next step
mn = {species: curr_state[species] for species, value in
self.model.listOfSpecies.items() if value.mode == 'dynamic'}
sd = {species: 0 for species, value in
self.model.listOfSpecies.items() if value.mode == 'dynamic'}
for r, rxn in self.model.listOfReactions.items():
for reactant in rxn.reactants:
if reactant.mode == 'dynamic':
mn[reactant.name] -= (propensities[r] * rxn.reactants[reactant])
sd[reactant.name] += (propensities[r] * (rxn.reactants[reactant] ** 2))
for product in rxn.products:
if product.mode == 'dynamic':
mn[product.name] += (propensities[r] * rxn.products[product])
sd[product.name] += (propensities[r] * (rxn.products[product] ** 2))
# Calcuate the derivative based CV
for species,value in self.model.listOfSpecies.items():
if value.mode == 'dynamic':
if mn[species] > 0 and sd[species] > 0:
CV[species] = math.sqrt(sd[species]) / mn[species]
else:
CV[species] = 1 # value chosen to guarantee species will be discrete
# Keep a history of the past CV values, calculate a time-averaged value
CV_a = OrderedDict() # time-averaged CV (forward derivative based)
for species,value in self.model.listOfSpecies.items():
if value.mode == 'dynamic':
if species not in cv_history:
cv_history[species] = []
cv_history[species].append(CV[species])
if len(cv_history[species]) > history_length:
cv_history[species].pop(0) #remove the first item
CV_a[species] = sum(cv_history[species])/len(cv_history[species])
# Select DISCRETE or CONTINOUS mode for each species
for species in mn:
prev_det = det_spec[species]
sref = self.model.listOfSpecies[species]
if sref.switch_min == 0:
# Set species to deterministic if CV is less than threshhold
det_spec[species] = CV_a[species] < sref.switch_tol
else:
det_spec[species] = mn[species] > sref.switch_min
return mn, sd, CV
@staticmethod
def __f(t, y, curr_state, species, reactions, rate_rules, propensities,
y_map, compiled_reactions, active_rr, events, assignment_rules):
"""
Evaluate the propensities for the reactions and the RHS of the Reactions and RateRules.
Also evaluates boolean value of event triggers.
"""
state_change = [0] * len(y_map)
curr_state['t'] = t
curr_state['time'] = t
for item, index in y_map.items():
if item in assignment_rules:
curr_state[assignment_rules[item].variable] = eval(assignment_rules[item].formula,
{**eval_globals, **curr_state})
else:
curr_state[item] = y[index]
for s, rr in active_rr.items():
try:
state_change[y_map[s.name]] += eval(rr, {**eval_globals, **curr_state})
except ValueError as e:
pass
for i, r in enumerate(compiled_reactions):
propensities[r] = eval(compiled_reactions[r], {**eval_globals, **curr_state})
state_change[y_map[r]] += propensities[r]
for event in events:
triggered = eval(event.trigger.expression, {**eval_globals, **curr_state})
if triggered:
state_change[y_map[event]] = 1
return state_change
def __find_event_time(self, sol, start, end, index, depth):
"""
Helper method providing binary search implementation for locating
precise event times.
"""
dense_range = np.linspace(start, end, 3)
mid = dense_range[1]
if start >= mid or mid >= end or depth == 20: return end
solutions = np.diff(sol.sol(dense_range)[-len(self.model.listOfEvents) + index])
bool_res = [x > 0 for x in solutions]
if bool_res[0]: # event before mid
depth += 1
return self.__find_event_time(sol, dense_range[0],
dense_range[1], index, depth)
else: # event after mid
depth += 1
return self.__find_event_time(sol, dense_range[1],
dense_range[2], index, depth)
def __detect_events(self, event_sensitivity, sol, delayed_events,
trigger_states, curr_time, curr_state):
"""
Helper method to locate precise time of event firing. This method
first searches for any instance of an event using event_sensitivity to
determine the granularity of the search. If an event is detected, a
binary search is then used to locate the precise time of that event.
"""
event_times = {}
dense_range = np.linspace(sol.t[0], sol.t[-1], len(sol.t) * event_sensitivity)
solutions = np.diff(sol.sol(dense_range))
for i, e in enumerate(self.model.listOfEvents.values()):
bool_res = [x > 0 for x in solutions[i - len(self.model.listOfEvents)]]
curr_state[e.name] = bool_res[-1]
# Search for changes from False to True in event, record first time
for y in range(1, len(dense_range) - 1):
# Check Persistent Delays. If an event is not designated as
# persistent, and the trigger expression fails to evaluate as
# true before assignment is carried out, remove the event from
# the queue.
if e.name in trigger_states and not e.trigger.persistent:
if not bool_res[y]:
delayed_events[i] = delayed_events[-1] # move to end
heapq.heappop(delayed_events)
del trigger_states[e.name]
curr_state[e.name] = False
# IF triggered from false to true, refine search
elif bool_res[y] and dense_range[y] != curr_time and bool_res[y - 1] == 0:
event_time = self.__find_event_time(sol, dense_range[y - 1],
dense_range[y + 1], i, 0)
if event_time in event_times:
event_times[event_time].append(e)
else:
event_times[event_time] = [e]
break
return event_times
def __get_next_step(self, event_times, reaction_times, delayed_events,
sim_end, next_tau):
"""
Helper method to determine the next action to take during simulation,
and returns that action along with the time that it occurs.
"""
next_event_trigger = sim_end + 2
next_delayed_event = sim_end + 3
curr_time = sim_end
# event triggers
if len(event_times):
next_event_trigger = min(event_times)
# delayed events
if len(delayed_events):
next_delayed_event = delayed_events[0][0]
# Execute rxns/events
next_step = {sim_end: 'end', next_tau: 'tau', next_event_trigger: 'trigger', next_delayed_event: 'delay'}
# Set time to next action
curr_time = min(sim_end, next_tau, next_event_trigger,
next_delayed_event)
return next_step[curr_time], curr_time
def __process_queued_events(self, event_queue, trigger_states,
curr_state, det_spec):
"""
Helper method which processes the events queue. Method is primarily for
evaluating assignments at the designated state (trigger time or event
time).
"""
# Process all queued events
events_processed = []
pre_assignment_state = curr_state.copy()
while event_queue:
# Get events in priority order
fired_event = self.model.listOfEvents[heapq.heappop(event_queue)[1]]
events_processed.append(fired_event)
if fired_event.name in trigger_states:
assignment_state = trigger_states[fired_event.name]
del trigger_states[fired_event.name]
else:
assignment_state = pre_assignment_state
for a in fired_event.assignments:
# Get assignment value
assign_value = eval(a.expression, eval_globals, assignment_state)
# Update state of assignment variable
curr_state[a.variable.name] = assign_value
return events_processed
def __handle_event(self, event, curr_state, curr_time, event_queue,
trigger_states, delayed_events):
"""
Helper method providing logic for updating states based on assignments.
"""
# Fire trigger time events immediately
if event.delay is None:
heapq.heappush(event_queue, (eval(event.priority), event.name))
# Queue delayed events
else:
curr_state['t'] = curr_time
curr_state['time'] = curr_time
execution_time = curr_time + eval(event.delay, {**eval_globals, **curr_state})
curr_state[event.name] = True
heapq.heappush(delayed_events, (execution_time, event.name))
if event.use_values_from_trigger_time:
trigger_states[event.name] = curr_state.copy()
else:
trigger_states[event.name] = curr_state
def __check_t0_events(self, initial_state):
"""
Helper method for firing events who reach a trigger condition at start
of simulation, time == 0.
"""
# Check Event State at t==0
species_modified_by_events = []
t0_delayed_events = {}
for e in self.model.listOfEvents.values():
if not e.trigger.value:
t0_firing = eval(e.trigger.expression, {**eval_globals, **initial_state})
if t0_firing:
if e.delay is None:
for a in e.assignments:
initial_state[a.variable.name] = eval(a.expression, {**eval_globals, **initial_state})
species_modified_by_events.append(a.variable.name)
else:
execution_time = eval(e.delay, {**eval_globals, **initial_state})
t0_delayed_events[e.name] = execution_time
return t0_delayed_events, species_modified_by_events
def __update_stochastic_rxn_states(self, propensities, tau_step, compiled_reactions, curr_state, only_update=None):
"""
Helper method for updating the state of stochastic reactions.
if 'only_update' is set to a reaction name, it will only reset that reaction, and only one firing
"""
rxn_count = OrderedDict()
species_modified = OrderedDict()
# Update stochastic reactions
for rxn in compiled_reactions:
rxn_count[rxn] = 0
if only_update is not None: # for a single SSA step
if rxn == only_update:
curr_state[rxn] = math.log(np.random.uniform(0, 1)) #set this value, needs to be <0
rxn_count[rxn] = 1
elif curr_state[rxn] > 0:
rxn_count[rxn] = 1 + np.random.poisson(curr_state[rxn])
curr_state[rxn] = math.log(np.random.uniform(0, 1))
if rxn_count[rxn]:
for reactant in self.model.listOfReactions[rxn].reactants:
species_modified[reactant.name] = True
curr_state[reactant.name] -= self.model.listOfReactions[rxn].reactants[reactant] * rxn_count[rxn]
for product in self.model.listOfReactions[rxn].products:
species_modified[product.name] = True
curr_state[product.name] += self.model.listOfReactions[rxn].products[product] * rxn_count[rxn]
return species_modified, rxn_count
def __integrate_constant(self, integrator_options, curr_state, y0, curr_time,
propensities, y_map, compiled_reactions,
active_rr, event_queue,
delayed_events, trigger_states,
event_sensitivity, tau_step ):
"""
Integreate one step, skip ODE integrator as all species are
discrete.
"""
tau_step = max(1e-6, tau_step)
next_tau = curr_time + tau_step
prev_time = curr_time
curr_state['t'] = curr_time
curr_state['time'] = curr_time
event_times = {}
reaction_times = []
next_step, curr_time = self.__get_next_step(event_times, reaction_times,
delayed_events,
self.model.tspan[-1], next_tau)
curr_state['t'] = curr_time
tau_step2 = curr_time - prev_time
for rxn in compiled_reactions:
curr_state[rxn] = curr_state[rxn] + propensities[rxn]*tau_step2
return curr_time, tau_step2
def __integrate(self, integrator_options, curr_state, y0, curr_time,
propensities, y_map, compiled_reactions,
active_rr, event_queue,
delayed_events, trigger_states,
event_sensitivity, tau_step, det_rxn ):
"""
Helper function to perform the ODE integration of one step. This
method uses scipy.integrate.LSODA to get simulation data, and
determines the next stopping point of the simulation. The state is
updated and returned to __simulate along with curr_time and the
solution object.
"""
use_const = False
if self.pure_discrete:
use_const = True
elif len(self.model.listOfRateRules)==0:
use_const = True
for rxn in det_rxn:
if det_rxn[rxn]:
use_const = False
break
if use_const:
return self.__integrate_constant(integrator_options, curr_state, y0, curr_time,
propensities, y_map, compiled_reactions, active_rr, event_queue,
delayed_events, trigger_states, event_sensitivity, tau_step)
from functools import partial
events = self.model.listOfEvents.values()
dense_output = False
int_args = [curr_state, self.model.listOfSpecies, self.model.listOfReactions,
self.model.listOfRateRules,
propensities, y_map,
compiled_reactions,
active_rr,
events,
self.model.listOfAssignmentRules]
rhs = lambda t, y: TauHybridSolver.__f(t, y, *int_args)
if 'min_step' in integrator_options:
tau_step = max(integrator_options['min_step'], tau_step)
else:
tau_step = max(1e-6, tau_step)
#next_tau = curr_time + tau_step
last_curr_time = curr_time
# Set curr time to next time a change occurs in the system outside of
# the standard ODE process. Determine what kind of change this is,
# and set the curr_time of simulation to restart simulation after
# making the appropriate state changes.
event_times = {}
reaction_times = []
next_step, curr_time = self.__get_next_step(event_times, reaction_times,
delayed_events,
self.model.tspan[-1], curr_time + tau_step)
curr_state['t'] = curr_time
curr_state['time'] = curr_time
actual_tau_step = last_curr_time - curr_time
# Integrate until end or tau is reached
loop_count = 0
sol = LSODA(rhs, last_curr_time, y0, curr_time)
counter = 0
while sol.t < curr_time:
counter += 1
sol.step()
# Update states of all species based on changes made to species through
# ODE processes. This will update all species whose mode is set to
# 'continuous', as well as 'dynamic' mode species which have been
# flagged as deterministic.
for spec_name, species in self.model.listOfSpecies.items():
if not species.constant:
curr_state[spec_name] = sol.y[y_map[spec_name]]
# Search for precise event times
'''
if len(model.listOfEvents):
event_times = self.__detect_events(event_sensitivity, sol, delayed_events,
trigger_states, curr_time, curr_state)
else:
event_times = {}
# Get next tau time
reaction_times = []
'''
# Stochastic Reactions are also fired through a root-finding method
# which mirrors the standard SSA probability. Since we are using
# Tau-Leaping, rather than firing reactions based on a direct poisson
# distribution from the propensity, we use that same poisson
# distribution to select a random negative offset for the reaction
# state, then integrate forward along with any deterministic
# reactions/species for a designated time (tau). Root crossings
# satisfy the SSA firing process, and because multiple reactions can be
# fired in a single Tau step, a new random number will be generated and
# added (representing a single-firing each time) until the state value
# of the reaction is once again negative.
for rxn in compiled_reactions:
curr_state[rxn] = sol.y[y_map[rxn]]
# In the case that a major change occurs to the system (outside of the
# standard ODE process) is caused by an event trigger, we then examine
# the event which was triggered. if Event assignments are carried out
# immediately, we add them to a heap in Event Priority order to be
# immediately dealt with. If an Event contains a delay, we can now set
# the time of event assignment execution by evaluating the event delay
# in the current state.
if next_step == 'trigger':
for event in event_times[curr_time]:
self.__handle_event(event, curr_state, curr_time,
event_queue, trigger_states, delayed_events)
# In the case that a major change occurs to the system (outside of the
# standard ODE process) is caused by a delayed event which has now
# reached the designated time of execution, we add all currently
# designated event assignments to a heap to be processed in Event
# Priority order.
elif next_step == 'delay':
event = heapq.heappop(delayed_events)
heapq.heappush(event_queue, (eval(self.model.listOfEvents[event[1]].priority), event[1]))
return curr_time, actual_tau_step
def __simulate_negative_state_check(self, curr_state):
# check each species to see if they are negative
for s in self.non_negative_species:
if curr_state[s] < 0:
raise SimulationError(f"Negative State detected at begining of step." \
" Species involved in reactions can not be negative.")
def __simulate_invalid_state_check(self, species_modified, curr_state, compiled_reactions):
invalid_state = False
err_message=""
# check each species to see if they are negative
for s in species_modified.keys():
if curr_state[s] < 0:
invalid_state = True
err_message += f"'{s}' has negative state '{curr_state[s]}'"
return (invalid_state, err_message)
def __simulate(self, integrator_options, curr_state, y0, curr_time,
propensities, species, parameters, compiled_reactions,
active_rr, y_map, trajectory, save_times, save_index,
delayed_events, trigger_states, event_sensitivity,
tau_step, debug, det_spec, det_rxn):
"""
Function to process simulation until next step, which can be a
stochastic reaction firing, an event trigger or assignment, or end of
simulation.
:param curr_state: Contains all state variables for system at current time
:type curr_state: dict
:param curr_time: Represents current time
:type curr_time: float
:param save_times: Currently unreached save points
:type save_times: list
:returns: curr_state, curr_time, save_times, sol
"""
# first check if we have a valid state:
self.__simulate_negative_state_check(curr_state)
if curr_time == 0.0:
# save state at beginning of simulation
save_index = self.__save_state_to_output(
curr_time, save_index, curr_state, species, trajectory, save_times
)
event_queue = []
prev_y0 = copy.deepcopy(y0)
prev_curr_state = copy.deepcopy(curr_state)
prev_curr_time = curr_time
loop_count = 0
invalid_state = False
starting_curr_state = copy.deepcopy(curr_state)
starting_propensities = copy.deepcopy(propensities)
starting_tau_step=tau_step
species_modified=None
rxn_count=None
loop_err_message=""
if self.profile_reactions:
self.profile_data['time'].append(curr_time)
for k in list(self.model.listOfSpecies)+list(self.model.listOfReactions):
self.profile_data[k].append(curr_state[k])
# check each reaction to see if it is >=0. If we have taken a single SSA step, this could be >0 for the non-selected reactions, check if propensity is zero and reset if so
for r in compiled_reactions.keys():
if curr_state[r] >= 0 and propensities[r] == 0:
curr_state[r] = math.log(np.random.uniform(0, 1))
curr_time, actual_tau_step = self.__integrate(integrator_options, curr_state,
y0, curr_time, propensities, y_map,
compiled_reactions,
active_rr,
event_queue,
delayed_events,
trigger_states,
event_sensitivity,
tau_step, det_rxn
)
species_modified,rxn_count = self.__update_stochastic_rxn_states(propensities, actual_tau_step, compiled_reactions, curr_state)
# Occasionally, a tau step can result in an overly-aggressive
# forward step and cause a species population to fall below 0,
# which would result in an erroneous simulation.
# We estimate the time to the first
# stochatic reaction firing (assume constant propensities) and
# simulate the ODE system until that time, fire that reaction
# and continue the simulation.
(invalid_state, invalid_err_message) = self.__simulate_invalid_state_check(species_modified, curr_state, compiled_reactions)
if invalid_state:
invalid_state = False
# Redo this step, with a smaller Tau such that a single SSA reaction occurs
y0 = copy.deepcopy(prev_y0)
curr_state_after = copy.deepcopy(curr_state)
curr_state = copy.deepcopy(prev_curr_state)
floored_curr_state = copy.deepcopy(prev_curr_state)
propensities_after = copy.deepcopy(propensities)
propensities = copy.deepcopy(starting_propensities)
floored_propensities = copy.deepcopy(starting_propensities)
curr_time = prev_curr_time
# floored propensites
for i, s in enumerate(self.model.listOfSpecies):
floored_curr_state[s] = math.floor(floored_curr_state[s])
saved_curr_state = copy.deepcopy(curr_state)
curr_state = floored_curr_state
for i, r in enumerate(compiled_reactions):
try:
floored_propensities[r] = eval(compiled_reactions[r], {**eval_globals, **curr_state})
except Exception as e:
raise SimulationError('Error calculation propensity for {0}.\nReason: {1}\nfloored_propensities={2}\ncompiled_reactions={3}'.format(r, e, floored_propensities,compiled_reactions))
curr_state = saved_curr_state
rxn_times = OrderedDict()
min_tau = None
rxn_selected = None
for rname,rcnt in rxn_count.items():
if floored_propensities[rname] > 0.0:
# estimate the zero crossing time
rxn_times[rname] = -1* curr_state[rname] / propensities[rname]
if min_tau is None or min_tau > rxn_times[rname]:
min_tau = rxn_times[rname]
rxn_selected = rname
if rxn_selected is None: raise SimulationError(f"Negative State detected in step, and no reaction found to fire. error_message={invalid_err_message}")
tau_step = min_tau #estimated time to the first stochatic reaction
curr_time, actual_tau_step = self.__integrate(integrator_options, curr_state,
y0, curr_time, propensities, y_map,
compiled_reactions,
active_rr,
event_queue,
delayed_events,
trigger_states,
event_sensitivity,
tau_step, det_rxn
)
# only update the selected reaction
first_rxn_count = copy.deepcopy(rxn_count)
first_err_message = invalid_err_message
species_modified,rxn_count = self.__update_stochastic_rxn_states(propensities, actual_tau_step, compiled_reactions, curr_state, only_update=rxn_selected)
(invalid_state, invalid_err_message) = self.__simulate_invalid_state_check(species_modified, curr_state, compiled_reactions)
if invalid_state:
raise SimulationError(f"Negative State detected in step, after single SSA step. error_message={invalid_err_message}")
save_index = self.__save_state_to_output(curr_time, save_index, curr_state, species, trajectory, save_times)
events_processed = self.__process_queued_events(event_queue, trigger_states, curr_state, det_spec)
# Finally, perform a final check on events after all non-ODE assignment
# changes have been carried out on model.
event_cycle = True
while event_cycle:
event_cycle = False
for i, e in enumerate(self.model.listOfEvents.values()):
triggered = eval(e.trigger.expression, {**eval_globals, **curr_state})
if triggered and not curr_state[e.name]:
curr_state[e.name] = True
self.__handle_event(e, curr_state, curr_time,
event_queue, trigger_states, delayed_events)
event_cycle = True
elif not triggered:
curr_state[e.name] = False
events_processed = self.__process_queued_events(event_queue, trigger_states, curr_state, det_spec)
return curr_state, curr_time, save_times, save_index
def __set_seed(self, seed):
# Set seed if supplied
if seed is not None:
if not isinstance(seed, int):
seed = int(seed)
if seed > 0:
np.random.seed(seed)
else:
raise ModelError('seed must be a positive integer')
def __set_recommended_ode_defaults(self, integrator_options):
"""
Set some ODE solver defaults. These values are chosen based on the
precision required to successfully complete the SBML Test suite.
"""
if 'rtol' not in integrator_options:
integrator_options['rtol'] = 1e-9
if 'atol' not in integrator_options:
integrator_options['atol'] = 1e-12
if 'max_step' not in integrator_options:
integrator_options['max_step'] = 0.25
def __compile_all(self):
"""
Compile all run-time evaluables to enhance performance.
"""
compiled_reactions = OrderedDict()
for i, r in enumerate(self.model.listOfReactions):
compiled_reactions[r] = compile(self.model.listOfReactions[r].propensity_function, '<string>',
'eval')
compiled_rate_rules = OrderedDict()
for i, rr in enumerate(self.model.listOfRateRules.values()):
if isinstance(rr.variable, str):
compiled_rate_rules[self.model.listOfSpecies[rr.variable]] = compile(
rr.formula, '<string>', 'eval')
else:
compiled_rate_rules[rr.variable] = compile(rr.formula, '<string>', 'eval')
compiled_inactive_reactions = OrderedDict()
compiled_propensities = copy.deepcopy(compiled_reactions)
return compiled_reactions, compiled_rate_rules, compiled_inactive_reactions, compiled_propensities
def __initialize_state(self, curr_state, debug):
"""
Initialize curr_state for each trajectory.
"""
# intialize parameters to current state
for p in self.model.listOfParameters:
curr_state[p] = self.model.listOfParameters[p].value
# initialize species population state
for s in self.model.listOfSpecies:
curr_state[s] = self.model.listOfSpecies[s].initial_value
# Set reactions to uniform random number
for i, r in enumerate(self.model.listOfReactions):
curr_state[r] = math.log(np.random.uniform(0, 1))
if debug:
print("Setting Random number ", curr_state[r], " for ", self.model.listOfReactions[r].name)
# Initialize event last-fired times to 0
for e_name in self.model.listOfEvents:
curr_state[e_name] = 0
sanitized_species = self.model.sanitized_species_names()
sanitized_parameters = self.model.sanitized_parameter_names()
for fd in self.model.listOfFunctionDefinitions.values():
sanitized_function = fd.sanitized_function(sanitized_species, sanitized_parameters)
curr_state[fd.name] = eval(f"lambda {', '.join(fd.args)}: {sanitized_function}", eval_globals)
for ar in self.model.listOfAssignmentRules.values():
if ar.variable in self.model.listOfSpecies:
continue
curr_state[ar.variable] = ar.formula
def __map_state(self, species, parameters, compiled_reactions, events, curr_state):
"""
Creates the start state vector for integration and provides a
dictionary map to it's elements.
"""
y_map = OrderedDict()
# Build integration start state
y0 = [0] * (len(species) + len(parameters) + len(compiled_reactions) + len(events))
for i, spec in enumerate(species):
if isinstance(curr_state[spec], str):
y0[i] = eval(curr_state[spec], {**eval_globals, **curr_state})
else:
y0[i] = curr_state[spec]
y_map[spec] = i
for i, param in enumerate(parameters):
y0[i + len(species)] = curr_state[param]
y_map[param] = i + len(species)
for i, rxn in enumerate(compiled_reactions):
y0[i + len(species) + len(parameters)] = curr_state[rxn]
y_map[rxn] = i + len(species) + len(parameters)
for i, event in enumerate(events.values()):
y0[i + len(species) + len(parameters) + len(compiled_reactions)] = curr_state[event.name]
y_map[event] = i + len(species) + len(parameters) + len(compiled_reactions)
return y0, y_map
[docs] @classmethod
def get_solver_settings(cls):
"""
Returns a list of arguments supported by tau_hybrid_solver.run.
:returns: Tuple of strings, denoting all keyword argument for this solvers run() method.
:rtype: tuple
"""
return ('model', 't', 'number_of_trajectories', 'increment', 'seed', 'debug', 'profile', 'tau_tol',
'event_sensitivity', 'integrator_options', 'timeout')
[docs] @classmethod
def get_supported_features(cls):
return {
Event,
RateRule,
AssignmentRule,
FunctionDefinition,
}
[docs] def run(self=None, model=None, t=None, number_of_trajectories=1, increment=None, seed=None,
debug=False, profile=False, tau_tol=0.03, event_sensitivity=100,integrator_options={},
live_output=None, live_output_options={}, timeout=None, **kwargs):
"""
Function calling simulation of the model. This is typically called by the run function in GillesPy2 model
objects and will inherit those parameters which are passed with the model as the arguments this run function.
:param model: The model on which the solver will operate. (Deprecated)
:type model: gillespy2.Model
:param t: Simulation run time.
:type t: int or float
:param number_of_trajectories: Number of trajectories to simulate. By default number_of_trajectories = 1.
:type number_of_trajectories: int
:param increment: Save point increment for recording data.
:type increment: float
:param seed: The random seed for the simulation. Optional, defaults to None.
:type seed: int
:param debug: Set to True to provide additional debug information about the simulation.
:type debug: bool
:param profile: Set to True to provide information about step size (tau) taken at each step.
:type profile: bool
:param tau_tol: Tolerance level for Tau leaping algorithm. Larger tolerance values will
result in larger tau steps. Default value is 0.03.
:type tau_tol: float
:param event_sensitivity: Number of data points to be inspected between integration
steps/save points for event detection. Default event_sensitivity = 100
:type event_sensitivity: int
:param integrator_options: contains options to the scipy integrator. by default, this includes
rtol=1e-9 and atol=1e-12. for a list of options,
see https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.LSODA.html.
Example use: {max_step : 0, rtol : .01}
:type integrator_options: dict
:param live_output: The type of output to be displayed by solver. Can be "progress", "text", or "graph".
:type live_output: str
:param live_output_options: contains options for live_output. By default {"interval":1}.
"interval" specifies seconds between displaying.
"clear_output" specifies if display should be refreshed with each display
:type live_output_options: dict
:param timeout: If set, if simulation takes longer than timeout, will exit.
:type timeout: int
:returns: A result object containing the results of the simulation.
:rtype: gillespy2.Results
"""
from gillespy2 import log
if self is None:
# Post deprecation block
# raise SimulationError("TauHybridSolver must be instantiated to run the simulation")
# Pre deprecation block
log.warning(
"""
`gillespy2.Model.run(solver=TauHybridSolver)` is deprecated.
You should use `gillespy2.Model.run(solver=TauHybridSolver(model=gillespy2.Model))
Future releases of GillesPy2 may not support this feature.
"""
)
self = TauHybridSolver(model=model)
if model is not None:
log.warning('model = gillespy2.model is deprecated. Future releases '
'of GillesPy2 may not support this feature.')
if self.model is None:
if model is None:
raise SimulationError("A model is required to run the simulation.")
self.model = copy.deepcopy(model)
self.model.compile_prep()
self.validate_model(self.model, model)
self.validate_sbml_features(model=self.model)
self.validate_tspan(increment=increment, t=t)
if increment is None:
increment = self.model.tspan[-1] - self.model.tspan[-2]
if t is None:
t = self.model.tspan[-1]
if timeout is not None and timeout > 0:
for i, s in enumerate(list(self.model._listOfSpecies.keys())):
# Solve_ivp doesn't return any results until it's finished solving so timing out early only slows
# the solver.
if self.model.listOfSpecies[s].mode == 'continuous':
timeout = 0
log.warning('timeouts not supported by continuous species.')
break
elif self.model.listOfSpecies[s].mode == 'dynamic':
log.warning('timeouts not fully supported by dynamic species. If timeout is triggered during'
' integration, total solve time could be longer than expected.')
break
self.stop_event = threading.Event()
if len(kwargs) > 0:
for key in kwargs:
log.warning('Unsupported keyword argument to {0} solver: {1}'.format(self.name, key))
if timeout is not None and timeout <= 0:
timeout = None
if debug:
print("t = ", t)
print("increment = ", increment)
if len(self.model.listOfEvents):
self.__set_recommended_ode_defaults(integrator_options)
self.__set_seed(seed)
species = list(self.model._listOfSpecies.keys())
number_species = len(species)
initial_state = OrderedDict()
self.__initialize_state(initial_state, debug)
initial_state['vol'] = self.model.volume
initial_state['t'] = 0
# create numpy array for timeline
timeline = np.linspace(0, t, int(round(t / increment + 1)))
self.model.tspan = timeline
# create numpy matrix to mark all state data of time and species
trajectory_base = np.zeros((number_of_trajectories, timeline.size, number_species + 1))
# copy time values to all trajectory row starts
trajectory_base[:, :, 0] = timeline
# copy initial populations to base
spec_modes = ['continuous', 'dynamic', 'discrete', None]
for i, s in enumerate(species):
if self.model.listOfSpecies[s].mode is None:
self.model.listOfSpecies[s].mode = 'dynamic'
if self.model.listOfSpecies[s].mode not in spec_modes:
raise SpeciesError('Species mode can only be \'continuous\', \'dynamic\',\'discrete\', or '
'\'unspecified(default to dynamic)\'.')
trajectory_base[:, 0, i + 1] = initial_state[s]
# curr_time and curr_state are list of len 1 so that __run receives reference
curr_time = [0] # Current Simulation Time
curr_state = [None]
live_grapher = [None]
sim_thread = threading.Thread(target=self.___run,
args=(curr_state, curr_time, timeline, trajectory_base, initial_state,
live_grapher,), kwargs={'t': t,
'number_of_trajectories': number_of_trajectories,
'increment': increment, 'seed': seed,
'debug': debug, 'profile': profile,
'tau_tol': tau_tol,
'event_sensitivity': event_sensitivity,
'integrator_options': integrator_options})
try:
sim_thread.start()
if live_output is not None:
live_output_options['type'] = live_output
import gillespy2.core.liveGraphing
gillespy2.core.liveGraphing.valid_graph_params(live_output_options)
if live_output_options['type'] == "graph":
for i, s in enumerate(list(self.model._listOfSpecies.keys())):
if self.model.listOfSpecies[s].mode == 'continuous':
log.warning('display \"type\" = \"graph\" not recommended with continuous species. '
'Try display \"type\" = \"text\" or \"progress\".')
break
live_grapher[0] = gillespy2.core.liveGraphing.LiveDisplayer(self.model, timeline, number_of_trajectories,
live_output_options)
display_timer = gillespy2.core.liveGraphing.RepeatTimer(live_output_options['interval'],
live_grapher[0].display,
args=(curr_state, curr_time, trajectory_base, live_output))
display_timer.start()
sim_thread.join(timeout=timeout)
if live_grapher[0] is not None:
display_timer.cancel()
self.stop_event.set()
while self.result is None: pass
except:
pass
if hasattr(self, 'has_raised_exception'):
raise self.has_raised_exception
return Results.build_from_solver_results(self, live_output_options)
def ___run(self, curr_state, curr_time, timeline, trajectory_base, initial_state, live_grapher, t=20,
number_of_trajectories=1, increment=0.05, seed=None,
debug=False, profile=False, tau_tol=0.03, event_sensitivity=100,
integrator_options={}, **kwargs):
try:
self.__run(curr_state, curr_time, timeline, trajectory_base, initial_state, live_grapher, t,
number_of_trajectories, increment, seed, debug,
profile, tau_tol, event_sensitivity,
integrator_options, **kwargs)
except Exception as e:
self.has_raised_exception = e
self.result = []
return [], -1
def __run(self, curr_state, curr_time, timeline, trajectory_base, initial_state, live_grapher, t=20,
number_of_trajectories=1, increment=0.05, seed=None,
debug=False, profile=False,
tau_tol=0.03, event_sensitivity=100,
integrator_options={}, **kwargs):
# create mapping of species dictionary to array indices
species_mappings = self.model._listOfSpecies
species = list(species_mappings.keys())
parameter_mappings = self.model._listOfParameters
parameters = list(parameter_mappings.keys())
number_species = len(species)
t0_delayed_events, species_modified_by_events = self.__check_t0_events(initial_state)
# Create deterministic tracking data structures
det_spec = {species: value.mode != 'discrete' for (species, value) in self.model.listOfSpecies.items()}
det_rxn = {rxn: False for (rxn, value) in self.model.listOfReactions.items()}
# Determine if entire simulation is ODE or Stochastic, in order to
# avoid unnecessary calculations during simulation
simulation_data = []
dependencies = OrderedDict()
# If considering deterministic changes, create dependency data
# structure for creating diff eqs later
for reaction in self.model.listOfReactions:
dependencies[reaction] = set()
[dependencies[reaction].add(reactant.name) for reactant in self.model.listOfReactions[reaction].reactants]
[dependencies[reaction].add(product.name) for product in self.model.listOfReactions[reaction].products]
# Main trajectory loop
for trajectory_num in range(number_of_trajectories):
if self.stop_event.is_set():
print('exiting')
self.rc = 33
break
# For multi trajectories, live_grapher needs to be informed of trajectory increment
if live_grapher[0] is not None:
live_grapher[0].increment_trajectory(trajectory_num)
trajectory = trajectory_base[trajectory_num] # NumPy array containing this simulation's results
propensities = OrderedDict() # Propensities evaluated at current state
curr_state[0] = initial_state.copy()
curr_time[0] = 0 # Current Simulation Time
for i, r in enumerate(self.model.listOfReactions):
curr_state[0][r] = math.log(np.random.uniform(0, 1))
if debug:
print("Setting Random number ", curr_state[0][r], " for ", self.model.listOfReactions[r].name)
end_time = self.model.tspan[-1] # End of Simulation time
entry_pos = 1
data = OrderedDict() # Dictionary for results
data['time'] = timeline # All time entries
save_index = 0
# Record Highest Order reactant for each reaction and set error tolerance
HOR, reactants, mu_i, sigma_i, g_i, epsilon_i, critical_threshold = Tau.initialize(self.model, tau_tol)
# One-time compilations to reduce time spent with eval
compiled_reactions, compiled_rate_rules, compiled_inactive_reactions, compiled_propensities = \
self.__compile_all()
all_compiled = OrderedDict()
all_compiled['rxns'] = compiled_reactions
all_compiled['inactive_rxns'] = compiled_inactive_reactions
all_compiled['rules'] = compiled_rate_rules
save_times = list(np.copy(self.model.tspan))
delayed_events = []
trigger_states = {}
# Handle delayed t0 events
for state in trigger_states.values():
if state is None: state = curr_state[0]
for ename, etime in t0_delayed_events.items():
curr_state[0][ename] = True
heapq.heappush(delayed_events, (etime, ename))
if self.model.listOfEvents[ename].use_values_from_trigger_time:
trigger_states[ename] = curr_state[0].copy()
else:
trigger_states[ename] = curr_state[0]
# Each save step
while curr_time[0] < self.model.tspan[-1]:
if self.stop_event.is_set():
self.rc = 33
break
# Get current propensities
for i, r in enumerate(self.model.listOfReactions):
try:
propensities[r] = eval(compiled_propensities[r], eval_globals, curr_state[0])
if curr_state[0][r] > 0 and propensities[r]==0:
# This is an edge case, that might happen after a single SSA step.
curr_state[0][r] = math.log(np.random.uniform(0, 1))
except Exception as e:
raise SimulationError('Error calculation propensity for {0}.\nReason: {1}\ncurr_state={2}'.format(r, e, curr_state))
# Calculate Tau statistics and select a good tau step
if self.constant_tau_stepsize is None:
tau_step = Tau.select(HOR, reactants, mu_i, sigma_i, g_i, epsilon_i, tau_tol, critical_threshold, self.model, propensities, curr_state[0], curr_time[0], save_times[0])
else:
tau_step = self.constant_tau_stepsize
if debug:
X = ''
for k,v in curr_state[0].items():
X+= f'{k}:{v:.2f} '
print(f"t={curr_time[0]} tau={tau_step} curr_state={X}")
# Process switching if used
mn, sd, CV = self.__calculate_statistics(curr_time[0], propensities, curr_state[0], tau_step, det_spec)
# Calculate sd and CV for hybrid switching and flag deterministic reactions
deterministic_reactions = self.__flag_det_reactions(det_spec, det_rxn, dependencies)
if debug:
print('mean: {0}'.format(mn))
print('standard deviation: {0}'.format(sd))
print('CV: {0}'.format(CV))
print('det_spec: {0}'.format(det_spec))
print('det_rxn: {0}'.format(det_rxn))
# Set active reactions and rate rules for this integration step
rr_sets = {frozenset() : compiled_rate_rules} # base rr set
active_rr = self.__toggle_reactions(all_compiled, deterministic_reactions,
dependencies, curr_state[0], det_spec, rr_sets)
# Create integration initial state vector
y0, y_map = self.__map_state(species, parameters,
compiled_reactions, self.model.listOfEvents, curr_state[0])
# Run simulation to next step
curr_state[0], curr_time[0], save_times, save_index = self.__simulate(integrator_options,
curr_state[0], y0, curr_time[0],
propensities, species,
parameters, compiled_reactions,
active_rr, y_map,
trajectory, save_times, save_index,
delayed_events,
trigger_states,
event_sensitivity, tau_step,
debug, det_spec, det_rxn)
# End of trajectory, format results
data = {'time': timeline}
for i in range(number_species):
data[species[i]] = trajectory[:, i + 1]
simulation_data.append(data)
self.result = simulation_data
return simulation_data, self.rc