-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_ecs_management_tool.py
238 lines (217 loc) · 10.6 KB
/
aws_ecs_management_tool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
## This is the mfa automation script embedded in an ecs tool.
## It brings individual services up/down, or as a 'cluster' (all of the services that exist in the cluster.)
import click
import re
import boto3
import botocore
import botocore.errorfactory
from botocore.config import Config
from os import path
from colorama import Fore, Style
## This function creates the mfa profile config in the ~/.aws/config file.
def create_mfa_profile():
    """Ensure ``~/.aws/config`` contains a ``[profile mfa]`` section.

    If a line starting with ``[profile mfa]`` already exists the file is left
    completely untouched. Otherwise the user is prompted for a region and the
    new section is appended to the end of the file. A missing config file (or
    ``~/.aws`` directory) is created instead of raising ``FileNotFoundError``.
    """
    import os  # local import: the module itself only imports ``path`` from os

    file_path = path.expanduser('~/.aws/config')
    try:
        with open(file_path) as f:
            lines = f.readlines()
    except FileNotFoundError:
        lines = []

    # Nothing to do when the profile is already present. The file is never
    # rewritten in place, so existing content cannot be shifted or corrupted.
    if any(line.startswith('[profile mfa]') for line in lines):
        return

    region = input(Fore.LIGHTCYAN_EX + "Please enter your region for the mfa profile: " + Style.RESET_ALL)
    os.makedirs(path.dirname(file_path), exist_ok=True)
    with open(file_path, 'a') as f:
        if lines:
            # Terminate an unterminated last line, then leave one blank
            # separator line before the new section.
            if not lines[-1].endswith('\n'):
                f.write('\n')
            f.write('\n')
        f.write(f'[profile mfa]\nregion = {region}\n')
## This function checks if your mfa credentials have expired.
def check_if_mfa_expired():
    """Probe the ``mfa`` profile by calling STS ``get_caller_identity``.

    Nothing is returned; the call is made only so that any exception boto3
    raises for the ``mfa`` profile propagates to the caller.
    """
    sts = boto3.Session(profile_name='mfa').client('sts')
    sts.get_caller_identity()
## This function configures the credentials in the client variable in order for boto3 APIs to work.
def aws_configuration():
    """Create the module-level boto3 ``session`` and ECS ``client``.

    Both are built from the ``mfa`` profile and published as globals so the
    other functions in this script can use ``client`` directly.
    """
    global client, session
    profile = 'mfa'
    session = boto3.Session(profile_name=profile)
    ecs_client = session.client('ecs')
    client = ecs_client
## This function prints the tool's menu.
def set_menu():
    """Print the mode-selection menu in light blue, then reset the colour."""
    menu_lines = (
        Fore.LIGHTBLUE_EX + "Menu",
        "-----------------",
        "1) Manual Mode",
        "2) Cluster Mode",
        "Q) Exit\n" + Style.RESET_ALL,
    )
    for menu_line in menu_lines:
        print(menu_line)
## This function converts CamelCase to snake_case for the output of aws mfa to match the credentials file format.
def change_case(camel_key):
    """Return *camel_key* converted from CamelCase to snake_case.

    Example: ``"SecretAccessKey"`` becomes ``"secret_access_key"``.
    """
    # Pass 1: put an underscore in front of every capitalised word that
    # follows another character.
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_key)
    # Pass 2: split any remaining lower-case/digit -> upper-case boundary.
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return fully_split.lower()
## This function creates the temporary mfa credentials that will be inserted into the ~/.aws/credentials file.
def configure_mfa():
    """Prompt for MFA details and request temporary STS session credentials.

    Asks for the IAM user name, the AWS account id and the current MFA code,
    then calls STS ``get_session_token`` with the default boto3 session. The
    raw response is stored in the module-level global ``mfa_creds``.

    Raises:
        ValueError: if the account id contains anything other than digits.
    """
    global mfa_creds
    user = input(Fore.LIGHTCYAN_EX + "Enter your aws username: " + Style.RESET_ALL)
    # Keep the account id as text: converting it to int drops leading zeros,
    # which would produce a wrong MFA device ARN below.
    account_id = input(Fore.LIGHTCYAN_EX + "Enter your aws account id: " + Style.RESET_ALL).strip()
    if not re.fullmatch(r'[0-9]+', account_id):
        raise ValueError(f'AWS account id must contain only digits, got {account_id!r}')
    mfa = input(Fore.LIGHTCYAN_EX + "Enter the mfa code: " + Style.RESET_ALL)
    session = boto3.Session()  # Please enter profile_name='<PROFILE>' if you are using profiles and not the default profile.
    client = session.client('sts')
    mfa_creds = client.get_session_token(
        DurationSeconds=129600,  # 36 hours
        SerialNumber=f'arn:aws:iam::{account_id}:mfa/{user}',
        TokenCode=mfa,
    )
