-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmongo.py
130 lines (86 loc) · 4.96 KB
/
mongo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import pymongo
from connections import Constant
class Mongo():
    """Small convenience wrapper around one database reached through pymongo.

    The wrapper is bound to a connection URI and a database name.  The
    underlying ``pymongo.MongoClient`` is created lazily on first use and then
    reused, instead of opening (and never closing) a new client for every
    single helper call.
    """

    def __init__(self, target_uri, target_db):
        """Remember which deployment and database to talk to.

        Args:
            target_uri: MongoDB connection string, e.g.
                ``mongodb://{ACCOUNT}:{PASSWORD}@{HOST}:{PORT}/{DB_NAME}`` or
                ``mongodb+srv://{ACCOUNT}:{PASSWORD}@{HOST}/{DB_NAME}``
                (SRV connection strings do not carry a port).
            target_db: Name of the database every helper operates on.
        """
        self.target_uri = target_uri
        self.target_db = target_db
        # Lazily created client plus the URI it was created for, so that a
        # later change of ``target_uri`` is still honoured.
        self._client = None
        self._client_uri = None

    def get_conn(self):
        """Return the pymongo database handle for ``target_db``.

        The client is created on the first call and reused afterwards; it is
        only rebuilt when ``target_uri`` has been changed in the meantime.
        """
        client = getattr(self, "_client", None)
        if client is None or getattr(self, "_client_uri", None) != self.target_uri:
            if client is not None:
                # The URI changed: release the pool of the outdated client.
                client.close()
            client = pymongo.MongoClient(self.target_uri)
            self._client = client
            self._client_uri = self.target_uri
        return client.get_database(self.target_db)

    def close(self):
        """Close the cached client, if any; a later call reconnects on demand."""
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()
        self._client = None
        self._client_uri = None

    def list_collection_names(self):
        """Return the names of the collections in the database."""
        return self.get_conn().list_collection_names()

    def count_documents(self, collection_name):
        """Return the number of documents in ``collection_name`` (empty filter)."""
        return self.get_conn().get_collection(collection_name).count_documents({})

    def list_indexes(self, collection_name):
        """Return pymongo's ``list_indexes()`` result for ``collection_name``."""
        return self.get_conn().get_collection(collection_name).list_indexes()

    def index_information(self, collection):
        """Return pymongo's ``index_information()`` result for ``collection``."""
        return self.get_conn().get_collection(collection).index_information()

    def create_index(self, collection_name, index_name):
        """Create an index on ``collection_name`` with ``background=True``.

        Args:
            collection_name: Collection to create the index on.
            index_name: Despite its name this is forwarded as the key
                specification of pymongo's ``create_index`` (first positional
                argument), e.g. a list of ``(field, direction)`` pairs.

        Returns:
            Whatever pymongo's ``create_index`` returns.
        """
        return self.get_conn().get_collection(collection_name).create_index(index_name, background=True)
