-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdatabase.py
179 lines (160 loc) · 6.77 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from datetime import datetime
import os
from pathlib import Path
import aiosqlite
class Sqlite:
@classmethod
async def init(cls) -> None:
os.makedirs(Path(__file__).resolve().parent / "data", exist_ok=True)
cls.conn = await aiosqlite.connect(
Path(__file__).resolve().parent / "data" / "data.db"
)
await cls.exec("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uid INTEGER NOT NULL,
length INTEGER NOT NULL,
sex TEXT NOT NULL,
time DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
await cls.exec("""
CREATE TABLE IF NOT EXISTS records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uid INTEGER NOT NULL,
action TEXT NOT NULL,
origin_length INTEGER NOT NULL,
diff INTEGER NOT NULL,
new_length INTEGER NOT NULL,
time DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
await cls.exec("CREATE INDEX IF NOT EXISTS idx_length ON users(length DESC)")
@classmethod
async def exec(cls, sql: str, *args) -> list[dict] | None:
"""
执行自定义 SQL 语句并返回结果。
:param sql: 要执行的 SQL 语句。
:param args: SQL 语句中的参数。
:return: 查询结果的字典列表,如果是非查询语句则返回 None。
"""
async with cls.conn.cursor() as cursor:
await cursor.execute(sql, args)
if "SELECT" in sql.strip().upper():
results = await cursor.fetchall()
if not results:
return None
column_names = [description[0] for description in cursor.description]
return [dict(zip(column_names, row)) for row in results]
else:
await cls.conn.commit()
return None
@classmethod
async def json2db(cls, file_data) -> None:
async with cls.conn.cursor() as cursor:
await cursor.execute("DELETE FROM users")
await cls.conn.commit()
mixed_data = []
for groups in file_data:
mixed_data.extend(
{
"uid": int(user_id),
"length": user_length,
"sex": "boy" if user_length > 0 else "girl",
}
for user_id, user_length in groups.items()
)
for i in mixed_data:
if not await cls.insert("users", i, {"uid": i["uid"]}):
await cls.update("users", i, {"uid": i["uid"]})
@classmethod
async def query(
cls,
table: str,
columns: list | None = None,
conditions: dict | None = None,
order_by: str | None = None,
limit: int | None = None,
) -> list[dict]:
"""
根据条件查询数据。
:param table: 要查询的表名。
:param columns: 要查询的列名列表,如果不指定则查询所有列。
:param conditions: 查询条件,字典格式,键为字段名,值为条件值。
:param order_by: 排序条件,例如 "time DESC"。
:param limit: 限制结果数量。
:return: 查询结果的字典列表。
"""
columns_str = ", ".join(columns) if columns else "*"
query = f"SELECT {columns_str} FROM {table}"
if conditions:
query += " WHERE " + " AND ".join(
[f"{key} = ?" for key in conditions.keys()]
)
if order_by:
query += f" ORDER BY {order_by}"
if limit is not None:
query += f" LIMIT {limit}"
async with cls.conn.cursor() as cursor:
await cursor.execute(
query, tuple(conditions.values()) if conditions else ()
)
result = await cursor.fetchall()
if not result:
return []
column_names = [description[0] for description in cursor.description]
return [dict(zip(column_names, row)) for row in result]
@classmethod
async def insert(
cls, table: str, data: dict, conditions: dict | None = None
) -> bool:
"""
插入数据到指定表中。如果提供了条件,则先检查是否存在符合条件的记录,如果存在则不插入。
:param table: 要插入数据的表名。
:param data: 要插入的数据,字典格式,键为字段名,值为数据值。
:param conditions: 插入条件,字典格式,键为字段名,值为条件值。
:return: 插入成功返回 True,否则返回 False。
"""
data["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if conditions and await cls.query(table, conditions=conditions):
return False
columns = ", ".join(data.keys())
placeholders = ", ".join(["?"] * len(data))
query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
async with cls.conn.cursor() as cursor:
await cursor.execute(query, tuple(data.values()))
await cls.conn.commit()
return True
@classmethod
async def update(cls, table: str, data: dict, conditions: dict) -> bool:
"""
更新符合条件的数据。
:param table: 要更新数据的表名。
:param data: 要更新的数据,字典格式,键为字段名,值为数据值。
:param conditions: 更新条件,字典格式,键为字段名,值为条件值。
:return: True
"""
data["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
set_clause = ", ".join([f"{key} = ?" for key in data])
where_clause = " AND ".join([f"{key} = ?" for key in conditions])
query = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
async with cls.conn.cursor() as cursor:
await cursor.execute(
query, tuple(list(data.values()) + list(conditions.values()))
)
await cls.conn.commit()
return True
@classmethod
async def delete(cls, table: str, conditions: dict) -> bool:
"""
删除符合条件的数据。
:param table: 要删除数据的表名。
:param conditions: 删除条件,字典格式,键为字段名,值为条件值。
:return: True
"""
where_clause = " AND ".join([f"{key} = ?" for key in conditions])
query = f"DELETE FROM {table} WHERE {where_clause}"
async with cls.conn.cursor() as cursor:
await cursor.execute(query, tuple(conditions.values()))
await cls.conn.commit()
return True