## This function writes the credentials generated by configure_mfa() into the credentials file.
def write_mfa_to_credentials_file():
    """Replace (or add) the ``[mfa]`` section of ``~/.aws/credentials``.

    Reads the global ``mfa_creds`` (the STS ``get_session_token`` response set
    by ``configure_mfa``) and writes every entry of its ``Credentials`` dict
    except ``Expiration`` as ``aws_<snake_case_key> = <value>`` lines under an
    ``[mfa]`` header. All other sections of the file are preserved, whether
    they come before or after an existing ``[mfa]`` section. ``mfa_creds`` is
    not modified.
    """
    import os  # local import: the module itself only imports ``path`` from os

    file_path = path.expanduser('~/.aws/credentials')
    try:
        with open(file_path) as f:
            lines = f.readlines()
    except FileNotFoundError:
        lines = []

    # Keep every line that does not belong to an existing [mfa] section.
    kept = []
    in_mfa_section = False
    for line in lines:
        if line.startswith('['):
            in_mfa_section = line.startswith('[mfa]')
        if not in_mfa_section:
            kept.append(line)

    # Normalise the tail so repeated runs do not accumulate blank lines.
    while kept and not kept[-1].strip():
        kept.pop()
    if kept and not kept[-1].endswith('\n'):
        kept[-1] += '\n'

    new_section = ['[mfa]\n']
    for key, value in mfa_creds['Credentials'].items():
        if key == 'Expiration':  # not written to the credentials file
            continue
        new_section.append(f'aws_{change_case(key)} = {value}\n')

    os.makedirs(path.dirname(file_path), exist_ok=True)
    # Opening with 'w' truncates the file, so no stale bytes from the previous
    # content can survive past the end of the new content.
    with open(file_path, 'w') as f:
        f.writelines(kept)
        if kept:
            f.write('\n')
        f.writelines(new_section)
## This function starts and stops the ecs services based on the users input.
def stop_and_start_ecs_services():
    """Interactively set the desired task count (0 or 1) of one ECS service.

    Prompts for the cluster, task definition, service name and task count,
    then calls ``update_service`` on the module-level ECS ``client``. The
    cluster name is stored in the global ``cluster_name``, which
    ``service_not_found_exception`` reads.

    Raises:
        ValueError: if the task count entered is not an integer.
    """
    global cluster_name
    cluster_name = input(Fore.LIGHTCYAN_EX + "Please enter the name of your cluster: ")
    task_def = input("Please enter the task definition name you want to use for the service: ")
    service_name = input("Please enter the service to update: ")
    count = int(input("Please enter the number of tasks to create (1) or delete (0) for each service: " + Style.RESET_ALL))
    # Only 0 and 1 are valid; negative numbers are rejected here as well
    # instead of being sent to the API.
    while count not in (0, 1):
        if count > 1:
            print(Fore.LIGHTRED_EX + "Running more than 1 task is not accepted by the admins." + Style.RESET_ALL)
        else:
            print(Fore.LIGHTRED_EX + "The number of tasks cannot be negative." + Style.RESET_ALL)
        count = int(input(Fore.LIGHTCYAN_EX + "Please enter the number of tasks to create (1) or delete (0) for each service: " + Style.RESET_ALL))
    client.update_service(
        cluster=cluster_name,
        service=service_name,
        desiredCount=count,
        taskDefinition=task_def,
        forceNewDeployment=False,
    )
    print(Fore.LIGHTGREEN_EX + "The script has finished successfully." + Style.RESET_ALL)
