-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.py
90 lines (72 loc) · 3.29 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
A task scheduler that assigns unfinished jobs to different workers.
"""
import numpy as np
def get_unplotted_indices(vals, xcoordinates, ycoordinates=None):
    """
    Find the grid points whose value still has to be calculated.

    Args:
        vals: values at (x, y); an entry that has not been calculated yet is
            expected to hold the placeholder -1. Any array-like is accepted.
        xcoordinates: x locations, e.g. [-1, -0.5, 0, 0.5, 1]
        ycoordinates: y locations, e.g. [-1, -0.5, 0, 0.5, 1]; leave as None
            for a 1D plot.

    Returns:
        - an integer array of indices into the flattened vals for the points
          that have not yet been calculated.
        - the corresponding coordinates: one (x, y) row per index for a 2D
          plot, or a 1D array of x values when ycoordinates is None.
    """
    # Coerce to an array so lists (as in the examples above) work as well as
    # ndarrays.
    flat_vals = np.asarray(vals).ravel()

    # Select the un-recorded entries. The test is `<= 0`, so besides the -1
    # placeholder any recorded value that is zero or negative is selected too
    # and will be calculated again on every call.
    # flatnonzero always yields an integer index array, even for empty input.
    inds = np.flatnonzero(flat_vals <= 0)

    if ycoordinates is not None:
        # 2D plot: meshgrid enumerates every (x, y) pair; the flattened meshes
        # are indexed with the flat indices selected above.
        xcoord_mesh, ycoord_mesh = np.meshgrid(xcoordinates, ycoordinates)
        s1 = xcoord_mesh.ravel()[inds]
        s2 = ycoord_mesh.ravel()[inds]
        return inds, np.c_[s1, s2]

    # 1D plot: the x locations are the coordinates.
    return inds, np.asarray(xcoordinates).ravel()[inds]
def split_inds(num_inds, nproc):
    """
    Partition job positions 0..num_inds-1 into one contiguous range per
    MPI process.

    - Every job is assumed to take the same amount of time, so the ranges
      are made as equal in size as possible.
    - When num_inds is not a multiple of nproc, the leftover jobs go one
      each to the lowest ranks, e.g., 9 jobs over 4 processes gives
      3, 2, 2, 2 jobs to rank0, rank1, rank2, rank3.
    - When there are more processes than jobs, the high-rank processes
      receive an empty range.

    Returns a list with one range object per rank.
    """
    base_size, leftover = divmod(num_inds, nproc)

    slices = []
    begin = 0
    for rank in range(nproc):
        # The first `leftover` ranks each take one extra job.
        size = base_size + 1 if rank < leftover else base_size
        slices.append(range(begin, begin + size))
        # The next rank starts where this one stops.
        begin += size
    return slices
def get_job_indices(vals, xcoordinates, ycoordinates, comm):
    """
    Work out which coordinates the calling process has to calculate.

    Args:
        vals: the value matrix
        xcoordinates: x locations, e.g. [-1, -0.5, 0, 0.5, 1]
        ycoordinates: y locations, e.g. [-1, -0.5, 0, 0.5, 1]
        comm: MPI environment, or None to treat this as the only process

    Returns:
        inds: the unfinished indices assigned to the current rank
        coords: the coordinates that go with those indices
        inds_nums: a list with the number of jobs assigned to each rank
    """
    pending_inds, pending_coords = get_unplotted_indices(vals, xcoordinates, ycoordinates)

    # Without a communicator, behave as rank 0 of a single-process run.
    if comm is None:
        rank = 0
        nproc = 1
    else:
        rank = comm.Get_rank()
        nproc = comm.Get_size()

    # One range of positions per rank; pick out the one for this rank.
    partitions = split_inds(len(pending_inds), nproc)
    mine = partitions[rank]

    # Number of jobs each MPI process needs to calculate.
    inds_nums = list(map(len, partitions))

    return pending_inds[mine], pending_coords[mine], inds_nums