# actualTest.py — multiprocessing / thread-pool batching examples
# (126 lines, 3.43 KB; header reconstructed from a web-scraped copy)
# 1. Read the file into batches and put into queue
# NOTE: the original `from Queue import Queue` (Python 2 stdlib) fails with
# ModuleNotFoundError on Python 3 and was immediately shadowed by the
# multiprocessing Queue import below, so it has been removed.
from itertools import islice
from multiprocessing import Process, Queue
from threading import Thread
class Mulithreader():
    """Run functions in parallel OS processes and collect their results.

    Each run() call starts a Process that executes the target function and
    pushes its return value onto a shared queue; wait() drains one result
    per started process, joins them all, and returns the collected values.
    NOTE(review): results come back in completion order, not submission
    order — confirm callers do not rely on ordering.
    """

    def __init__(self):
        self.processes = []   # Process handles started via run()
        self.queue = Queue()  # shared channel for child return values

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        # Child-side shim: execute the target and ship its result home.
        queue.put(func(*args, **kwargs))

    def run(self, func, *args, **kwargs):
        """Start func(*args, **kwargs) in a new process (non-blocking)."""
        worker = Process(target=self._wrapper,
                         args=(func, self.queue, args, kwargs))
        self.processes.append(worker)
        worker.start()

    def wait(self):
        """Block until every started process finishes; return their results."""
        # Drain one result per process *before* joining — the children block
        # on queue.put() until the parent reads, so this order avoids deadlock.
        results = [self.queue.get() for _ in self.processes]
        for worker in self.processes:
            worker.join()
        self.processes = []
        return results
# more advanced: uses a worker pool and gets results in order, like Celery
class Batcher():
    """A fixed-size worker pool that returns results in submission order.

    Jobs are registered with enqueue() and executed by process() on a pool
    of worker threads; each result is tagged with its job index so the
    returned list lines up with the order jobs were enqueued.
    """

    # Commands understood by the worker loop.
    CMD_JOB = 0   # payload is a (func, args, kwargs) job
    CMD_KILL = 1  # worker should exit

    def __init__(self, num_workers):
        self.jobs = []                  # pending (func, args, kwargs) tuples
        self.num_workers = num_workers  # default pool size for process()

    @staticmethod
    def _worker(in_queue, out_queue):
        """Worker loop: execute jobs from in_queue until CMD_KILL arrives."""
        while True:
            # listen for new jobs
            cmd, index, job = in_queue.get()
            if cmd == Batcher.CMD_JOB:
                # process job, return result; job is (func, args, kwargs)
                func, args, kwargs = job
                ret = func(*args, **kwargs)
                # print("job:", cmd, index, job)
                print("Args: ", args, kwargs, ret)
                # Tag the result with its job index so the caller can
                # restore submission order.
                out_queue.put((index, ret))
            elif cmd == Batcher.CMD_KILL:
                # time to stop
                return
            else:
                # Was `assert False`, which is silently stripped under -O;
                # raise explicitly so a protocol bug can never pass unnoticed.
                raise ValueError("unknown worker command: %r" % (cmd,))

    def enqueue(self, func, *args, **kwargs):
        """Queue func(*args, **kwargs) for the next process() call."""
        self.jobs.append((func, args, kwargs))

    def process(self, num_workers=None):
        """Run all queued jobs on num_workers threads; return results in order."""
        if num_workers is None:
            num_workers = self.num_workers
        # spawn workers — threads (not processes), so jobs need not be
        # picklable; multiprocessing.Queue works for threads too.
        in_queue, out_queue = Queue(), Queue()
        workers = []
        for _ in range(num_workers):
            worker = Thread(target=self._worker, args=(in_queue, out_queue))
            workers.append(worker)
            worker.start()
        # Feed every job.  (The original fed them in num_workers-sized
        # slices via a nested loop — exactly equivalent to this single pass.)
        for job_idx, job in enumerate(self.jobs):
            print("Input queue: ", job_idx, job)
            in_queue.put((Batcher.CMD_JOB, job_idx, job))
        # Collect one result per job, slotting each into its original index.
        results = [None] * len(self.jobs)
        for _ in range(len(self.jobs)):
            res_idx, res = out_queue.get()
            # Identity check (`is None`), not `== None` as before.
            assert results[res_idx] is None
            results[res_idx] = res
        # Tell every worker to shut down, then wait for them.
        for _ in range(num_workers):
            in_queue.put((Batcher.CMD_KILL, None, None))
        for worker in workers:
            worker.join()
        return results
def custom_translate(myList):
    """Placeholder "translation" job: return the str() rendering of myList."""
    rendered = str(myList)
    return rendered
# tester/examples
if __name__ == "__main__":
    mp2 = Batcher(num_workers=2)
    batch_size = 2
    filename = "bla.txt"
    # assumes bla.txt holds one integer per line — TODO confirm
    with open(filename, 'rb') as f:
        # islice() pulls batch_size lines at a time; the sentinel iter()
        # form stops when an empty tuple comes back (EOF).
        for _batch in iter(lambda: tuple(islice(f, batch_size)), ()):
            print(_batch)
            # BUG FIX: map() is lazy in Python 3, so str(map(...)) rendered
            # "<map object at 0x...>"; materialize the ints with list().
            mp2.enqueue(custom_translate, list(map(int, _batch)))
            # mp2.enqueue(sum, map(int,_batch))
    ret = mp2.process()
    print(ret)