class Handler():
    """Compare and copy index definitions between two MongoDB deployments.

    Both sides are reached through ``Mongo`` wrappers bound to the same
    database name.  The ``*_diff`` methods print their result and also return
    it; the ``*_migrate`` methods create on the target the indexes that exist
    on the source.
    """

    def __init__(self, source, target, db):
        """Store the settings and build one ``Mongo`` wrapper per side.

        Args:
            source: Connection URI of the source deployment.
            target: Connection URI of the target deployment.
            db: Database name used on both sides.
        """
        self.source = source
        self.target = target
        self.db = db
        self.source_conn = Mongo(source, db)
        self.target_conn = Mongo(target, db)

    @staticmethod
    def _count_signatures(conn):
        """Return ``{"<collection>_<document count>"}`` for every collection."""
        return {
            f"{col}_{conn.count_documents(col)}"
            for col in conn.list_collection_names()
        }

    @staticmethod
    def _index_signatures(conn, collection_name):
        """Return ``{"<key spec>_<collection>"}`` for one collection's indexes.

        Example entry: ``"{'_id': 1}_orders"``.  The collection name comes from
        the argument rather than from the index document's ``ns`` field: that
        field is not returned by newer servers, and splitting it on ``.``
        truncated collection names that contain dots.
        """
        return {
            f"{dict(index['key'])}_{collection_name}"
            for index in conn.list_indexes(collection_name)
        }

    @classmethod
    def _database_index_signatures(cls, conn):
        """Return the index signatures of every collection in the database."""
        signatures = set()
        for col in conn.list_collection_names():
            signatures |= cls._index_signatures(conn, col)
        return signatures

    @staticmethod
    def _normalise_keys(key_spec):
        """Return ``key_spec`` as ``(field, direction)`` tuples.

        Numeric directions are converted to ``int`` (e.g. ``1.0`` -> ``1``);
        string index types such as ``'2dsphere'`` or ``'hashed'`` are kept
        unchanged instead of being passed to ``int()``, which would raise.
        """
        keys = []
        for field, direction in key_spec:
            if isinstance(direction, (int, float)):
                direction = int(direction)
            keys.append((field, direction))
        return keys

    def count_diff(self):
        """Print and return the ``"<collection>_<count>"`` entries of the
        source that have no identical entry on the target.

        A collection shows up when it is missing on the target or when its
        document count differs.
        """
        diff = self._count_signatures(self.source_conn) - self._count_signatures(self.target_conn)
        print(diff)
        return diff

    def index_diff(self):
        """Print and return the index signatures present on the source but
        not on the target, across all collections."""
        diff = (
            self._database_index_signatures(self.source_conn)
            - self._database_index_signatures(self.target_conn)
        )
        print(diff)
        return diff

    def single_index_diff(self, collection_name):
        """Print and return both index differences for one collection.

        Returns:
            A tuple ``(only_on_target, only_on_source)`` of signature sets,
            in the same order in which they are printed.
        """
        source_signatures = self._index_signatures(self.source_conn, collection_name)
        target_signatures = self._index_signatures(self.target_conn, collection_name)
        only_on_target = target_signatures - source_signatures
        only_on_source = source_signatures - target_signatures
        print(only_on_target, only_on_source)
        return only_on_target, only_on_source

    def single_collection_indexes_migrate(self, collection_name):
        """Create on the target every index the source has on one collection.

        The default ``_id`` index is not recreated.  Text indexes are skipped
        with a message: the key reported for them does not name the indexed
        fields, so recreating it as-is would build a different index.

        NOTE(review): only the key specification is copied; index options
        (e.g. unique, sparse, TTL) are not passed on because the target's
        ``create_index`` helper accepts keys only - confirm whether that is
        acceptable for the collections being migrated.
        """
        print(f'collection: {collection_name}')
        for index_info in self.source_conn.index_information(collection_name).values():
            source_keys = index_info.get('key')
            print(f"""source index: {source_keys} """)
            to_be_create_indexes = self._normalise_keys(source_keys)
            print(f'target index: {to_be_create_indexes}')
            if to_be_create_indexes == [('_id', 1)]:
                # Every collection already has this index; nothing to create.
                print(f'exception: {to_be_create_indexes}')
            elif any(direction == 'text' for _, direction in to_be_create_indexes):
                print(f'skipped text index (recreate manually): {to_be_create_indexes}')
            else:
                self.target_conn.create_index(collection_name, to_be_create_indexes)
        print(f'{collection_name} collection indexes done migrated')

    def index_migrate(self):
        """Run ``single_collection_indexes_migrate`` for every source collection."""
        for col in self.source_conn.list_collection_names():
            self.single_collection_indexes_migrate(col)
if __name__ == "__main__":
    # Connection URIs and the database name are read from the project's
    # Constant settings object.
    source_uri, target_uri, db = Constant.dw3_client, Constant.atlas_client, Constant.db

    # One handler serves both steps below.
    handler = Handler(source_uri, target_uri, db)

    # handler.index_migrate() would process every collection instead.
    handler.single_collection_indexes_migrate('orders')
    handler.single_index_diff('orders')