## This function starts and stops the ecs services in a cluster mode with a for loop.
def stop_and_start_ecs_services_cluster():
    """Interactively set the desired task count (0 or 1) of every service in a cluster.

    Prompts for the cluster name and task count, lists all services of the
    cluster (following pagination) and calls ``update_service`` for each one
    on the module-level ECS ``client``. The cluster name is stored in the
    global ``cluster_name``, which ``service_not_found_exception`` reads.

    Raises:
        ValueError: if the task count entered is not an integer.
    """
    global cluster_name
    cluster_name = input(Fore.LIGHTCYAN_EX + "Please enter the name of your cluster: " + Style.RESET_ALL)
    count = int(input(Fore.LIGHTCYAN_EX + "Please enter the number of tasks to create (1) or delete (0) for each service: " + Style.RESET_ALL))
    # Only 0 and 1 are valid; negative numbers are rejected here as well
    # instead of being sent to the API.
    while count not in (0, 1):
        if count > 1:
            print(Fore.LIGHTRED_EX + "Running more than 1 task is not accepted by the admins." + Style.RESET_ALL)
        else:
            print(Fore.LIGHTRED_EX + "The number of tasks cannot be negative." + Style.RESET_ALL)
        count = int(input(Fore.LIGHTCYAN_EX + "Please enter the number of tasks to create (1) or delete (0) for each service: " + Style.RESET_ALL))

    # list_services is paginated; collect every page so that no service of a
    # large cluster is skipped.
    service_arns = []
    for page in client.get_paginator('list_services').paginate(cluster=cluster_name):
        service_arns.extend(page['serviceArns'])

    for service_arn in service_arns:
        # The service name is the last path segment in both ARN layouts
        # (".../service/<name>" and ".../service/<cluster>/<name>").
        service = service_arn.rsplit("/", 1)[-1]
        client.update_service(
            cluster=cluster_name,
            service=service,
            desiredCount=count,
            # NOTE(review): assumes each task definition family is named
            # after its service - confirm this holds for the target clusters.
            taskDefinition=service,
            forceNewDeployment=False,
        )
    print(Fore.LIGHTGREEN_EX + "The script has finished successfully." + Style.RESET_ALL)
## This function handles the exception if the user entered a wrong cluster name
def cluster_not_found_exception():
    """Offer to list the available clusters after a cluster lookup failed.

    Answering ``y`` prints the name of each cluster returned by
    ``list_clusters`` and returns; ``n`` or any other answer prints a message
    and exits the script via ``quit()``.
    """
    reply = input(Fore.LIGHTRED_EX + "The cluster is not found. Do you want to see the available clusters? (y|n) " + Style.RESET_ALL).lower()
    if reply == 'n':
        print(Fore.LIGHTGREEN_EX + "Bye" + Style.RESET_ALL)
        quit()
    if reply != 'y':
        print(Fore.LIGHTRED_EX + "An invalid answer, exiting the script." + Style.RESET_ALL)
        quit()
    # Only the 'y' answer reaches this point.
    for cluster_arn in client.list_clusters()['clusterArns']:
        print(cluster_arn.split("/")[1])
## This function handles the exception if the user entered a wrong task definition.
def task_not_found_exception():
    """Offer to list the active task definitions after a lookup failed.

    Answering ``y`` prints the part after the first ``/`` of each ARN returned
    by ``list_task_definitions`` (ACTIVE, ascending) and returns; ``n`` or any
    other answer prints a message and exits the script via ``quit()``.
    """
    reply = input(Fore.LIGHTRED_EX + "Task Definition is not found. Do you want to see the available task definitions? (y|n) " + Style.RESET_ALL).lower()
    if reply == 'n':
        print(Fore.LIGHTGREEN_EX + "Bye" + Style.RESET_ALL)
        quit()
    if reply != 'y':
        print(Fore.LIGHTRED_EX + "An invalid answer, exiting the script." + Style.RESET_ALL)
        quit()
    # Only the 'y' answer reaches this point.
    response = client.list_task_definitions(
        status='ACTIVE',
        sort='ASC',
    )
    for task_def_arn in response['taskDefinitionArns']:
        print(task_def_arn.split("/")[1])
