-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
282 lines (216 loc) · 9.18 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
from datetime import datetime, timedelta, timezone

import pytest
from fastapi.testclient import TestClient

from main import app
from db import get_db_connection
# Shared test client bound to the application object imported from ``main``.
client = TestClient(app)

# Mock data for testing
mock_user_data = {
    "name": "John Doe",
    "email": "john.doe@example.com",
    "phone": "+1 (555) 123 4567",
    "badge_code": "test-badge-code",
}
mock_scan_data = {"activity_name": "Workshop on AI", "activity_category": "workshop"}


def _fetch_valid_user_id():
    """Return the ``id`` of one row from the ``users`` table.

    Returns:
        The first column of the row returned by ``SELECT id FROM users LIMIT 1``.

    Raises:
        RuntimeError: If the ``users`` table has no rows, since the tests in
            this module need an existing user to target.
    """
    conn = get_db_connection()
    try:
        row = conn.cursor().execute("SELECT id FROM users LIMIT 1").fetchone()
    finally:
        # Release the connection even if the query fails; the lookup is a
        # one-off and nothing else reuses this connection object.
        conn.close()
    if row is None:
        raise RuntimeError(
            "No rows found in the users table; seed at least one user before "
            "running these tests."
        )
    return row[0]


# Id of an existing user, resolved once at import time and shared by the tests.
VALID_USER_ID = _fetch_valid_user_id()
# Utility function to generate unique activity names for testing
def generate_unique_activity_name():
    """Return an activity name of the form ``Activity-<ISO timestamp>``.

    The timestamp is the current UTC time rendered as a naive ISO 8601 string,
    so names differ between calls as long as the clock has advanced.

    Returns:
        str: The generated activity name.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo from
    # an aware "now" yields the same naive value, so the string carries no
    # "+00:00" suffix and the format seen by callers is unchanged.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return f"Activity-{now_utc.isoformat()}"
### TESTS ###
# Test to ensure root endpoint works
def test_read_root():
    """GET / answers with HTTP 200 and the fixed greeting payload."""
    root_response = client.get("/")
    assert root_response.status_code == 200
    expected_body = {"Hello": "World"}
    assert root_response.json() == expected_body
# Test retrieving users
def test_read_users():
    """GET /users answers with HTTP 200 and a JSON array."""
    users_response = client.get("/users")
    assert users_response.status_code == 200
    body = users_response.json()
    assert isinstance(body, list)
# Test updating a user with valid data and verify it is updated
def test_update_user():
    """PUT /users/{id} applies a new name and phone, and GET /users shows them."""
    changes = {"name": "Jane Doe", "phone": "+1 (555) 987 6543"}

    # Apply the update and check the values echoed back by the endpoint.
    put_response = client.put(f"/users/{VALID_USER_ID}", json=changes)
    assert put_response.status_code == 200
    echoed = put_response.json()
    for field in ("name", "phone"):
        assert echoed[field] == changes[field]

    # Re-read the user list and confirm the stored record matches.
    listing = client.get("/users")
    assert listing.status_code == 200
    stored = None
    for entry in listing.json():
        if entry["id"] == VALID_USER_ID:
            stored = entry
            break
    assert stored is not None
    for field in ("name", "phone"):
        assert stored[field] == changes[field]
# Test updating user with no fields provided and verify no change
def test_update_user_no_fields():
    """An empty PUT body is rejected with HTTP 400 and the user stays unchanged."""

    def snapshot_user():
        # Return the /users entry whose id is VALID_USER_ID, or None if absent.
        for entry in client.get("/users").json():
            if entry["id"] == VALID_USER_ID:
                return entry
        return None

    # Capture the record before the update attempt.
    before = snapshot_user()
    assert before is not None

    # An update carrying no fields is expected to be refused.
    rejected = client.put(f"/users/{VALID_USER_ID}", json={})
    assert rejected.status_code == 400

    # The record read back afterwards must equal the earlier snapshot.
    after = snapshot_user()
    assert after is not None
    assert after == before
# Test updating user with invalid phone number
def test_update_user_invalid_phone():
    """A malformed phone value in the PUT body yields HTTP 422."""
    bad_payload = {"phone": "invalid-phone"}
    response = client.put(f"/users/{VALID_USER_ID}", json=bad_payload)
    # 422 is the status expected from request validation of the phone field.
    assert response.status_code == 422
# Test adding a scan for a valid user
def test_add_scan():
    """POST /scan/{id} for an existing user reports success and echoes the activity."""
    activity = generate_unique_activity_name()
    payload = dict(activity_name=activity, activity_category="workshop")

    response = client.post(f"/scan/{VALID_USER_ID}", json=payload)
    assert response.status_code == 200

    body = response.json()
    assert body["message"] == "Scan added successfully"
    assert body["activity"] == activity
# Test adding a scan for a non-existent user
def test_add_scan_nonexistent_user():
    """Posting a scan for an unknown user id yields HTTP 404."""
    missing_user_url = "/scan/nonexistent-user-id"
    response = client.post(missing_user_url, json=mock_scan_data)
    # 404 is the status expected when the user cannot be found.
    assert response.status_code == 404
# Test retrieving scan frequency without filters
def test_get_scans_no_filters():
    """GET /scans without query parameters answers with HTTP 200 and a JSON array."""
    scans_response = client.get("/scans")
    assert scans_response.status_code == 200
    rows = scans_response.json()
    assert isinstance(rows, list)
# Test retrieving scan frequency with filters (min_frequency)
def test_get_scans_with_min_frequency():
    """GET /scans?min_frequency=1 returns only rows with frequency of at least 1."""
    scans_response = client.get("/scans?min_frequency=1")
    assert scans_response.status_code == 200
    rows = scans_response.json()
    assert isinstance(rows, list)
    # Every returned row must satisfy the requested lower bound.
    assert all(row["frequency"] >= 1 for row in rows)
# Test retrieving scan frequency with activity category filter
def test_get_scans_with_activity_category():
    """GET /scans?activity_category=workshop returns only workshop rows."""
    scans_response = client.get("/scans?activity_category=workshop")
    assert scans_response.status_code == 200
    rows = scans_response.json()
    assert isinstance(rows, list)
    # Every returned row must carry the requested category.
    assert all(row["activity_category"] == "workshop" for row in rows)
# Test retrieving scans with both min and max frequency filters
def test_get_scans_with_min_and_max_frequency():
    """GET /scans with min_frequency=1 and max_frequency=10 honours both bounds."""
    scans_response = client.get("/scans?min_frequency=1&max_frequency=10")
    assert scans_response.status_code == 200
    rows = scans_response.json()
    assert isinstance(rows, list)
    # Every returned row must fall inside the inclusive [1, 10] window.
    assert all(1 <= row["frequency"] <= 10 for row in rows)
# Test adding multiple scans and checking the frequency
def test_multiple_scan_entries():
    """Three scans of one new activity are reported by /scans with frequency 3."""
    activity_name = generate_unique_activity_name()
    scan_payload = {"activity_name": activity_name, "activity_category": "test-category"}

    # Add multiple scans for the same activity. Each POST is checked so that a
    # rejected scan fails here, not later as a misleading frequency mismatch.
    for _ in range(3):
        post_response = client.post(f"/scan/{VALID_USER_ID}", json=scan_payload)
        assert post_response.status_code == 200

    # Check scan frequency
    response = client.get("/scans?activity_category=test-category")
    assert response.status_code == 200

    # Find the added activity in the response
    matching = [
        scan for scan in response.json() if scan["activity_name"] == activity_name
    ]
    assert len(matching) == 1
    assert matching[0]["frequency"] == 3
# Utility function to insert a scan with a specific time
def insert_scan(user_id: str, activity_name: str, scanned_at: str) -> None:
    """Insert one row into the ``scans`` table with an explicit timestamp.

    The row is written through ``get_db_connection()`` and not the HTTP API,
    with ``activity_category`` fixed to ``"test-category"``.

    Args:
        user_id: Value stored in the ``user_id`` column.
        activity_name: Value stored in the ``activity_name`` column.
        scanned_at: Value stored in the ``scanned_at`` column; callers in this
            file pass ISO 8601 strings.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO scans (user_id, activity_name, activity_category, scanned_at)
            VALUES (?, ?, ?, ?)
            """,
            (user_id, activity_name, "test-category", scanned_at),
        )
        conn.commit()
    finally:
        # Close the connection even when the INSERT or commit raises, so a
        # failing test does not leave the connection open.
        conn.close()
# Utility function to generate an ISO 8601 datetime string offset from the current UTC time
def generate_datetime(offset_hours=0, offset_minutes=0):
    """Return the current UTC time, shifted by an offset, as an ISO 8601 string.

    Args:
        offset_hours: Hours to add to the current time (negative for the past).
        offset_minutes: Minutes to add to the current time (negative for the past).

    Returns:
        str: Naive ISO 8601 timestamp with no timezone suffix.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo from
    # an aware "now" keeps the output naive, exactly as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    shifted = now_utc + timedelta(hours=offset_hours, minutes=offset_minutes)
    return shifted.isoformat()
