From 21574ae600c37c22c930b3c4cf7a38e4a665859f Mon Sep 17 00:00:00 2001 From: coulm Date: Mon, 4 Nov 2024 18:11:14 +0100 Subject: [PATCH] add test --- app.py | 70 +++++++++-------- auth_utils.py | 0 def_app.py | 0 def_session.py | 178 ++++++++++++++++++++++++++++++++++++++++++ pytest.ini | 2 + requirements.txt | 6 +- tests/test_def.py | 168 +++++++++++++++++++++++++++++++++++++++ tests/test_session.py | 0 8 files changed, 388 insertions(+), 36 deletions(-) create mode 100644 auth_utils.py create mode 100644 def_app.py create mode 100644 def_session.py create mode 100644 pytest.ini create mode 100644 tests/test_def.py create mode 100644 tests/test_session.py diff --git a/app.py b/app.py index 8fe478e..180ff98 100755 --- a/app.py +++ b/app.py @@ -121,11 +121,10 @@ def dir_in_dir(path): def images_in_dir(path): - # Spécifiez le chemin du dossier à scanner - image_list=[] - for racine, soudoss, fichiers in sorted(os.walk(path)): + image_list = [] + for racine, _, fichiers in sorted(os.walk(path)): for fichier in fichiers: - if fichier != "integrity.check" or "metadata.json": + if fichier not in ["integrity.check", "metadata.json"]: image_list.append(fichier) return image_list @@ -139,6 +138,39 @@ def need_update(): else: return True +def create_zip_from_list(text_list, project_name): + # Utiliser un fichier en mémoire pour stocker le zip + memory_file = io.BytesIO() + + # Chemin du projet + project_path = os.path.join(TEMP_FOLDER, project_name) + os.makedirs(project_path, exist_ok=True) # Créer le dossier si nécessaire + + # Créer le fichier zip + with zipfile.ZipFile(memory_file, 'w') as zf: + for element in text_list: + # Créer le nom du fichier, par exemple "file_1.txt" + filename = '.'.join(element['filename'].split('.')[:-1]) + filename = f"{filename}.txt" + + # Chemin complet du fichier + file_path = f'{TEMP_FOLDER}/{project_name}/{filename}' + # Créer un fichier txt pour chaque ligne + with open(file_path, 'a') as f: + f.write('#objet x y width height \n') + for region in element['regions']: + + result_string = f"{region['region_attributes']['objet'],region['shape_attributes']['x'],region['shape_attributes']['y'],region['shape_attributes']['width'],region['shape_attributes']['height']}" + with open(file_path, 'a') as f: + f.write(f'{result_string}\n') + + # Ajouter chaque fichier dans le zip + zf.write(file_path, filename) + os.remove(f'{file_path}') + + memory_file.seek(0) # Rewind the file pointer to the beginning + return memory_file + @app.route('/', methods=['GET', 'POST']) def login(): @@ -488,36 +520,6 @@ def overview_dir(dirname): return jsonify({'message': 'Authentication failed'}), 401 -def create_zip_from_list(text_list,projet_name): - # Utiliser un fichier en mémoire pour stocker le zip - memory_file = io.BytesIO() - - with zipfile.ZipFile(memory_file, 'w') as zf: - for element in text_list: - # Créer le nom du fichier, par exemple "file_1.txt" - filename = '.'.join(element['filename'].split('.')[:-1]) - filename = f"{filename}.txt" - - # Chemin complet du fichier - file_path = f'{TEMP_FOLDER}/{projet_name}/{filename}' - # Créer un fichier txt pour chaque ligne - with open(file_path, 'a') as f: - f.write('#objet x y width height \n') - for region in element['regions']: - - result_string = f"{region['region_attributes']['objet'],region['shape_attributes']['x'],region['shape_attributes']['y'],region['shape_attributes']['width'],region['shape_attributes']['height']}" - with open(file_path, 'a') as f: - f.write(f'{result_string}\n') - - # Ajouter chaque fichier dans le zip - zf.write(file_path, filename) - os.remove(f'{file_path}') - - - - # Revenir au début du fichier en mémoire - memory_file.seek(0) - return memory_file diff --git a/auth_utils.py b/auth_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/def_app.py b/def_app.py new file mode 100644 index 0000000..e69de29 diff --git a/def_session.py b/def_session.py new file mode 100644 index 0000000..817a2b8 --- /dev/null +++ b/def_session.py @@ -0,0 +1,178 @@ +import os +import json +from pathlib import Path +from werkzeug.utils import secure_filename +from dotenv import load_dotenv +import uuid +from bson import json_util +import io +from git import Repo +import zipfile +from user_agents import parse +from functools import wraps +from flask import g, request, redirect, url_for, flash + +# from email.mime.multipart import MIMEMultipart +# from email.mime.text import MIMEText +import smtplib +#from flask_mail import Mail, Message +import json +from flask import flash, session +from PIL import Image +from functools import wraps +from flask import Flask, jsonify, redirect, render_template, request, session, url_for, send_file, jsonify +import requests +import os +import json +from pathlib import Path +from werkzeug.utils import secure_filename +from dotenv import load_dotenv +import uuid +from bson import json_util +from PIL import Image +import io +from git import Repo +import os +import zipfile +from user_agents import parse + + + +UUID_MACHINE=os.getenv('UUID_MACHINE') +SERVER_URL = os.environ.get("SERVER_URL") + + + +def get_token(email, password,ip_address,user_agent_parsed): + url = f'{SERVER_URL}/users/token' + data = {'email': email, 'password': password,"uuid_machine":UUID_MACHINE,"ip_address":ip_address,"user_agent": f"{user_agent_parsed}"} + headers = {'Content-Type': 'application/json'} + try: + response = requests.post(url, json=data, headers=headers) + response.raise_for_status() # Raises an HTTPError if the response status code is 4XX/5XX + print('reponce au token',response.json()) + print('Raw response:', response.text) + return response.json().get('access_token') ,response.json().get('last_login') + except requests.exceptions.RequestException as e: + print(f"Request failed: {e}") + return None ,None + + +def get_current_user(): + token = session.get('token') + print('token',token) + if token: + headers = { + 'Authorization': f'Bearer {token}', + } + print('headers',headers,f'url {SERVER_URL}/users/protected_route') + response = requests.post(f'{SERVER_URL}/users/protected_route', headers=headers) + + # headers = {'Authorization': f'Bearer {token}'} + # response = requests.get(f'{SERVER_URL}/users/protected_route', headers=headers) + + if response.status_code == 200: + user_data = response.json() + return user_data + if response.status_code == 401: + print("Authentication failed") + return logout(),401 + return login() + +def check_authentication(func): + @wraps(func) + def wrapper(*args, **kwargs): + # Check if the current route is the "predict_route" + if request.endpoint == 'predict_route': + # Extract the token from the request's form + token = request.form['token'] + headers = {'Authorization': f'Bearer {token}'} + response = requests.post(f'{SERVER_URL}/users/protected_route', headers=headers) + if response.status_code == 200: + # Do not print the result here + pass + if response.status_code == 401: + print("Authentication failed") + return logout(),401 + else: + # Check for the token in the session + token = session.get('token') + if token: + # Verify the token using the get_token function + headers = {'Authorization': f'Bearer {token}'} + response = requests.post(f'{SERVER_URL}/users/protected_route', headers=headers) + if response.status_code == 200: + # Do not print the result here + pass + else: + print(f"Failed to access protected resource. Status code: {response.status_code}") + + return func(*args, **kwargs) + else: + print("Authentication failed") + return logout(),401 + + return wrapper + +def compter_elements_dans_dossier(dossier): + return len([f for f in sorted(os.listdir(dossier)) if os.path.isfile(os.path.join(dossier, f))]) + +def dir_in_dir(path): + dir_list=[] + for racine, sous_dossiers, fichiers in sorted(os.walk(path)): + for sous_dossier in sous_dossiers: + print("dir_in_dir",os.path.join(racine, sous_dossier)) + image_number = compter_elements_dans_dossier(f'{racine}/{sous_dossier}') + dir_list.append([sous_dossier,image_number]) + return dir_list + + +def images_in_dir(path): + # Spécifiez le chemin du dossier à scanner + image_list=[] + for racine, soudoss, fichiers in sorted(os.walk(path)): + for fichier in fichiers: + if fichier != "integrity.check" or "metadata.json": + image_list.append(fichier) + return image_list + +def need_update(): + repo = Repo('.') + current_commit = repo.head.commit + repo.remotes.origin.fetch() + origin_main_commit = repo.commit('origin/main') + if current_commit == origin_main_commit: + return False + else: + return True +def create_zip_from_list(text_list,projet_name): + # Utiliser un fichier en mémoire pour stocker le zip + memory_file = io.BytesIO() + + with zipfile.ZipFile(memory_file, 'w') as zf: + for element in text_list: + # Créer le nom du fichier, par exemple "file_1.txt" + filename = '.'.join(element['filename'].split('.')[:-1]) + filename = f"{filename}.txt" + + # Chemin complet du fichier + file_path = f'{TEMP_FOLDER}/{projet_name}/{filename}' + # Créer un fichier txt pour chaque ligne + with open(file_path, 'a') as f: + f.write('#objet x y width height \n') + for region in element['regions']: + + result_string = f"{region['region_attributes']['objet'],region['shape_attributes']['x'],region['shape_attributes']['y'],region['shape_attributes']['width'],region['shape_attributes']['height']}" + with open(file_path, 'a') as f: + f.write(f'{result_string}\n') + + # Ajouter chaque fichier dans le zip + zf.write(file_path, filename) + os.remove(f'{file_path}') + + + + # Revenir au début du fichier en mémoire + memory_file.seek(0) + return memory_file + diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..03f586d --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +pythonpath = . \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3f02559..e3b3259 100755 --- a/requirements.txt +++ b/requirements.txt @@ -8,10 +8,12 @@ itsdangerous==2.1.2 Jinja2==3.1.2 MarkupSafe==2.1.3 Pillow==10.1.0 -requests==2.31.0 +requests urllib3==2.1.0 Werkzeug==3.0.1 python-dotenv pymongo gitpython -user-agents \ No newline at end of file +user-agents +pytest +# pytest-asyncio..httpx \ No newline at end of file diff --git a/tests/test_def.py b/tests/test_def.py new file mode 100644 index 0000000..41219e0 --- /dev/null +++ b/tests/test_def.py @@ -0,0 +1,168 @@ +import pytest +from unittest.mock import patch, Mock, MagicMock +import requests +import os +import zipfile +import io +from app import (app,get_token, get_current_user, check_authentication, compter_elements_dans_dossier, dir_in_dir, images_in_dir, need_update, create_zip_from_list) +from git import Repo +from flask import session ,Flask + +from unittest.mock import patch, Mock + +@patch('requests.post') +def test_get_token_success(mock_post): + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'access_token': 'test_token', 'last_login': '2023-10-10T10:00:00'} + mock_post.return_value = mock_response + token, last_login = get_token('test@example.com', 'password', '192.168.1.1', 'TestAgent') + assert token == 'test_token' + assert last_login == '2023-10-10T10:00:00' + + +@patch('requests.post') +def test_get_token_failure(mock_post): + mock_post.side_effect = requests.exceptions.RequestException("Connection error") + token, last_login = get_token('test@example.com', 'password', '192.168.1.1', 'TestAgent') + assert token is None + assert last_login is None + + + +@patch('requests.post') +def test_get_current_user_authenticated(mock_post): + with app.test_request_context(): + with patch('flask.session.get') as mock_session_get: + # Simuler un token valide dans la session + mock_session_get.return_value = 'test_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user_id': 1, 'username': 'testuser'} + mock_post.return_value = mock_response + + # Appel de la fonction + user_data = get_current_user() + + # Vérification + assert user_data == {'user_id': 1, 'username': 'testuser'} + + +# @patch.dict('flask.session', {'token': None}) # Simuler l'absence de token dans la session +# @patch('requests.post') +# def test_get_current_user_unauthenticated(mock_post): +# # Appel de la fonction +# user_data = get_current_user() + +# # Vérification : vérifier si la fonction retourne un appel à `login()` +# assert user_data == login() + +# from unittest.mock import patch, Mock + +# # Créer une instance de Flask pour les tests +# app = Flask(__name__) +# app.secret_key = 'test_secret' # Nécessaire pour utiliser la session +# @patch.dict('flask.session', {'token': None}) # Simuler l'absence de token dans la session +# # Décorateur de test pour simuler le contexte de l'application +# def test_check_authentication_with_token(): +# with app.test_request_context(): # Crée un contexte de requête +# with patch('session.get') as mock_session_get, patch('requests.post') as mock_post: +# # Simuler un token valide et une réponse 200 +# mock_session_get.return_value = 'test_token' +# mock_response = Mock() +# mock_response.status_code = 200 +# mock_post.return_value = mock_response + +# # Décorateur de vérification de l'authentification +# @check_authentication +# def protected_function(): +# return "Function accessed" + +# # Appel de la fonction protégée +# result = protected_function() + +# # Vérification +# assert result == "Function accessed" + + + +@patch('os.listdir') +@patch('os.path.isfile') +def test_compter_elements_dans_dossier(mock_isfile, mock_listdir): + mock_listdir.return_value = ['file1.txt', 'file2.txt', 'dir1'] + mock_isfile.side_effect = lambda x: not x.endswith('dir1') + count = compter_elements_dans_dossier('/path/to/folder') + assert count == 2 + + +@patch('os.walk') +@patch('app.compter_elements_dans_dossier') # Correction ici avec le chemin complet +def test_dir_in_dir(mock_compter, mock_walk): + mock_walk.return_value = [('/path', ['dir1', 'dir2'], [])] + mock_compter.side_effect = lambda x: 5 if 'dir1' in x else 3 + result = dir_in_dir('/path') + assert result == [['dir1', 5], ['dir2', 3]] + + +@patch('os.walk') +def test_images_in_dir(mock_walk): + mock_walk.return_value = [ + ('/path', [], ['image1.jpg', 'metadata.json', 'image2.png']) + ] + result = images_in_dir('/path') + assert result == ['image1.jpg', 'image2.png'] + +from unittest.mock import patch, MagicMock +from git import Repo + +from unittest.mock import patch, MagicMock +from git import Repo + +# @patch('git.Repo') +# def test_need_update(mock_repo): +# # Création d'une instance mockée pour Repo +# mock_repo_instance = mock_repo.return_value + +# # Mock du commit local +# mock_local_commit = MagicMock() +# mock_local_commit.hexsha = 'commit1' # Définir le hash du commit local +# mock_repo_instance.head.commit = mock_local_commit + +# # Mock de la méthode fetch et du commit sur origin/main +# mock_repo_instance.remotes.origin.fetch.return_value = None # Simuler la méthode fetch + +# # Mock du commit sur origin/main +# mock_origin_commit = MagicMock() +# mock_origin_commit.hexsha = 'commit1' # Simuler que origin/main a le même commit +# mock_repo_instance.commit.return_value = mock_origin_commit # Configurer la méthode commit pour retourner le commit simulé + +# # Exécuter la fonction et vérifier le résultat +# result = need_update() +# assert result is False # On s'attend à ce que cela retourne False car les commits correspondent + +# # Changer le commit distant pour simuler une mise à jour +# mock_origin_commit.hexsha = 'commit2' # Maintenant, le commit distant est différent + +# # Exécuter la fonction et vérifier le résultat à nouveau +# result = need_update() +# assert result is True # Maintenant, on s'attend à True car il y a une mise à jour disponible + + +import tempfile +def test_create_zip_from_list(): + global TEMP_FOLDER + with tempfile.TemporaryDirectory() as temp_dir: + TEMP_FOLDER = temp_dir + text_list = [{'filename': 'file1.jpg', 'regions': [{'region_attributes': {'objet': 'object1'}, 'shape_attributes': {'x': 1, 'y': 2, 'width': 10, 'height': 20}}]}] + + memory_file = create_zip_from_list(text_list, 'project_name') + assert memory_file is not None + assert memory_file.getvalue() + + with zipfile.ZipFile(memory_file) as zf: + assert 'file1.txt' in zf.namelist() + with zf.open('file1.txt') as f: + content = f.read() + if content: + assert content + # assert content == b'Some content here' \ No newline at end of file diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..e69de29