## This function handles the exception if the user entered a wrong service.
def service_not_found_exception():
    """Offer to list the cluster's services after a service lookup failed.

    Answering ``y`` prints the name of every service in the cluster named by
    the global ``cluster_name`` (following pagination) and returns; ``n`` or
    any other answer prints a message and exits the script via ``quit()``.
    """
    service_answer = input(Fore.LIGHTRED_EX + "The service is not found. Do you want to see the available services? (y|n) " + Style.RESET_ALL).lower()
    if service_answer == 'y':
        # list_services is paginated; walk every page so the listing is
        # complete for clusters with many services.
        paginator = client.get_paginator('list_services')
        for page in paginator.paginate(cluster=cluster_name):
            for service_arn in page['serviceArns']:
                # The service name is the last path segment in both ARN
                # layouts (".../service/<name>" and
                # ".../service/<cluster>/<name>").
                print(service_arn.rsplit("/", 1)[-1])
    elif service_answer == 'n':
        print(Fore.LIGHTGREEN_EX + "Bye" + Style.RESET_ALL)
        quit()
    else:
        print(Fore.LIGHTRED_EX + "An invalid answer, exiting the script." + Style.RESET_ALL)
        quit()
## Main script
click.clear()
print(Fore.LIGHTYELLOW_EX + "===================\nECS-MANAGEMENT-TOOL\n===================\nAuthor: Eylon\nVersion: 1.0.5\n" + Style.RESET_ALL)
# Keep looping until one menu action completes successfully. Every handled
# error (missing/expired credentials, unknown cluster/task/service, bad
# number) falls through to the next iteration so the user can retry.
while True:
    try:
        create_mfa_profile()
        check_if_mfa_expired()
        aws_configuration()
        set_menu()
        choice = input(Fore.LIGHTCYAN_EX + 'Enter your choice: ' + Style.RESET_ALL).lower()
        if choice == '1':
            stop_and_start_ecs_services()
        elif choice == '2':
            stop_and_start_ecs_services_cluster()
        elif choice == 'q':
            exit()
        else:
            print(Fore.LIGHTRED_EX + f'Not a correct choice: <{choice}>, try again' + Style.RESET_ALL)
            # Show the menu again instead of leaving the loop, as the
            # message promises.
            continue
        break
    ## In case the user doesn't have mfa credentials in the ~/.aws/credentials file.
    except botocore.exceptions.NoCredentialsError:
        print(Fore.LIGHTRED_EX + "\nYou don't have the mfa credentials set, please enter the credentials." + Style.RESET_ALL)
        configure_mfa()
        write_mfa_to_credentials_file()
    except botocore.exceptions.ClientError as error:
        error_code = error.response['Error']['Code']
        ## In case the credentials have expired.
        if error_code == 'ExpiredToken':
            print(Fore.LIGHTRED_EX + "\nYour credentials have expired, please enter the credentials." + Style.RESET_ALL)
            configure_mfa()
            write_mfa_to_credentials_file()
        ## In case the cluster name is not correct.
        elif error_code == 'ClusterNotFoundException':
            cluster_not_found_exception()
        ## In case the task definition is not correct.
        # NOTE(review): 'ClientException' is treated as "task definition not
        # found" here - confirm no other failure reports the same code.
        elif error_code == 'ClientException':
            task_not_found_exception()
        ## In case the service is not correct.
        elif error_code == 'ServiceNotFoundException':
            service_not_found_exception()
        else:
            # Unknown AWS error: surface it instead of silently retrying
            # forever.
            raise
    ## In case the user entered a string instead of an int in the tasks number.
    except ValueError:
        print(Fore.LIGHTRED_EX + "You've entered a string instead of a number, please try again." + Style.RESET_ALL)