# Test clustering by hour
def test_cluster_scans_by_hour():
    """Three scans sharing one timestamp are reported as an hourly cluster of 3."""
    # Use the shared helper instead of repeating its f-string inline.
    unique_activity_name = generate_unique_activity_name()
    current_time = generate_datetime()

    # Insert scans within the current hour
    for _ in range(3):
        insert_scan(VALID_USER_ID, unique_activity_name, current_time)

    response = client.get(
        f"/scans/clusters?activity_name={unique_activity_name}&time_unit=hour"
    )
    assert response.status_code == 200
    clusters = response.json()
    assert len(clusters) >= 1

    # Verify correct grouping by hour
    result = clusters[0]
    assert result["activity_name"] == unique_activity_name
    assert result["scan_count"] == 3
# Test clustering by minute
def test_cluster_scans_by_minute():
    """Two scans sharing one timestamp are reported as a per-minute cluster of 2."""
    # Use the shared helper instead of repeating its f-string inline.
    unique_activity_name = generate_unique_activity_name()
    current_time = generate_datetime()

    # Insert scans within the current minute
    for _ in range(2):
        insert_scan(VALID_USER_ID, unique_activity_name, current_time)

    response = client.get(
        f"/scans/clusters?activity_name={unique_activity_name}&time_unit=minute"
    )
    assert response.status_code == 200
    clusters = response.json()
    assert len(clusters) >= 1

    # Verify correct grouping by minute
    result = clusters[0]
    assert result["activity_name"] == unique_activity_name
    assert result["scan_count"] == 2
