# reformats various datasets for de-identification into our format
import argparse
import os
import sys
import xml.etree.ElementTree as ET
import logging

import pandas as pd
from tqdm import tqdm
from functools import partial

logging.basicConfig(level=logging.WARNING,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
_LOGGER = logging.getLogger(__name__)


def parse_args():
    parser = argparse.ArgumentParser(description='Convert i2b2 annotations')
    parser.add_argument('-d',
                        '--data_type',
                        type=str,
                        default=None,
                        required=True,
                        choices=[
                            'i2b2_2006', 'i2b2_2014', 'physionet',
                            'physionet_google', 'opendeid'
                        ],
                        help='source dataset (impacts processing)')
    parser.add_argument('-i',
                        '--input',
                        type=str,
                        default=None,
                        required=True,
                        help='folder or file to convert')
    parser.add_argument('-o',
                        '--output',
                        type=str,
                        default=None,
                        required=True,
                        help='folder to output converted annotations')

    # optionally suppress progress output
    parser.add_argument('-q',
                        '--quiet',
                        action='store_true',
                        help='suppress progress and informational output')

    # optionally only create annotation files
    parser.add_argument(
        '-a',
        '--annotation_only',
        action='store_true',
        help='if set, saves only annotation files; '
        'by default both annotations and plain text are written')

    args = parser.parse_args()
    return args
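

# Example invocation (a sketch; the paths below are hypothetical and the input
# folder is assumed to contain i2b2-2014-style XML files):
#
#   python convert_data_to_gs.py -d i2b2_2014 -i ./i2b2_2014/train-xml \
#       -o ./out/i2b2_2014
#
# This writes one .gs annotation file per document to <output>/ann and, unless
# --annotation_only is set, one .txt file per document to <output>/txt.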


# define a dictionary of constant values for each dataset
i2b2_2014 = {'tag_list': ['id', 'start', 'end', 'text', 'TYPE', 'comment']}
physionet_gs = {
    'columns':
    ['patient_id', 'record_id', 'start', 'stop', 'entity_type', 'entity']
}
physionet_google = {'columns': ['record_id', 'begin', 'length', 'type']}
opendeid = {'tag_list': ['id', 'start', 'end', 'text', 'TYPE']}

# our output dataframe will have consistent columns
COLUMN_NAMES = [
    'document_id', 'annotation_id', 'start', 'stop', 'entity', 'entity_type',
    'comment'
]


def load_physionet_text(text_filename):
    """Loads text from the PhysioNet id.text file.

    Output
        reports - list with each element being the text of a single record
        document_ids - list with the document_id for the record
    """
    reports, document_ids = [], []
    with open(text_filename, 'r') as fp:
        END_OF_RECORD = True
        reader = fp.readlines()
        if _LOGGER.level <= logging.INFO:
            reader = tqdm(reader)

        for line in reader:
            if END_OF_RECORD:
                # skip empty rows
                if line == '\n':
                    continue

                # make sure this is the start of a new record
                if line[0:16] != 'START_OF_RECORD=':
                    raise ValueError(
                        'Record ended, but "START_OF_RECORD" not found in next line.'
                    )

                line = line[16:].split('||||')
                # last element will be newline, so we ignore it
                text = []
                pt_id = line[0]
                doc_id = line[1]
                END_OF_RECORD = False
                continue

            if line == '||||END_OF_RECORD\n':
                END_OF_RECORD = True
                reports.append(''.join(text))
                document_id = pt_id + '-' + doc_id
                document_ids.append(document_id)
                continue

            text.append(line)

    return reports, document_ids
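

# For reference, load_physionet_text expects id.text to contain records laid
# out roughly as follows (inferred from the parsing above; the identifiers
# here are made up):
#
#   START_OF_RECORD=1||||1||||
#   ... free text of the note ...
#   ||||END_OF_RECORD
#
# Each record is keyed as '<patient_id>-<record_id>' in document_ids.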


def load_physionet_gs(input_path):
    text_filename = os.path.join(input_path, 'id.text')
    ann_filename = os.path.join(input_path, 'id-phi.phrase')

    _LOGGER.info(f'Loading text from {text_filename}')
    # reports is a list of note texts; document_ids has the matching
    # '<patient_id>-<record_id>' identifiers
    reports, document_ids = load_physionet_text(text_filename)

    _LOGGER.info(f'Loading annotations from {ann_filename}')
    # load in PHI annotations
    annotations = []
    with open(ann_filename, 'r') as fp:
        reader = fp.readlines()
        if _LOGGER.level <= logging.INFO:
            reader = tqdm(reader)
        for line in reader:
            # strip the trailing newline and split on spaces
            annot = line[0:-1].split(' ')
            # reconstitute final entity as it may have a space
            annotations.append(annot[0:5] + [' '.join(annot[5:])])

    # convert annotations to dataframe
    df = pd.DataFrame(annotations, columns=physionet_gs['columns'])

    # unique document identifier is 'pt_id-rec_id'
    df['document_id'] = df['patient_id'] + '-' + df['record_id']
    df.drop(['patient_id', 'record_id'], axis=1, inplace=True)
    df['start'] = df['start'].astype(int)
    df['stop'] = df['stop'].astype(int)

    # create other columns for needed output fields
    df.sort_values(['document_id', 'start', 'stop'], inplace=True)
    df['annotation_id'] = df.groupby('document_id').cumcount() + 1
    df['comment'] = None

    return reports, df, document_ids


def load_physionet_google(input_path):
    text_filename = os.path.join(input_path, 'id.text')
    ann_filename = os.path.join(input_path,
                                'I2B2-2014-Relabeled-PhysionetGoldCorpus.csv')

    _LOGGER.info(f'Loading text from {text_filename}')
    # reports is a list of note texts; document_ids has the matching
    # '<patient_id>-<record_id>' identifiers
    reports, document_ids = load_physionet_text(text_filename)

    _LOGGER.info(f'Loading annotations from {ann_filename}')
    # load in PHI annotations
    df = pd.read_csv(ann_filename, header=0, sep=',')

    # unique document identifier is 'pt_id-rec_id'
    df['document_id'] = df['record_id'].apply(
        lambda x: '-'.join(x.split('||||')[:2]))
    df['start'] = df['begin'].astype(int)
    df['stop'] = df['start'] + df['length'].astype(int)
    df.drop(['record_id', 'begin', 'length'], axis=1, inplace=True)
    df.rename(columns={'type': 'entity_type'}, inplace=True)

    # create other columns for needed output fields
    df.sort_values(['document_id', 'start', 'stop'], inplace=True)
    df['annotation_id'] = df.groupby('document_id').cumcount() + 1
    df['comment'] = None

    # add the entity text by slicing it out of the corresponding report
    entities = []
    for i, row in df.iterrows():
        idx = document_ids.index(row['document_id'])
        entities.append(reports[idx][row['start']:row['stop']])
    df['entity'] = entities

    return reports, df, document_ids


def load_i2b2_2014_format_xml(input_path,
                              taglist=i2b2_2014['tag_list'],
                              comments=True):
    files = os.listdir(input_path)
    # filter to files of a given extension
    files = [f for f in files if f.endswith('.xml')]
    if len(files) == 0:
        print(f'No files found in folder {input_path}')
        return None, None, None

    _LOGGER.info(f'Processing {len(files)} files found in {input_path}')
    if _LOGGER.level <= logging.INFO:
        files = tqdm(files)

    records, annotations, document_ids = [], [], []
    for f in files:
        _LOGGER.info(f'Loading annotations from {f}')
        # document ID is filename minus last extension
        document_id = f.split('.')
        if len(document_id) > 1:
            document_id = '.'.join(document_id[0:-1])
        else:
            document_id = document_id[0]

        # load as XML tree
        fn = os.path.join(input_path, f)
        with open(fn, 'r', encoding='UTF-8') as fp:
            xml_data = fp.read()
        tree = ET.fromstring(xml_data)

        # get the text from TEXT field
        text = tree.find('TEXT')
        if text is not None:
            text = text.text
        else:
            print(f'WARNING: {fn} did not have any text.')

        # the <TAGS> section has deid annotations
        tags_xml = tree.find('TAGS')
        # example tag:
        # <DATE id="P0" start="16" end="20" text="2069" TYPE="DATE" comment="" />
        addition = []
        if not comments:
            addition = ['']
        tags = list()
        for tag in tags_xml:
            tags.append([document_id] + [tag.get(t)
                                         for t in taglist] + addition)

        records.append(text)
        annotations.extend(tags)
        document_ids.append(document_id)

    # convert annotations to dataframe
    annotations = pd.DataFrame(annotations, columns=COLUMN_NAMES)
    annotations['start'] = annotations['start'].astype(int)
    annotations['stop'] = annotations['stop'].astype(int)

    return records, annotations, document_ids
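

# For reference, the i2b2-2014-style XML handled above is expected to look
# roughly like this (a sketch based on the parsing logic; the root element
# name and all values are illustrative):
#
#   <deIdi2b2>
#     <TEXT><![CDATA[Record date: 2069-04-07 ...]]></TEXT>
#     <TAGS>
#       <DATE id="P0" start="16" end="20" text="2069" TYPE="DATE" comment="" />
#     </TAGS>
#   </deIdi2b2>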


def load_i2b2_2006(input_path):
    # input_path should be the name of a single XML file,
    # e.g. i2b2_2006/deid_surrogate_test_all_groundtruth_version2.xml
    # load as XML tree
    with open(input_path, 'r', encoding='UTF-8') as fp:
        xml_data = fp.read()
    tree = ET.fromstring(xml_data)

    reader = tree.iter('RECORD')
    N = len(tree.findall('RECORD'))
    _LOGGER.info(f'Processing {N} records found in {input_path}')
    if _LOGGER.level <= logging.INFO:
        reader = tqdm(tree.iter('RECORD'), total=N)

    records, ann, document_ids = [], [], []
    for record in reader:
        document_id = record.get('ID')

        # the <TEXT> element contains text like so:
        # <TEXT>This is a note, with <PHI TYPE="NAME">Peter's</PHI> name</TEXT>
        # need to iterate through the elements and track offsets
        # also build the text string along the way
        text_tag = record.find('TEXT')
        n = 0
        # initialize with text in the text tag
        text = text_tag.text or ''
        n += len(text)
        ann_id = 0
        for t in list(text_tag):
            if t.tag == 'PHI':
                start = n
                stop = n + len(t.text)
                ann.append(
                    [document_id, ann_id, start, stop, t.text,
                     t.get('TYPE')])
                ann_id += 1
                text += t.text
                n += len(t.text)
            # append the text that follows this element, if any
            tail = t.tail or ''
            text += tail
            n += len(tail)

        records.append(text)
        document_ids.append(document_id)

    # convert annotations to dataframe
    ann = pd.DataFrame(ann,
                       columns=[
                           'document_id', 'annotation_id', 'start', 'stop',
                           'entity', 'entity_type'
                       ])
    ann['start'] = ann['start'].astype(int)
    ann['stop'] = ann['stop'].astype(int)
    ann['comment'] = None

    return records, ann, document_ids


def get_data_type_info(data_type):
    if data_type == 'i2b2_2014':
        return load_i2b2_2014_format_xml
    elif data_type == 'physionet':
        return load_physionet_gs
    elif data_type == 'physionet_google':
        return load_physionet_google
    elif data_type == 'i2b2_2006':
        return load_i2b2_2006
    elif data_type == 'opendeid':
        # opendeid uses the i2b2 2014 XML layout, minus the comment attribute
        load_opendeid = partial(load_i2b2_2014_format_xml,
                                taglist=opendeid['tag_list'],
                                comments=False)
        return load_opendeid
    else:
        raise ValueError(f'Unrecognized --data_type: {data_type}')
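

# The loaders can also be used programmatically, e.g. (a minimal sketch; the
# folder path below is hypothetical and assumed to hold i2b2-2014-style XML
# files):
#
#   load_dataset = get_data_type_info('i2b2_2014')
#   reports, annotations, document_ids = load_dataset('./i2b2_2014/train')
#
# where `annotations` is a DataFrame with the columns listed in COLUMN_NAMES.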


def main():
    args = parse_args()

    input_path = args.input
    out_path = args.output
    verbose_flag = not args.quiet
    annotations_only = args.annotation_only

    if verbose_flag:
        _LOGGER.setLevel(logging.INFO)

    # prep output folders if they don't exist
    if not os.path.exists(out_path):
        os.mkdir(out_path)
    if not os.path.exists(os.path.join(out_path, 'ann')):
        os.mkdir(os.path.join(out_path, 'ann'))
    if not os.path.exists(os.path.join(out_path, 'txt')):
        os.mkdir(os.path.join(out_path, 'txt'))

    load_dataset = get_data_type_info(args.data_type)
    reports, annotations, document_ids = load_dataset(input_path)

    if document_ids is None:
        # no data was loaded
        return

    _LOGGER.info('Writing out annotation and text files.')
    n_docs = len(document_ids)
    if _LOGGER.level <= logging.INFO:
        document_ids = tqdm(document_ids)

    # loop through reports to output files
    for i, document_id in enumerate(document_ids):
        idx = annotations['document_id'] == document_id
        df_out = annotations.loc[idx, COLUMN_NAMES]

        # output dataframe style PHI
        df_out.to_csv(os.path.join(out_path, 'ann', document_id + '.gs'),
                      index=False)

        if not annotations_only:
            with open(os.path.join(out_path, 'txt', document_id + '.txt'),
                      'w') as fp:
                fp.write(reports[i])

    _LOGGER.info('Success!')
    _LOGGER.info(f'Output {n_docs} files to {out_path}{os.sep}ann')
    _LOGGER.info(f'   and {n_docs} files to {out_path}{os.sep}txt')


if __name__ == '__main__':
    main()