Source code for stochss_compute.launch

'''
stochss_compute.launch
'''
# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely.
# Copyright (C) 2019-2023 GillesPy2 and StochSS developers.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys
import asyncio
from argparse import ArgumentParser, Namespace
from distributed import LocalCluster
from stochss_compute.server.api import start_api

[docs]def launch_server(): ''' Start the REST API. Alias to script "stochss-compute". `stochss-compute --help` OR `python -m stochss_compute.launch --help` ''' def _parse_args() -> Namespace: desc = ''' StochSS-Compute is a server and cache that anonymizes StochSS simulation data. ''' parser = ArgumentParser(description=desc, add_help=True, conflict_handler='resolve') server = parser.add_argument_group('Server') server.add_argument("-p", "--port", default=29681, type=int, required=False, help="The port to use for the server. Defaults to 29681.") cache = parser.add_argument_group('Cache') cache.add_argument('-c', '--cache', default='cache/', required=False, help='Path to use for the cache. Default ./cache') cache.add_argument('--rm', '--rm-cache', default=False, required=False, help='Whether to delete the cache upon exit. Default False.') dask = parser.add_argument_group('Dask') dask.add_argument("-H", "--dask-host", default='localhost', required=False, help="The host to use for the dask scheduler. Defaults to localhost.") dask.add_argument("-P", "--dask-scheduler-port", default=8786, type=int, required=False, help="The port to use for the dask scheduler. Defaults to 8786.") return parser.parse_args() args = _parse_args() asyncio.run(start_api(**args.__dict__))
[docs]def launch_with_cluster(): ''' Start up a Dask Cluster and StochSS-Compute REST API. Alias to script "stochss-compute-cluster". `stochss-compute cluster --help` OR `python -m stochss_compute.launch cluster --help` ''' def _parse_args() -> Namespace: usage = ''' stochss-compute-cluster -p PORT ''' desc = ''' Startup script for a StochSS-Compute cluster. StochSS-Compute is a server and cache that anonymizes StochSS simulation data. Uses Dask, a Python parallel computing library. ''' parser = ArgumentParser(description=desc, add_help=True, usage=usage, conflict_handler='resolve') server = parser.add_argument_group('Server') server.add_argument("-p", "--port", default=29681, type=int, required=False, help="The port to use for the server. Defaults to 29681.") cache = parser.add_argument_group('Cache') cache.add_argument('-c', '--cache', default='cache/', required=False, help='Path to use for the cache.') cache.add_argument('--rm', default=False, action='store_true', required=False, help='Whether to delete the cache upon exit. Default False.') dask = parser.add_argument_group('Dask') dask.add_argument("-H", "--dask-host", default=None, required=False, help="The host to use for the dask scheduler. Defaults to localhost.") dask.add_argument("-P", "--dask-scheduler-port", default=0, type=int, required=False, help="The port to use for the dask scheduler. 0 for a random port. \ Defaults to a random port.") dask.add_argument('-W', '--dask-n-workers', default=None, type=int, required=False, help='Configure the number of workers. Defaults to one per core.') dask.add_argument('-T', '--dask-threads-per-worker', default=None, required=False, type=int, help='Configure the threads per worker. \ Default will let Dask decide based on your CPU.') dask.add_argument('--dask-processes', default=None, required=False, type=bool, help='Whether to use processes (True) or threads (False). \ Defaults to True, unless worker_class=Worker, in which case it defaults to False.') dask.add_argument('-D', '--dask-dashboard-address', default=':8787', required=False, help='Address on which to listen for the Bokeh diagnostics server \ like ‘localhost:8787’ or ‘0.0.0.0:8787’. Defaults to ‘:8787’. \ Set to None to disable the dashboard. Use ‘:0’ for a random port.') dask.add_argument('-N', '--dask-name', default=None, required=False, help='A name to use when printing out the cluster, defaults to type name.') args = parser.parse_args() return args args = _parse_args() dask_args = {} for (arg, value) in vars(args).items(): if arg.startswith('dask_'): dask_args[arg[5:]] = value print('Launching Dask Cluster...') cluster = LocalCluster(**dask_args) tokens = cluster.scheduler_address.split(':') dask_host = tokens[1][2:] dask_port = int(tokens[2]) print(f'Scheduler Address: <{cluster.scheduler_address}>') for i, worker in cluster.workers.items(): print(f'Worker {i}: {worker}') print(f'Dashboard Link: <{cluster.dashboard_link}>\n') try: asyncio.run(start_api(port=args.port, cache=args.cache, dask_host=dask_host, dask_scheduler_port=dask_port, rm=args.rm)) except asyncio.exceptions.CancelledError: pass finally: print('Shutting down cluster...', end='') asyncio.run(cluster.close()) print('OK')
if __name__ == '__main__': # import os # if 'COVERAGE_PROCESS_START' in os.environ: # import coverage # coverage.process_startup() if len(sys.argv) > 1: if sys.argv[1] == 'cluster': del sys.argv[1] launch_with_cluster() launch_server()