11
11
import numpy as np
12
12
import matplotlib .pyplot as plt
13
13
from typing import List , Union , Optional , Callable
14
+ from dask .distributed import Client
14
15
from IPython .display import display , HTML
15
16
import cartopy .crs as ccrs
16
17
import cartopy .feature as cfeature
17
18
18
19
from pvdeg .scenario import Scenario
19
20
21
# NOTE: no dask cluster is started at import time. The previous bare
# reference to `pvdeg.geospatial.start_dask` here was never called and had
# no effect; clients are created explicitly by `GeospatialScenario.start_dask`
# or `GeospatialScenario.run`.
22
+
20
23
class GeospatialScenario (Scenario ):
21
24
def __init__ (
22
25
self ,
@@ -33,6 +36,7 @@ def __init__(
33
36
meta_data : pd .DataFrame = None ,
34
37
func : Callable = None ,
35
38
template : xr .Dataset = None ,
39
+ dask_client : Client = None ,
36
40
):
37
41
super ().__init__ (
38
42
name = name ,
@@ -49,6 +53,7 @@ def __init__(
49
53
self .hpc = hpc
50
54
self .func = func
51
55
self .template = template
56
+ self .dask_client = dask_client
52
57
self .kdtree = None # sklearn kdtree
53
58
54
59
def __eq__ (self , other ):
@@ -57,8 +62,46 @@ def __eq__(self, other):
57
62
due to larger than memory/out of memory datasets stored in
58
63
GeospatialScenario.weather_data attribute.
59
64
""" )
60
-
61
65
66
def start_dask(self, hpc=None) -> None:
    """
    Start a dask cluster for parallel processing.

    The client returned by ``pvdeg.geospatial.start_dask`` is stored in
    ``self.dask_client``.

    Parameters
    ----------
    hpc : dict, optional
        Dictionary containing dask hpc settings (see examples below).
        Supply `None` for a default configuration.

    Examples
    --------
    Local cluster:

    .. code-block:: python

        hpc = {'manager': 'local',
               'n_workers': 1,
               'threads_per_worker': 8,
               'memory_limit': '10GB'}

    SLURM cluster:

    .. code-block:: python

        kestrel = {
            'manager': 'slurm',
            'n_jobs': 1,  # Max number of nodes used for parallel processing
            'cores': 104,
            'memory': '246GB',
            'account': 'pvsoiling',
            'walltime': '4:00:00',
            'processes': 52,
            'local_directory': '/tmp/scratch',
            'job_extra_directives': ['-o ./logs/slurm-%j.out'],
            'death_timeout': 600,}
    """
    # Forward the caller's configuration. Previously `hpc` was accepted but
    # never passed on, so every call used the default configuration.
    self.dask_client = pvdeg.geospatial.start_dask(hpc=hpc)
104
+
62
105
# add restoring from gids functionality from nsrdb
63
106
def addLocation (
64
107
self ,
@@ -605,12 +648,13 @@ def run(self, hpc_worker_conf: Optional[dict] = None) -> None:
605
648
Only supports one function at a time. Unlike `Scenario` which supports unlimited conventional pipeline jobs.
606
649
Results are stored in the `GeospatialScenario.results` attribute.
607
650
608
- Creates a dask cluster or client using the hpc_worker_conf parameter .
651
+ Creates a dask client if it has not been initialized previously with `GeospatialScenario.start_dask` .
609
652
610
653
Parameters:
611
654
-----------
612
655
hpc_worker_conf : dict
613
656
Dictionary containing dask hpc settings (see examples below).
657
+ When `None`, a default configuration is used.
614
658
615
659
Examples
616
660
--------
@@ -639,7 +683,12 @@ def run(self, hpc_worker_conf: Optional[dict] = None) -> None:
639
683
'job_extra_directives': ['-o ./logs/slurm-%j.out'],
640
684
'death_timeout': 600,}
641
685
"""
642
- client = pvdeg .geospatial .start_dask (hpc = hpc_worker_conf )
686
+ if self .dask_client and hpc_worker_conf :
687
+ raise ValueError (f"Dask Client already exists, cannot configure new client." )
688
+ elif not self .dask_client :
689
+ self .dask_client = pvdeg .geospatial .start_dask (hpc = hpc_worker_conf )
690
+
691
+ print ("Dashboard:" , self .dask_client .dashboard_link )
643
692
644
693
analysis_result = pvdeg .geospatial .analysis (
645
694
weather_ds = self .weather_data ,
@@ -650,7 +699,7 @@ def run(self, hpc_worker_conf: Optional[dict] = None) -> None:
650
699
651
700
self .results = analysis_result
652
701
653
- client .shutdown ()
702
+ self . dask_client .shutdown ()
654
703
655
704
def restore_result_gids (self ):
656
705
"""
@@ -987,6 +1036,15 @@ def format_geospatial_work(self):
987
1036
<p><strong>self.template:</strong> { self .format_template ()} </p>
988
1037
"""
989
1038
1039
+ return ""
1040
+
1041
def format_dask_link(self):
    """
    Build the HTML snippet that links to the dask dashboard.

    Returns
    -------
    str
        A paragraph containing an anchor to
        ``self.dask_client.dashboard_link`` that opens in a new tab, or an
        empty string when ``self.dask_client`` is not set.
    """
    client = self.dask_client
    if not client:
        return ""

    link = client.dashboard_link
    # Wrap the anchor in a complete <p>...</p> pair; the earlier markup
    # emitted a closing </p> with no matching opening tag.
    return f"""
    <p><a href="{link}" target="_blank">{link}</a></p>
    """
1047
+
990
1048
def _ipython_display_ (self ):
991
1049
file_url = f"file:///{ os .path .abspath (self .path ).replace (os .sep , '/' )} "
992
1050
html_content = f"""
@@ -1015,6 +1073,10 @@ def _ipython_display_(self):
1015
1073
<h3>self.meta_data</h3>
1016
1074
{ self .format_geo_meta ()}
1017
1075
</div>
1076
+ <div>
1077
+ <h3>self.dask_client</h3>
1078
+ { self .format_dask_link ()}
1079
+ </div>
1018
1080
</div>
1019
1081
<p><i>All attributes can be accessed by the names shown above.</i></p>
1020
1082
<script>
0 commit comments