Skip to content

Commit

Permalink
Merge pull request #572 from cylc/1.4.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 1.4.x-sync into master
  • Loading branch information
oliver-sanders authored Mar 27, 2024
2 parents 49fa3ef + 17c8a71 commit 4df09a5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
16 changes: 15 additions & 1 deletion cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,16 @@ class CylcUIServer(ExtensionApp):
''',
default_value=1
)
max_threads = Int(
config=True,
help='''
Set the maximum number of threads the Cylc UI Server can use.
This determines the maximum number of active workflows that the
server can track.
''',
default_value=100,
)
profile = Bool(
config=True,
help='''
Expand Down Expand Up @@ -397,7 +407,11 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ProcessPoolExecutor(max_workers=self.max_workers)
self.workflows_mgr = WorkflowsManager(self, log=self.log)
self.data_store_mgr = DataStoreMgr(self.workflows_mgr, self.log)
self.data_store_mgr = DataStoreMgr(
self.workflows_mgr,
self.log,
self.max_threads,
)
# sub_status dictionary storing status of subscriptions
self.sub_statuses = {}
self.resolvers = Resolvers(
Expand Down
22 changes: 19 additions & 3 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,37 @@ def _inner(*args, **kwargs): # works for serial & async calls


class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""
"""Manage the local data-store acquisition/updates for all workflows.
Args:
workflows_mgr:
Service that scans for workflows.
log:
Application logger.
max_threads:
Max number of threads to use for subscriptions.
Note, this determines the maximum number of active workflows that
can be updated.
This should be overridden for real use in the UIS app. The
default is here for test purposes.
"""

INIT_DATA_WAIT_TIME = 5. # seconds
INIT_DATA_RETRY_DELAY = 0.5 # seconds
RECONCILE_TIMEOUT = 5. # seconds
PENDING_DELTA_CHECK_INTERVAL = 0.5

def __init__(self, workflows_mgr, log):
def __init__(self, workflows_mgr, log, max_threads=10):
self.workflows_mgr = workflows_mgr
self.log = log
self.data = {}
self.w_subs: Dict[str, WorkflowSubscriber] = {}
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
self.loop = None
self.executor = ThreadPoolExecutor()
self.executor = ThreadPoolExecutor(max_threads)
self.delta_queues = {}

@log_call
Expand Down

0 comments on commit 4df09a5

Please sign in to comment.