-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsender.py
237 lines (184 loc) · 7.21 KB
/
sender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import socket, os, base64, json
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.fernet import Fernet
from PyTerminalColor.TerminalColor import TerminalColor
class Users:
def __init__(self, users:list=None) -> None:
self.__users = {}
self.create_users(users)
self.colorize = TerminalColor(fgcolor='YELLOW', bgcolor='BLACK', style='BOLD')
def create_users(self, users:list):
'''
create users for authentication using passed list[tuple(username, password)]
'''
try:
for user, password in users:
self.__users[user] = password
return True
except ValueError:
self.colorize.cprint('[!] Users only accepts list of tuples consisting of (username, password). Ignoring passed list.', use_default=False, fgcolor='YELLOW', bgcolor='RED', style='BOLD')
except TypeError:
self.colorize.cprint('[!] No Users were created\n', use_default=False, fgcolor='YELLOW', bgcolor='RED', style='BOLD')
finally:
return False
def auth_user(self, user:str, passwd:str):
'''
authenticates user
'''
if user in self.__users and self.__users[user] == passwd:
return True
return False
def gen_key_from_pass(self, passwd:str)->bytes:
'''
Generates key from password.
'''
salt = b'SecretSalt'
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
passwd = bytes(passwd, encoding='utf-8')
key = base64.urlsafe_b64encode(kdf.derive(passwd))
return key
def encrypt_data(self, passwd_hash:str, data:str)->bytes:
'''
encrypts data which is passed with the help of key as
another parameter and returns encrypted data in form
of bytes
'''
KEY = passwd_hash
if type(data) == str:
data = data.encode('utf-8')
encrypter = Fernet(KEY)
enc_data = encrypter.encrypt(data)
return enc_data
def decrypt_data(self, passwd_hash:str, data:str)->bytes:
'''
decrypts data which is passed with the help of key as
another parameter and returns decrypted data in form
of bytes
'''
KEY = passwd_hash
data = data.encode('utf-8')
decrypter = Fernet(KEY)
dec_data = decrypter.decrypt(data)
return dec_data
class Server:
def __init__(self, ip:str='127.0.0.1', port:int=4444, users:list=None, file_path:str='') -> None:
self.SEPARATOR = "<D|M>"
self.BUFFER_SIZE = 4096
self.FILE_PATH = file_path
self.LINE_SIZE = 60
self.ip = ip
self.port = port
self.users = Users(users)
self.passwd_hash = None
self.colorize = TerminalColor(fgcolor='YELLOW', bgcolor='BLACK', style='BOLD')
def send(self, data:str):
'''
sends data serially
'''
if type(data) == bytes:
data = str(data, encoding='utf-8')
json_data = json.dumps(data)
bytes_json_data = bytes(json_data, encoding='utf-8')
self.connection.send(bytes_json_data)
def receive(self):
'''
receives data serially
'''
bytes_json_data = b''
while True:
try:
bytes_json_data += self.connection.recv(self.BUFFER_SIZE)
data = json.loads(bytes_json_data)
return data
except json.JSONDecodeError:
continue
def close_conn(self):
'''
forcely close connection
'''
print('-'*self.LINE_SIZE)
self.colorize.cprint(f'[!] Closing {self.conn_addr} connection.', use_default=False, fgcolor='YELLOW', bgcolor='RED', style='BOLD')
self.send('exit')
self.connection.close()
print('-'*self.LINE_SIZE)
def authenticate_user(self):
'''
authenticate user before transferring file
'''
# send authentication request
self.send('auth_user')
# accept username and password
username = self.receive()
passwd = self.receive()
if self.users.auth_user(username, passwd):
self.passwd_hash = self.users.gen_key_from_pass(passwd)
return True
return False
def send_file(self):
'''
sends file securely.
'''
if os.name == 'nt':
file_name = self.FILE_PATH.split('\\')[-1]
else:
file_name = self.FILE_PATH.split('/')[-1]
self.colorize.cprint(f'[*] Sending {file_name}', use_default=False, fgcolor='YELLOW')
with open(self.FILE_PATH, "rb") as f:
file_data = f.read()
# encode file data to base64 format
file_data = base64.b64encode(file_data)
# encrypt file data
enc_file_data = self.users.encrypt_data(self.passwd_hash, file_data).decode('utf-8')
# Creating packet
# packet = transfer_send (sep) filename (sep) data
packet = f'transfer_send{self.SEPARATOR}{file_name}{self.SEPARATOR}{str(enc_file_data)}'
# send packet over network
self.send(packet)
# receive acknowledgement
if self.receive() == 'transfer_completed':
return True
return False
def start(self):
'''
starts server
'''
print()
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((self.ip, self.port))
self.server.listen(0)
print('-'*self.LINE_SIZE)
self.colorize.cprint(f'[*] Waiting for incoming connections on {self.ip}:{self.port}')
self.connection, self.conn_addr = self.server.accept()
print('-'*self.LINE_SIZE)
self.colorize.cprint(f'[*] Incoming from {self.conn_addr}')
# auth incoming connection
if self.authenticate_user():
self.colorize.cprint(f'[*] {self.conn_addr} Authenticated successfully. Logged in',use_default=False, fgcolor='GREEN', style='BOLD')
self.send('Authenticated')
print('-'*self.LINE_SIZE)
if self.send_file():
self.colorize.cprint(f'[*] File {self.FILE_PATH} successfully transferred.', use_default=False, fgcolor='GREEN', style='ITALIC')
else:
self.colorize.cprint(f'[!] Transferred Failed', use_default=False, fgcolor='YELLOW', bgcolor='RED', style='BOLD')
else:
# close connection if user is not authenticated
self.colorize.cprint(f'[!] {self.conn_addr} unsuccessfull authentication attempt', use_default=False, fgcolor='YELLOW', bgcolor='RED', style='BOLD')
self.close_conn()
if __name__=='__main__':
FILE_PATH = r'path_to_file'
IP = '127.0.0.1'
PORT = 4444
USERS = [
('1234','1234'),
]
server = Server(IP, PORT, USERS, FILE_PATH)
server.start()