-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_parser.py
237 lines (188 loc) · 7.35 KB
/
data_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# This module parses the cell marker data files into JSON-style documents.
import csv
import json
import os
import re
import requests
from biothings.utils.dataload import dict_convert, dict_sweep
# Column names in their normalized form (spaces replaced by "_", lower-cased),
# i.e. the form record keys take after load_cellMarkers converts them.
# GENE_ID is the key load_cellMarkers reads the gene identifier from.
GENE_ID = "geneid"
# NOTE(review): GENE_SYMBOL and MARKER_RESOURCE are not referenced by name in
# the code below (the literals are used instead) - confirm before removing.
GENE_SYMBOL = "genesymbol"
MARKER_RESOURCE = "markerresource"
# Tab-delimited input files that load_cellMarkers loads from the data folder.
FILES = ["all_cell_markers.txt", "Single_cell_markers.txt"]
def get_gene_symbol(gene_ids: str, timeout: float = 30.0) -> "list | dict":
    """Request the symbols of one or more genes from the MyGene.info API.

    Args:
        gene_ids (str): a single gene ID, or several IDs joined by commas
            (for example ``"1017, 1018"``)
        timeout (float): seconds to wait for the server before giving up

    Returns:
        The decoded JSON body of the response (pairUp_seq_info iterates it as
        a sequence of per-ID dicts), or an empty dict when the request fails
        or times out.
    """
    headers = {"content-type": "application/x-www-form-urlencoded"}
    # Passing a mapping lets requests form-encode the values, so separators or
    # reserved characters inside the IDs cannot corrupt the request body.
    payload = {"ids": gene_ids, "fields": "symbol"}
    try:
        res = requests.post(
            "https://mygene.info/v3/gene",
            data=payload,
            headers=headers,
            # Without a timeout a stalled connection would block forever.
            timeout=timeout,
        )
        res.raise_for_status()
    except requests.RequestException:
        # Best effort: pairUp_seq_info iterates the result, so an empty
        # container simply produces no pairs.
        return {}
    return json.loads(res.content.decode("utf-8"))
def str_to_list(listLikeStr: str) -> list:
    """Split a comma-separated, list-like string into a flat list of items.

    Square brackets are dropped before splitting, so nested groups are
    flattened into the result. Each item has its surrounding whitespace
    removed; whitespace inside an item is kept.

    Args:
        listLikeStr (str): the list-like string

    Return:
        list of the stripped items

    >>> str_to_list("A")
    ['A']
    >>> str_to_list("A, B")
    ['A', 'B']
    >>> str_to_list("A B")
    ['A B']
    >>> str_to_list("A, [A, B], C")
    ['A', 'A', 'B', 'C']
    >>> str_to_list("A, B, C, D, [E, F], [G, H I]")
    ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H I']
    """
    without_brackets = re.sub(r"[\[\]]", "", listLikeStr)
    return list(map(str.strip, without_brackets.split(",")))
def pairUp_seq_info(value_dict: dict) -> list:
    """Pair up the i-th elements of every list-like value in the dictionary.

    When the values do not all contain the same number of elements, the
    pairing cannot be trusted; in that case the gene symbols are requested
    from the MyGene.info API using the IDs stored under ``"geneid"``.

    Args:
        value_dict (dict): values that need to be paired up

    Returns:
        list[dict]: A list of dictionaries where each dict contains a pairing
        of keys with corresponding elements from the value lists. In the
        mismatch case each dict has only the keys "geneid" and "genesymbol",
        and IDs for which the API returned no symbol are left out.

    Raises:
        KeyError: if the value lengths differ and there is no "geneid" key.

    >>> pairUp_seq_info({"a": "1, 2", "b": "3, 4"})
    [{'a': '1', 'b': '3'}, {'a': '2', 'b': '4'}]
    >>> pairUp_seq_info({"a": '1'})
    [{'a': '1'}]
    """
    keys = list(value_dict)
    values = [str_to_list(value) for value in value_dict.values()]

    # All lists have the same length (or there are none): pair positionally.
    if len({len(v) for v in values}) <= 1:
        return [dict(zip(keys, combination)) for combination in zip(*values)]

    # Gene IDs and symbols are out of step, so fetch the symbols for the
    # usable IDs from the API instead of guessing the pairing.
    gene_ids = ", ".join(
        gene_id
        for gene_id in str_to_list(value_dict["geneid"])
        if gene_id.casefold() != "na" and gene_id != ""
    )
    if not gene_ids:
        # Nothing to look up; avoid a pointless network round trip.
        return []

    symbols = get_gene_symbol(gene_ids)
    if not isinstance(symbols, list):
        # A failed lookup yields an empty dict, and an error payload is not a
        # list of per-gene entries either.
        return []

    # Entries for IDs the service could not resolve carry no "_id"/"symbol",
    # so they are skipped rather than raising KeyError.
    return [
        {"geneid": gene_info["_id"], "genesymbol": gene_info["symbol"]}
        for gene_info in symbols
        if isinstance(gene_info, dict)
        and "_id" in gene_info
        and "symbol" in gene_info
    ]