# Test clustering with time range filters
def test_cluster_scans_with_time_range():
    """Scans one hour apart are both counted when the range spans both timestamps."""
    # Use the shared helper instead of repeating its f-string inline.
    unique_activity_name = generate_unique_activity_name()

    # Insert scans at different times
    earlier_time = generate_datetime(offset_hours=-1)
    current_time = generate_datetime()
    insert_scan(VALID_USER_ID, unique_activity_name, earlier_time)
    insert_scan(VALID_USER_ID, unique_activity_name, current_time)

    response = client.get(
        f"/scans/clusters?activity_name={unique_activity_name}"
        f"&start_time={earlier_time}&end_time={current_time}&time_unit=hour"
    )
    assert response.status_code == 200
    clusters = response.json()
    assert len(clusters) >= 1

    # The two scans may land in different hourly buckets, so compare the total.
    total_scans = sum(result["scan_count"] for result in clusters)
    assert total_scans == 2
# Test when no data is available
def test_cluster_scans_no_data():
    """Clustering an activity name with no scans returns HTTP 200 and an empty list."""
    empty_response = client.get(
        "/scans/clusters?activity_name=NonExistentActivity&time_unit=hour"
    )
    assert empty_response.status_code == 200
    clusters = empty_response.json()
    assert clusters == []
# Test invalid time format
def test_cluster_scans_invalid_time_format():
    """A non-ISO start_time is rejected with HTTP 400 and the expected detail text."""
    expected_detail = (
        "start_time and end_time must be in ISO 8601 format (YYYY-MM-DDTHH:MM:SS)"
    )
    bad_request = client.get(
        "/scans/clusters?activity_name=Workshop&start_time=invalid-time"
    )
    assert bad_request.status_code == 400
    assert bad_request.json()["detail"] == expected_detail