-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrx_utils.py
178 lines (148 loc) · 6.09 KB
/
rx_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
""" Utilities for RX
name:
status: initial+
v0.0.6 > get_data() input to iq/tau/snr
v0.0.5 > qpsk sum() error fix
last update: (17 May 2024, 00:15)
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# from sklearn.preprocessing import OneHotEncoder
def get_data(modulation, tau, snr, rec=True, NoD=int(1e6)): # noqa
"""
Retrieve data from the local file that keep the data stores in a csv file format and
the data in raw 0-1 bit values and noisy received floating data @RX side including channel and ISI noise
"""
# NoD = int(1e6) # number of test data # noqa
if NoD == -1:
NoD = None # noqa
file_path = 'data/data_{iq}/{iq}_t{tau:.1f}_{s}{snr}.csv'.format(iq=modulation,
tau=tau,
s='s' if isinstance(snr, int) else '',
snr=snr)
# infer data type
if modulation == 'bpsk':
# df = pd.read_csv(file_path, names=['y', 'X'], header=None, nrows=NoD)
df = pd.read_csv(file_path, header=0, nrows=NoD) # infer header from data
# names=['y', 'X'] # TODO fix header handling based on data (automate)
assert ['y', 'X'] == df.columns.values.tolist(), ("Header mismatch!:\tThe data file ({file}) does not contain "
"the required header ['y', 'X']".format(file=file_path))
x = np.array(df['X'])
y = np.array(df['y'].astype(np.int8))
# bpsk
if rec:
# BPSK; [0: -1], [1: 1]
# demapping y+1 / 2
y = (y + 1) // 2
# y[y==-1] = 0
# x = np.reshape(x, (-1, 1))
# y = np.reshape(y, (-1, 1))
return x, y
elif modulation == 'qpsk':
df = pd.read_csv(file_path, names=['y1', 'y2', 'x_real', 'x_imag'], header=None, nrows=NoD,
dtype={'y1': np.int8, 'y2': np.int8, 'x_real': np.float16, 'x_imag': np.float16})
# TODO: improve data import, replace on read; https://stackoverflow.com/a/18920156
# x = np.array(df['X'].str.replace('i', 'j').apply(lambda s: np.singlecomplex(s)))
x = np.array(df[['x_real', 'x_imag']].astype(np.float16))
# y = np.array(df[['y1', 'y2']].astype(np.int8))
# 00, 01, 10, 11 to --> 0 1 2 3 integer
y = np.array(df['y1'] * 2 + df['y2']).astype(np.int8)
# integer to one hot
# https://www.delftstack.com/howto/numpy/one-hot-encoding-numpy/
y_one_hot = np.zeros(shape=(y.size, y.max() + 1), dtype=np.int8)
y_one_hot[np.arange(y.size), y] = 1
return x, y_one_hot
def acc_data(tau):
""" accumulate data """
# combine several tau data
valid = False
if valid:
cdata = []
for tv in tau:
cdata += tv
raise NotImplementedError
# return x, y
def check_data(rx_data, ref_bit, modulation='bpsk'):
"""
:param rx_data: X
:param ref_bit: y
:param modulation:
:return:
"""
if modulation == 'qpsk':
# qpsk check
s3 = np.logical_and(rx_data[:, 0] > 0, rx_data[:, 1] > 0) # D
s2 = np.logical_and(rx_data[:, 0] > 0, rx_data[:, 1] < 0) # C
s1 = np.logical_and(rx_data[:, 0] < 0, rx_data[:, 1] > 0) # B
s0 = np.logical_and(rx_data[:, 0] < 0, rx_data[:, 1] < 0) # A
_df = pd.DataFrame({'s0': s0 * 1, 's1': s1 * 1, 's2': s2 * 1, 's3': s3 * 1})
# _df.sum()
# _df.sum().plot()
ref = pd.DataFrame(ref_bit)
ref.columns = ['s0', 's1', 's2', 's3']
nof_error = 0
for clm in _df.columns:
nof_error += np.asarray((_df[clm] != ref[clm])).sum()
elif modulation == 'bpsk':
# check for data
nof_error = sum(abs((rx_data > 0) * 1 - ref_bit))
print('#data: {NoD}\t#diff: {diff}'.format(NoD=ref_bit.size, diff=nof_error))
else:
raise NotImplementedError
return nof_error
def get_song_data(x_in, y_in, cl, m): # cl : capital l "L"
pad = (cl - m) // 2
# padding for initial and ending values
# d = len(in_data.shape)
# assert d < 3, 'high dimensional input does not supported, only 1D or 2D'
tmp_pad = abs(x_in[:pad] * 0)
data = np.concatenate((tmp_pad, x_in, tmp_pad), axis=0)
nos = (len(data) - cl) // m # number of record/sample
x_out = np.empty(shape=(nos, cl))
y_out = np.empty(shape=(nos, m))
for i in range(nos):
x_out[i, :] = data[i * m:(i * m + cl)]
y_out[i, :] = 2 * y_in[i * m:(i * m + m)] - 1
return x_out, y_out
def is_symmetric(cl):
"""
check if given list is symmetric or not
"""
assert NotImplementedError
return all(i == j for i, *j in zip(cl, *cl))
def prep_ts_data(in_data, lon=7):
# padding for initial and ending values
d = len(in_data.shape)
assert d == 1, 'high dimensional input does not supported, only 1D allowed'
# assert d < 3, 'high dimensional input does not supported, only 1D or 2D'
tmp_pad = abs(in_data[:lon] * 0)
data = np.concatenate((tmp_pad, in_data, tmp_pad), axis=0)
sl = list(in_data.shape)
sl.insert(1, 2 * lon + 1)
ts_x = np.empty(shape=tuple(sl))
if d == 1:
for i in range(sl[0]):
ts_x[i, :] = data[i:i + 2 * lon + 1]
else:
for i in range(sl[0]):
ts_x[i, :, :] = data[i:i + 2 * lon + 1, :]
return ts_x
def show_train(history):
n = len(history.history)
fig, axs = plt.subplots(nrows=1, ncols=n // 2)
# TODO : add no validation support
for i, metric in enumerate(history.history):
# print(metric)
if 'val_' in metric:
break
axs[i].plot(history.history[metric])
axs[i].plot(history.history['val_' + metric])
axs[i].set_title('model ' + metric)
axs[i].set_xlabel('epoch')
axs[i].set_ylabel(metric)
axs[i].legend(['train', 'val'])
# axs[i].legend(['train', 'val'], loc='upper left')
plt.show()
return fig
# https://matplotlib.org/stable/gallery/subplots_axes_and_figures/figure_title.html