-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.py
90 lines (71 loc) · 2.51 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import time
import logging
import argparse
from enum import Enum
import grpc
from google.protobuf.empty_pb2 import Empty
from mapper import Mapper
from reducer import Reducer
from driver_service_pb2_grpc import DriverServiceStub
from driver_service_pb2 import TaskType, TaskInfo
# host:port that Worker._ask_task passes to grpc.insecure_channel to reach the driver service
SERVER_ADDRESS = 'localhost:50051'
class WorkerState(Enum):
    r'''
    States a worker moves through; used to log each transition only once
    '''
    # The driver could not be reached yet
    Waiting = 0
    # The last task handed out by the driver was a NoOp
    Idle = 1
    # A Map or Reduce task is being executed
    Working = 2
class Worker:
    r'''
    Polls the driver for tasks over gRPC and runs them with a Mapper or a Reducer
    '''

    def __init__(self, poll_interval: float = 0.1):
        r'''
        Creates the worker

        poll_interval: seconds to sleep after a NoOp reply or a failed rpc
        before asking the driver again; pass 0 to poll without pausing
        '''
        # The initial state is neither Idle nor Waiting, so the very first
        # NoOp reply or rpc failure is always logged
        self._state = WorkerState.Working
        self._mapper = Mapper()
        self._reducer = Reducer()
        self._poll_interval = poll_interval

    def _noop(self):
        r'''
        NoOp task
        '''
        # Just one time log idle
        if self._state != WorkerState.Idle:
            logging.info('idle')

    def _pause(self) -> None:
        r'''
        Sleeps for the configured poll interval so the polling loop does not spin
        '''
        if self._poll_interval > 0:
            time.sleep(self._poll_interval)

    def _ask_task(self) -> TaskInfo:
        r'''
        Calls AskTask rpc

        Opens a fresh insecure channel to SERVER_ADDRESS for the call; the
        channel is closed when the with block exits
        '''
        with grpc.insecure_channel(SERVER_ADDRESS) as channel:
            stub = DriverServiceStub(channel)
            task = stub.AskTask(Empty())
        return task

    def run(self) -> None:
        r'''
        Runs the worker and calls AskTask rpc until shut down signal

        Map and Reduce tasks are executed immediately. A NoOp reply or a
        failed rpc is logged once per streak and followed by a short pause.
        Any other task type is treated as the shut down signal.
        '''
        while True:
            try:
                task = self._ask_task()
                if task.type == TaskType.Map:
                    self._state = WorkerState.Working
                    self._mapper.map(task.id, task.filenames, task.M)
                elif task.type == TaskType.Reduce:
                    self._state = WorkerState.Working
                    self._reducer.reduce(task.id)
                elif task.type == TaskType.NoOp:
                    self._noop()
                    self._state = WorkerState.Idle
                    # Nothing to do right now: back off instead of busy-polling
                    self._pause()
                else:  # shut down
                    return
            # grpc.RpcError is the public base class of the private
            # grpc._channel._InactiveRpcError raised by failed unary calls
            except grpc.RpcError:
                # Just one time log driver is unavailable
                if self._state != WorkerState.Waiting:
                    logging.info('driver is unavailable')
                self._state = WorkerState.Waiting
                # Give the driver time to come up before retrying
                self._pause()
def get_args() -> str:
    r'''
    Reads the command line and returns the value of --name (empty string when omitted)
    '''
    cli = argparse.ArgumentParser(description='Starts a worker.')
    cli.add_argument('--name', dest='name', default='', help='worker name')
    return cli.parse_args().name
if __name__ == '__main__':
    # The worker name from --name is baked into the prefix of every log line
    name = get_args()
    log_format = f'%(asctime)s worker_{name} %(levelname)s: %(message)s'
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        datefmt='%m/%d/%Y %H:%M:%S',
    )
    # Blocks until the driver sends the shut down signal
    Worker().run()