-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpopulate_database.py
127 lines (106 loc) · 3.51 KB
/
populate_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import sqlite3
from faker import Faker
import random
fake = Faker()
DATABASE = "patient_management.db"
DOCTORS = [
"Dr. Susan Brown",
"Dr. Michael Green",
"Dr. Laura White",
"Dr. Robert Johnson",
"Dr. Emily Davis",
]
def generate_patients(n=50):
patients = []
for _ in range(n):
name = fake.name()
first_name = name.split()[0]
last_name = name.split()[-1]
email = f"{first_name[0].lower()}{last_name.lower()}@example.com"
age = random.randint(18, 90)
gender = random.choice(["Male", "Female"])
phone = fake.phone_number()
patients.append((name, age, gender, email, phone))
return patients
def is_valid_appointment(patient_id, doctor, appointment_date, cursor):
cursor.execute(
"""
SELECT COUNT(*)
FROM Appointments
WHERE doctor = ? AND appointment_date = ? AND patient_id != ?
""",
(doctor, appointment_date, patient_id),
)
conflict_count = cursor.fetchone()[0]
return conflict_count == 0
def generate_appointments(patient_ids, n=100):
appointments = []
with sqlite3.connect(DATABASE) as conn:
cursor = conn.cursor()
for _ in range(n):
patient_id = random.choice(patient_ids)
appointment_date = fake.date_between(start_date="-2y", end_date="+10d")
doctor = random.choice(DOCTORS)
# Ensure the appointment is valid
if is_valid_appointment(patient_id, doctor, appointment_date, cursor):
appointments.append((patient_id, appointment_date, doctor))
return appointments
def generate_medical_history(patient_ids, n=100):
diagnoses = [
"Hypertension",
"Diabetes",
"Critical Condition",
"Asthma",
"Healthy",
"Minor Illness",
"Chronic Condition",
]
medications = [
"Paracetamol",
"Metformin",
"Aspirin",
"Albuterol",
"Ibuprofen",
"Lisinopril",
"Simvastatin",
"Amoxicillin",
"Omeprazole",
]
medical_history = []
for _ in range(n):
patient_id = random.choice(patient_ids)
diagnosis = random.choice(diagnoses)
medication = random.choice(medications)
doctor = random.choice(DOCTORS)
hospital = fake.company()
diagnosis_date = fake.date_between(start_date="-1y", end_date="today")
treatment = f"{medication} prescribed by {doctor} at {hospital}"
medical_history.append((patient_id, diagnosis, diagnosis_date, treatment))
return medical_history
def populate_database():
conn = sqlite3.connect("patient_management.db")
cursor = conn.cursor()
patients = generate_patients()
cursor.executemany(
"INSERT INTO Patients (name, age, gender, email, phone) VALUES (?, ?, ?, ?, ?)",
patients,
)
conn.commit()
cursor.execute("SELECT id FROM Patients")
patient_ids = [row[0] for row in cursor.fetchall()]
appointments = generate_appointments(patient_ids)
cursor.executemany(
"INSERT INTO Appointments (patient_id, appointment_date, doctor) VALUES (?, ?, ?)",
appointments,
)
conn.commit()
medical_history = generate_medical_history(patient_ids)
cursor.executemany(
"INSERT INTO MedicalHistory (patient_id, condition, diagnosis_date, treatment) VALUES (?, ?, ?, ?)",
medical_history,
)
conn.commit()
conn.close()
print("Database repopulated successfully!")
if __name__ == "__main__":
populate_database()