-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase_helper.py
130 lines (105 loc) · 4.48 KB
/
database_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import datetime
import os
from dateutil.relativedelta import relativedelta
import sqlalchemy as db
# Logical database name and the single table this module manages.
DATABASE_NAME = 'email'
TABLE_NAME = 'emails'

# The ENV environment variable selects the backing SQLite database:
#   "local" -> file-based database ``email.db``
#   "test"  -> in-memory database
env = os.getenv("ENV")
if env == "local":
    DATABASE_URI = 'sqlite:///email.db'
elif env == "test":
    DATABASE_URI = 'sqlite:///:memory:'
else:
    # Without this branch DATABASE_URI would stay undefined and the
    # create_engine() call below would fail with an opaque NameError.
    raise RuntimeError(
        f"Unsupported ENV value {env!r}: set the ENV environment variable "
        "to 'local' or 'test'."
    )

# Shared engine and metadata used by every helper in this module.
engine = db.create_engine(DATABASE_URI)
meta = db.MetaData()
# Load the definitions of any tables that already exist in the database.
meta.reflect(bind=engine)
def create_database_table():
    """Declare the emails table on the shared metadata and create it.

    Safe to call repeatedly: if the table was already loaded into ``meta``
    (for example by the module-level ``meta.reflect()`` against an existing
    database file), the existing definition is kept instead of raising
    ``InvalidRequestError`` for a duplicate table declaration.
    """
    db.Table(
        TABLE_NAME, meta,
        db.Column('id', db.Integer, primary_key=True),
        db.Column('created_at', db.DateTime, server_default=db.func.now(), nullable=False),
        db.Column('updated_at', db.DateTime, server_default=db.func.now(), nullable=False),
        db.Column('message_id', db.String),
        db.Column('email_received_date', db.Date, nullable=False),
        db.Column('mail_from', db.String),
        db.Column('mail_to', db.String),
        db.Column('subject', db.String),
        db.Column('body', db.Text),
        db.Column('payload', db.String),
        # Reuse the table object already present in ``meta`` rather than
        # failing when this function runs against an existing database.
        keep_existing=True,
    )
    # Emits CREATE TABLE for every table in ``meta`` on the shared engine.
    meta.create_all(engine)
def insert_email_record(**kwargs):
    """Insert one row into the emails table.

    Keyword Args:
        received_date: Value stored in ``email_received_date``. Required,
            because that column is declared NOT NULL.
        subject: Value for the ``subject`` column.
        mail_from: Value for the ``mail_from`` column.
        mail_to: Value for the ``mail_to`` column.
        message_body: Value for the ``body`` column.
        payload: Value for the ``payload`` column.
        message_id: Value for the ``message_id`` column.

    Any keyword other than ``received_date`` that is omitted is stored as
    NULL.

    Raises:
        ValueError: If ``received_date`` is not supplied.
    """
    # Validate first: previously a missing date surfaced as an
    # UnboundLocalError when the INSERT statement was being built.
    if "received_date" not in kwargs:
        raise ValueError(
            "insert_email_record() requires a 'received_date' keyword argument"
        )

    email_table = meta.tables[TABLE_NAME]
    insert_command = email_table.insert().values(
        subject=kwargs.get("subject"),
        mail_from=kwargs.get("mail_from"),
        mail_to=kwargs.get("mail_to"),
        email_received_date=kwargs["received_date"],
        body=kwargs.get("message_body"),
        payload=kwargs.get("payload"),
        message_id=kwargs.get("message_id"),
    )

    # The context manager closes the connection even if execute() or
    # commit() raises, so a failed insert no longer leaks a connection.
    with engine.connect() as conn:
        conn.execute(insert_command)
        conn.commit()
def check_message_id(message_id):
    """Return whether the emails table has a row with this ``message_id``.

    Args:
        message_id: Value compared for equality against the ``message_id``
            column.

    Returns:
        True if at least one matching row exists, otherwise False.
    """
    email_table = meta.tables[TABLE_NAME]
    # Only existence matters, so select a single column and stop at the
    # first match instead of loading every matching row in full.
    query = (
        db.select(email_table.c.message_id)
        .where(email_table.c.message_id == message_id)
        .limit(1)
    )
    # The context manager closes the connection even if the query fails.
    with engine.connect() as conn:
        return conn.execute(query).first() is not None
def _cutoff_date(value, unit):
    """Return the date that lies ``value`` months or days before today.

    Raises:
        ValueError: If ``unit`` is neither "months" nor "days", or if
            ``value`` cannot be converted to an int.
    """
    if unit == "months":
        offset = relativedelta(months=int(value))
    elif unit == "days":
        offset = relativedelta(days=int(value))
    else:
        raise ValueError(
            f"Unsupported unit {unit!r}; expected 'months' or 'days'"
        )
    return datetime.date.today() - offset


def _build_condition(columns, field_name, condition, value):
    """Translate one rule into a SQLAlchemy expression, or None if unrecognised."""
    if condition in ('contains', 'does not contains'):
        # Renders as  ' ' || <column> || ' ' LIKE '%<value>%'  with the
        # pattern sent as a bound parameter, so quotes or other SQL in the
        # rule value cannot alter the statement.
        padded = db.literal(' ').concat(columns[field_name]).concat(' ')
        matches = padded.like(f'%{value}%')
        return matches if condition == 'contains' else db.not_(matches)
    if condition == 'equal to':
        return columns[field_name] == value
    if condition == 'does not equal':
        return columns[field_name] != value
    if field_name == 'email_received_date':
        # ``value`` is a cutoff date N units before today, so the operators
        # look inverted: "less than" N units ago means received on or after
        # the cutoff, "greater than" means received on or before it.
        if condition == 'less than':
            return columns[field_name] >= value
        if condition == 'greater than':
            return columns[field_name] <= value
    return None


def dict_to_sqlalchemy_query(group):
    """Return the message_ids of the emails matching a group of rules.

    Args:
        group: Mapping with two keys:
            ``"select"`` - "all" to AND the rules together, "any" to OR
            them. With any other value no filter is applied and every
            message_id is returned.
            ``"rules"`` - iterable of rule dicts. A rule is used only if it
            has both ``"field"`` (a column name) and ``"value"``; it must
            then also provide ``"condition"`` ("contains",
            "does not contains", "equal to", "does not equal", or, for
            ``email_received_date`` only, "less than" / "greater than").
            Rules on ``email_received_date`` additionally need ``"unit"``
            ("months" or "days"); their value is read as a count of units
            before today. Rules with an unrecognised condition are ignored.

    Returns:
        List of ``message_id`` values of the matching rows.

    Raises:
        KeyError: If a required key is missing or a rule names an unknown
            column.
        ValueError: If a date rule has an unsupported unit or a
            non-integer value.
    """
    email_table = meta.tables[TABLE_NAME]
    query = db.select(email_table.c.message_id)
    conjunction = group["select"]

    filter_conditions = []
    for rule in group["rules"]:
        if 'field' not in rule or 'value' not in rule:
            continue
        field_name = rule['field']
        value = rule['value']
        if field_name == 'email_received_date':
            value = _cutoff_date(value, rule['unit'])
        condition = _build_condition(
            email_table.columns, field_name, rule['condition'], value
        )
        # Identity check: SQLAlchemy expressions must not be truth-tested.
        if condition is not None:
            filter_conditions.append(condition)

    if filter_conditions:
        if conjunction == "all":
            query = query.where(db.and_(*filter_conditions))
        elif conjunction == "any":
            query = query.where(db.or_(*filter_conditions))

    # The context manager closes the connection even if the query fails.
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()
    return [row.message_id for row in rows]