Source code for stochss_compute.server.cache
'''
Cache for StochSS-Compute
'''
# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely.
# Copyright (C) 2019-2023 GillesPy2 and StochSS developers.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from json.decoder import JSONDecodeError
from datetime import datetime
from filelock import SoftFileLock
from gillespy2 import Results
[docs]class Cache:
'''
Cache
:param cache_dir: The root cache directory.
:type cache_dir: str
:param results_id: Simulation hash.
:type results_id: str
'''
def __init__(self, cache_dir, results_id):
self.results_path = os.path.join(cache_dir, f'{results_id}.results')
if not os.path.exists(cache_dir):
os.mkdir(cache_dir)
[docs] def create(self) -> None:
'''
Create the results file if it does not exist.
'''
try:
with open(self.results_path, 'x', encoding='utf-8') as file:
file.close()
except FileExistsError:
pass
[docs] def exists(self) -> bool:
'''
Check if the results file exists.
:returns: os.path.exists(self.results_path)
:rtype: bool
'''
return os.path.exists(self.results_path)
[docs] def is_empty(self) -> bool:
'''
Check if the results are empty.
:returns: filesize == 0 or self.exists()
:rtype: bool
'''
lock = SoftFileLock(f'{self.results_path}.lock')
with lock:
if self.exists():
filesize = os.path.getsize(self.results_path)
return filesize == 0
return True
[docs] def is_ready(self, n_traj_wanted) -> bool:
'''
Check if the results are ready to be retrieved from the cache.
:param n_traj_wanted: The number of requested trajectories.
:type: int
:returns: n_traj_wanted <= len(<Results in cache>)
:rtype: bool
'''
results = self.get()
if results is None or n_traj_wanted > len(results):
return False
return True
[docs] def n_traj_needed(self, n_traj_wanted) -> int:
'''
Calculate the difference between the number of trajectories the user has requested
and the number of trajectories currently in the cache.
:param n_traj_wanted: The number of requested trajectories.
:type: int
:returns: A number greater than or equal to zero.
:rtype: int
'''
if self.is_empty():
return n_traj_wanted
results = self.get()
if results is None:
return n_traj_wanted
diff = n_traj_wanted - len(results)
if diff > 0:
return diff
return 0
[docs] def n_traj_in_cache(self) -> int:
'''
Check the number of trajectories in the cache.
:returns: `len()` of the gillespy2.Results
:rtype: int
'''
if self.is_empty():
return 0
results = self.get()
if results is not None:
return len(results)
return 0
[docs] def get(self) -> Results or None:
'''
Retrieve a gillespy2.Results object from the cache or None if error.
:returns: Results.from_json(results_json)
:rtype: gillespy2.Results or None
'''
try:
results_json = self.read()
return Results.from_json(results_json)
except JSONDecodeError:
return None
[docs] def read(self) -> str:
'''
Retrieve a gillespy2.Results object as a JSON-formatted string.
:returns: The output of reading the file.
:rtype: str
'''
lock = SoftFileLock(f'{self.results_path}.lock')
with lock:
with open(self.results_path,'r', encoding='utf-8') as file:
return file.read()
[docs] def save(self, results: Results) -> None:
'''
Save a newly processed gillespy2.Results object to the cache.
:param results: The new Results.
:type: gillespy2.Results
'''
msg = f'{datetime.now()} | Cache | <{self.results_path}> | '
lock = SoftFileLock(f'{self.results_path}.lock')
with lock:
with open(self.results_path, 'r+', encoding='utf-8') as file:
try:
old_results = Results.from_json(file.read())
combined_results = results + old_results
print(msg+'Add')
file.seek(0)
file.write(combined_results.to_json())
except JSONDecodeError:
print(msg+'New')
file.seek(0)
file.write(results.to_json())