-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreducer.py
74 lines (60 loc) · 1.84 KB
/
reducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import glob
import os
import logging
from typing import List, Dict, Tuple
import grpc
from google.protobuf.empty_pb2 import Empty
from driver_service_pb2_grpc import DriverServiceStub
# Directory scanned for the intermediate `mr-*-<bucket_id>` files read by the reduce task.
INTERMEDIATE_DIR = 'intermediate'
# Directory (created on demand by the reduce task) that receives the `out-<bucket_id>` files.
OUT_DIR = 'out'
# Target address of the gRPC channel used to reach the driver service.
SERVER_ADDRESS = 'localhost:50051'
class WordCounter:
    r'''
    Stores number of occurrences of each word
    '''

    def __init__(self):
        # Maps each word seen so far to the number of times it was counted.
        self._dict: Dict[str, int] = {}

    def count(self, word: str) -> None:
        r'''
        Increases the counter of given word

        A word that has not been counted before starts from zero, so its
        first call leaves it with a count of one.
        '''
        self._dict[word] = self._dict.get(word, 0) + 1

    def items(self) -> List[Tuple[str, int]]:
        r'''
        Returns words and occurrences

        The result is a new list of (word, count) pairs; it is a snapshot
        and is not affected by later calls to count().
        '''
        # Materialise the dict view so the value really is the annotated list.
        return list(self._dict.items())
class Reducer:
    r'''
    Performs the reduce task

    Counts the words found in one bucket's intermediate files, writes the
    totals to that bucket's output file and then calls the FinishReduce rpc.
    '''

    def _count_bucket(self, bucket_id: int) -> WordCounter:
        r'''
        Counts each word in bucket intermediate files

        Every file matching ``mr-*-<bucket_id>`` inside INTERMEDIATE_DIR is
        read line by line; each line, stripped of surrounding whitespace, is
        counted as one word. Lines that are empty after stripping are skipped.
        '''
        wc = WordCounter()
        for bucket_file in glob.glob(f'{INTERMEDIATE_DIR}/mr-*-{bucket_id}'):
            with open(bucket_file) as bf:
                # Iterate the file directly rather than loading every line
                # into memory first.
                for line in bf:
                    word = line.strip()
                    # A blank line is not a word; counting it would emit an
                    # entry with an empty key in the output file.
                    if word:
                        wc.count(word)
        return wc

    def _finish_reduce(self) -> None:
        r'''
        Calls FinishReduce rpc

        Opens an insecure channel to SERVER_ADDRESS for the duration of the
        call and sends an Empty request.
        '''
        with grpc.insecure_channel(SERVER_ADDRESS) as channel:
            stub = DriverServiceStub(channel)
            stub.FinishReduce(Empty())

    def reduce(self, bucket_id: int) -> None:
        r'''
        Reduce Task

        Writes one ``<word> <count>`` line per distinct word of the bucket to
        ``OUT_DIR/out-<bucket_id>`` and then calls the FinishReduce rpc.
        '''
        os.makedirs(OUT_DIR, exist_ok=True)
        logging.info('starting reduce %d', bucket_id)
        wc = self._count_bucket(bucket_id)
        # The counts are recomputed from scratch on every call, so truncate
        # the output instead of appending: appending would duplicate every
        # entry if the same bucket is reduced again.
        with open(f'{OUT_DIR}/out-{bucket_id}', 'w') as out:
            for word, count in wc.items():
                out.write(f'{word} {count}\n')
        # Notify only after the output file has been closed.
        self._finish_reduce()