forked from Maxahoy/TDMSData_CDME
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtdms2h5.py
179 lines (149 loc) · 7.96 KB
/
tdms2h5.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import argparse
import h5py
import nptdms
import numpy as np
import re
from contextlib import ExitStack
from pathlib import Path
from typing import Any, AnyStr, Dict, Generator, List, Match, Pattern
VERSION_KEY: str = 'Version'
SLICES_KEY: str = 'TDMSData'
INDEX_KEY: str = 'Index'
LAYER_START_TIME_KEY: str = 'LayerStartTime'
LAYER_END_TIME_KEY: str = 'LayerEndTime'
PART_START_TIME_KEY: str = 'PartStartTime'
PART_END_TIME_KEY: str = 'PartEndTime'
TDMS_GROUP_NAME_KEY: str = 'TDMS_GroupName'
VERTICES_KEY: str = 'Vertices'
def _write_tdms_properties(h5_group: h5py.Group, tdms_dict: Dict[str, Any], replacements: Dict[str, str]) -> None:
key: str
value: Any
for key, value in tdms_dict.items():
if key in replacements:
key = replacements[key]
if isinstance(value, np.datetime64):
h5_group.attrs[key] = str(np.datetime_as_string(value, unit='us', timezone='UTC'))
else:
h5_group.attrs[key] = value
def tdms2h5(input_dir: Path, output_dir: Path, prefix: str, area_offset: int, intensity_offset: int, laser_offset: int, groups: List[str] = [], verbose: bool = False) -> None:
largest_offset = max(area_offset, intensity_offset)
if not output_dir.exists():
if verbose:
print(f'Creating directory \"{output_dir}\"')
output_dir.mkdir(parents=True)
paths_generator: Generator[(Path, None, None)] = input_dir.glob('*.[Tt][Dd][Mm][Ss]')
regex_name: Pattern[AnyStr] = re.compile(fr'{prefix}(\d+)')
with ExitStack() as exitStack:
h5_files: Dict[str, h5py.File] = {}
slice_indices: List[int] = []
path: Path
for path in filter(lambda item: regex_name.search(item.stem), paths_generator):
if verbose:
print(f'Converting \"{path}\"')
match: Match[AnyStr] = regex_name.search(path.stem)
slice_index = int(match.group(1))
slice_indices.append(slice_index)
with nptdms.TdmsFile(path) as tdmsFile:
build_firmware_version:int = 3
try:
print(f'===> Looking for Bitgain OS 1')
bitgain_os_1: float = tdmsFile.properties['Bitgain OS 1']
bitgain_os_2: float = tdmsFile.properties['Bitgain OS 2']
except:
try:
print(f'===> Bitgain OS 1 NOT FOUND.... Looking for Scanner.Bitgain.OS1....')
build_firmware_version = 4
bitgain_os_1: float = tdmsFile.properties['Scanner.Bitgain.OS1']
bitgain_os_2: float = tdmsFile.properties['Scanner.Bitgain.OS2']
except:
print("Scanner.Bitgain.OS1 NOT FOUND. Exiting....")
return
group: nptdms.TdmsGroup
for group in tdmsFile.groups():
if groups and not any(re.match(pattern, group.name) for pattern in groups):
continue
output_file_path = output_dir / f'{group.name}.h5'
if group.name not in h5_files:
h5_files[group.name] = exitStack.enter_context(h5py.File(output_file_path, 'w'))
h5_file = h5_files[group.name]
h5_file.attrs[VERSION_KEY] = build_firmware_version
h5_group = h5_file.create_group(SLICES_KEY)
h5_group.attrs[TDMS_GROUP_NAME_KEY] = group.name
h5_file = h5_files[group.name]
h5_group: h5py.Group = h5_file[SLICES_KEY].create_group(str(slice_index))
layer_replacements = {
'StartTime' : LAYER_START_TIME_KEY,
'EndTime' : LAYER_END_TIME_KEY
}
_write_tdms_properties(h5_group, tdmsFile.properties, layer_replacements)
part_replacements = {
'StartTime' : PART_START_TIME_KEY,
'EndTime' : PART_END_TIME_KEY,
'Part.StartTime': PART_START_TIME_KEY,
'Part.EndTime' : PART_END_TIME_KEY
}
_write_tdms_properties(h5_group, group.properties, part_replacements)
# LaserTTL only uses laser_offset. The end has to be adjusted to make the resulting array consistent
laser_channel: nptdms.TdmsChannel = group['LaserTTL']
laser_end_index: int = len(laser_channel) - (laser_offset)
h5_group.create_dataset(laser_channel.name, data=laser_channel[laser_offset : laser_end_index])
# Intensity and Area use laser_offset
area_channel: nptdms.TdmsChannel = group['Area']
# At this point for illustrative purposes, since the laser_offset is always the largest
end_index: int = len(area_channel) - (area_offset)
h5_group.create_dataset(area_channel.name, data=area_channel[area_offset : end_index])
intensity_channel: nptdms.TdmsChannel = group['Intensity']
end_index: int = len(intensity_channel) - (intensity_offset)
h5_group.create_dataset(intensity_channel.name, data=intensity_channel[intensity_offset : end_index])
# Have not figured out how to correlate parameter to the actual parameter used, just use the same as Laser TTL since it is a machine setting
parameter_channel: nptdms.TdmsChannel = group['Parameter']
h5_group.create_dataset(parameter_channel.name, data=parameter_channel[:])
# X and Y channels just adjust the maximum
x_channel: nptdms.TdmsChannel = group['X-Axis']
x_dataset = h5_group.create_dataset(x_channel.name, data=(x_channel[:] / bitgain_os_1), dtype=np.float32)
x_dataset.attrs['Units'] = 'μm'
y_channel: nptdms.TdmsChannel = group['Y-Axis']
y_dataset = h5_group.create_dataset(y_channel.name, data=(y_channel[:] / bitgain_os_2), dtype=np.float32)
y_dataset.attrs['Units'] = 'μm'
# Resulting slices will be aligned with the same number of data points for each channel
slice_indices = sorted(slice_indices)
for h5_file in h5_files.values():
index_dataset = np.zeros((len(slice_indices), 3), dtype=np.int64)
for i, index in enumerate(slice_indices):
index_dataset[i][0] = index
index_dataset[i][1] = h5_file[SLICES_KEY][str(index)].attrs['layerThickness']
index_dataset[i][2] = h5_file[SLICES_KEY][str(index)]['X-Axis'].size
dataset: h5py.Dataset = h5_file.create_dataset(INDEX_KEY, data=index_dataset)
dataset.attrs['Column0'] = 'SliceIndex'
dataset.attrs['Column1'] = 'LayerThickness (μm)'
dataset.attrs['Column2'] = 'NumVertices'
if verbose:
print('\nWrote files:')
h5_file: h5py.File
for h5_file in h5_files.values():
print(f' \"{h5_file.filename}\"')
def main() -> None:
parser = argparse.ArgumentParser(description='Converts TDMS files to a HDF5 format. Input files must be named in the following regex format \"[prefix]\\d+.tdms\"')
parser.add_argument('input_dir', type=Path, help='Input directory where .tdms files are located')
parser.add_argument('output_dir', type=Path, help='Output directory where .h5 files are generated')
parser.add_argument('prefix', help='Specifies the file prefix to search for as regex.')
parser.add_argument('-g', '--groups', nargs='+', help='Specifies which TDMS groups should be converted')
parser.add_argument('-v', '--verbose', action='store_true', help='Prints additional information while converting')
parser.add_argument('-a', '--area-offset', type=int, default=0, help='Area index offset (CamOffset)')
parser.add_argument('-i', '--intensity-offset', type=int, default=0, help='Intensity index offset (PhdOffset)')
parser.add_argument('-l', '--laser-offset', type=int, default=0, help='Laser index offset')
args = parser.parse_args()
if args.verbose:
print('args:')
print(f' input_dir = \"{args.input_dir}\"')
print(f' output_dir = \"{args.output_dir}\"')
print(f' prefix = \"{args.prefix}\"')
print(f' groups = {args.groups}')
print(f' verbose = {args.verbose}')
print(f' area-offset = {args.area_offset}')
print(f' intensity-offset = {args.intensity_offset}')
print(f' laser-offset = {args.laser_offset}')
print('')
tdms2h5(args.input_dir, args.output_dir, args.prefix, args.area_offset, args.intensity_offset, args.laser_offset, args.groups, args.verbose)
if __name__ == '__main__':
main()