-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathring_with_distributed.jl
158 lines (111 loc) · 2.22 KB
/
ring_with_distributed.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
using Hwloc
Hwloc.num_physical_cores()
using Distributed
nprocs()
nworkers()
addprocs(4)
workers()
@fetch begin
println(myid())
rand(2, 2)
end
@sync begin
pids = workers()
@spawnat pids[1] (sleep(2); println("Today is reverse day!"))
@spawnat pids[2] (sleep(1); println(" class!"))
@spawnat pids[3] println("Hello")
end;
println("Done!")
@everywhere begin # execute this block on all workers
using Random
function complicated_calculation()
sleep(1)
randexp(5) # lives in Random
end
end
@fetch complicated_calculation()
ch = Channel{Int}(5)
isready(ch)
put!(ch, 3)
isready(ch)
take!(ch)
put!(ch, 4)
fetch(ch)
take!(ch)
isready(ch)
const mychannel = RemoteChannel(() -> Channel{Int}(10), workers()[2])
function whohas(s::String)
@everywhere begin
var = Symbol($s)
if isdefined(Main, var)
println("$var exists.")
else
println("Doesn't exist.")
end
end
nothing
end
whohas("mychannel")
@everywhere const mychannel = $mychannel
# +
whohas("mychannel")
# +
function do_something()
rc = RemoteChannel(() -> Channel{Int}(10)) # lives on the master
@sync for p in workers()
@spawnat p put!(rc, myid())
end
rc
end
r = do_something()
# -
using Distributed, BenchmarkTools;
rmprocs(workers());
addprocs(4);
nworkers();
# +
# serial version - count heads in a series of coin tosses
function add_serial(n)
c = 0
for i = 1:n
c += rand(Bool)
end
c
end
@btime add_serial(200_000_000);
# +
# distributed version
function add_distributed(n)
c = @distributed (+) for i = 1:n
Int(rand(Bool))
end
c
end
@btime add_distributed(200_000_000);
# +
# verbose distributed version
function add_distributed(n)
c = @distributed (+) for i = 1:n
x = Int(rand(Bool))
println(x)
x
end
c
end
add_distributed(8);
# -
@everywhere using SharedArrays # must be loaded everywhere
A = rand(2, 3)
S = SharedArray(A)
# +
function fill_shared_problematic(N)
S = SharedMatrix{Int64}(N, N)
@sync @distributed for i = 1:length(S) # added @sync here
S[i] = i
end
S
end
S = fill_shared_problematic(100)
minimum(S)
# -
minimum(S)