def make_uniqueMarker(cellMarkers: list) -> list:
    """Remove duplicate marker dicts while keeping their first-seen order.

    Two markers are duplicates when they hold the same key/value pairs,
    regardless of key order. Values must be hashable.

    Args:
        cellMarkers (list): list of cell marker dicts

    Returns:
        list: the unique cell markers, each rebuilt with its keys in sorted
        order, in the order in which they first appear in ``cellMarkers``
    """
    # A dict keeps insertion order, unlike a set whose iteration order for
    # strings changes between interpreter runs (hash randomization); this
    # makes the generated documents reproducible.
    unique_items = dict.fromkeys(
        tuple(sorted(marker.items())) for marker in cellMarkers
    )
    return [dict(items) for items in unique_items]
def select_items(record, item_keys):
    """Build a new dict holding only the requested keys of a record.

    Keys that are not present in the record are silently skipped.

    Args:
        record (dict): the record to take items from
        item_keys (list): the keys to keep, in the order they should appear

    Returns:
        dict: the requested items that exist in the record
    """
    picked = {}
    for name in item_keys:
        if name not in record:
            continue
        picked[name] = record[name]
    return picked
def load_data_files(data_folder: str, files: list) -> list:
    """
    Load the rows of several tab-delimited files from a folder.

    The first line of every file is used as its header. Quote characters are
    not treated specially (``csv.QUOTE_NONE``), so they stay part of the
    field text.

    Args:
        data_folder (str): The path to the folder containing the data files.
        files (list): A list of filenames to be loaded from the data folder.

    Returns:
        list[dict]: One dictionary per data row, keyed by the column headers,
        in file order and then row order.

    Raises:
        FileNotFoundError: If any of the specified files do not exist in the data folder.
    """
    data = []
    for filename in files:
        file_path = os.path.join(data_folder, filename)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Missing file: {file_path}")
        # An explicit encoding keeps decoding independent of the platform
        # default, and newline="" lets the csv module do its own line-ending
        # handling, as its documentation requires.
        with open(file_path, mode="r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f, delimiter="\t", quoting=csv.QUOTE_NONE)
            data.extend(reader)
    return data
def load_cellMarkers(data_folder):
    """Yield one document per gene ID, built from the files listed in FILES.

    Rows whose gene ID is not a purely numeric string are skipped. Every
    marker field is normalized by replacing spaces with "_" and lower-casing,
    and empty fields are removed with dict_sweep.

    Args:
        data_folder (str): the relative data path to the data source

    Yields:
        dict: ``{"_id": <gene id>, "cellmarker": [<marker dict>, ...]}`` with
        duplicated marker dicts removed by make_uniqueMarker
    """
    # (output field, normalized input column) pairs, in output order
    column_map = (
        ("cellontology", "cellontologyid"),
        ("cellname", "cellname"),
        ("celltype", "celltype"),
        ("cancertype", "cancertype"),
        ("tissue", "tissuetype"),
        ("uberon", "uberonontologyid"),
        ("species", "speciestype"),
        ("marker_resource", "markerresource"),
    )

    markers_by_gene = {}
    for raw_row in load_data_files(data_folder, FILES):
        # bring every column name into the standard format
        row = dict_convert(raw_row, keyfn=lambda k: k.replace(" ", "_").lower())

        # "na" and "" are never numeric, so this single test also rejects
        # missing gene IDs
        if not row[GENE_ID].isnumeric():
            continue

        for _id in str_to_list(row[GENE_ID]):
            if _id.casefold() in ("na", ""):
                continue
            gene_entry = markers_by_gene.setdefault(_id, {})

            # the reference lives in the "company" column for company
            # resources and in the "pmid" column for everything else
            if row["markerresource"].casefold() == "company":
                source_key = "company"
            else:
                source_key = "pmid"

            # an undefined tissue type is treated as empty
            if row["tissuetype"].casefold() == "undefined":
                row["tissuetype"] = ""

            marker = {
                field: row[column].replace(" ", "_").lower()
                for field, column in column_map
            }
            marker[source_key] = row[source_key].replace(" ", "_").lower()
            gene_entry.setdefault("cellmarker", []).append(dict_sweep(marker))

    # emit each gene once, with duplicated markers removed
    for _id, gene_entry in markers_by_gene.items():
        yield {
            "_id": _id,
            "cellmarker": make_uniqueMarker(gene_entry["cellmarker"]),
        }
# if __name__ == "__main__":
# import doctest
# doctest.testmod()
# x = load_cellMarkers("data")
# y = [i for i in x]
# remember = []
# # check if all the geneid is number
# for i in y:
# if not i["_id"].isnumeric():
# remember.append(i["_id"])
# breakpoint()