From e8ad034621e069cc3118e83c79597a0233c9e563 Mon Sep 17 00:00:00 2001 From: renan-francisco Date: Tue, 4 Feb 2025 08:35:11 -0500 Subject: [PATCH] Moving Dask interceptor instance to scheduler_plugin.start https://github.com/dask/distributed/issues/9001 --- src/flowcept/flowceptor/adapters/dask/dask_plugins.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/flowcept/flowceptor/adapters/dask/dask_plugins.py b/src/flowcept/flowceptor/adapters/dask/dask_plugins.py index 83bb3bc4..ecb21b87 100644 --- a/src/flowcept/flowceptor/adapters/dask/dask_plugins.py +++ b/src/flowcept/flowceptor/adapters/dask/dask_plugins.py @@ -3,7 +3,7 @@ from uuid import uuid4 from dask.distributed import WorkerPlugin, SchedulerPlugin -from distributed import Client +from distributed import Client, Scheduler from flowcept import WorkflowObject from flowcept.configs import INSTRUMENTATION @@ -69,8 +69,11 @@ def register_dask_workflow( class FlowceptDaskSchedulerAdapter(SchedulerPlugin): """Dask schedule adapter.""" - def __init__(self, scheduler): - self.address = scheduler.address + def __init__(self): + self.interceptor = None + + def start(self, scheduler: Scheduler) -> None: + """Run this when scheduler starts""" self.interceptor = DaskSchedulerInterceptor(scheduler) def transition(self, key, start, finish, *args, **kwargs):