-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjpda_naive.py
178 lines (147 loc) · 7.11 KB
/
jpda_naive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import numpy as np
from track_holder import TrackHolder
from scipy.spatial.distance import mahalanobis
from scipy.stats.distributions import chi2
from scipy.stats import multivariate_normal
from scipy.optimize import linear_sum_assignment
from track_object import TrackObject
P_DET = 0.95
P_GATE = 0.9997
BETA_FA = 10 / (1000 ** 2)
BETA_NT = 3 / 350/(1000 ** 2)
GATE_THRESHOLD = np.sqrt(chi2.ppf(P_GATE, df=4))
time = 0
all_measurements = np.load("/home/talha/git/multi-object-tracking/measurements.npy", allow_pickle=True)
def check_distance(kf_obj, meas):
return mahalanobis(meas, kf_obj.get_output(), np.linalg.inv(kf_obj.get_innovation_covariance()))
def MakeTimeUpdate(tracks):
for track in tracks:
track.predict()
return tracks
def InitializeTracks(tracks, measurements):
"""
Initialize track objects for each measurement and append them to the tracks list.
:param tracks: List of existing track objects.
:param measurements: List of measurements, each a tuple or list of values.
:return: Updated list of track objects including those initialized from measurements.
"""
for measurement in measurements:
# Directly construct the mean array and create a new TrackObject
mean = np.array([measurement[0], 0, measurement[1], 0, measurement[2], measurement[3]])
track = TrackObject(mean)
tracks.append(track)
return tracks
def create_validation_matrix(tracks, measurements, gate_threshold):
validation_matrix = np.zeros((len(measurements), len(tracks)))
for i, measurement in enumerate(measurements):
for j, track in enumerate(tracks):
if check_distance(track, measurement) <= gate_threshold:
validation_matrix[i, j] = 1
return validation_matrix
def calculate_track_associations(tracks, measurements, validation_matrix):
"""
Finds possible measurement associations for each track.
Args:
tracks: A list of TrackObject instances.
measurements: A NumPy array of measurements.
validation_matrix: A NumPy array indicating valid track-measurement associations.
Returns:
A list where each element is a list of possible measurement indices
(including 0 for no association) for the corresponding track.
"""
tracks_possible_associations = []
for i, track in enumerate(tracks):
valid_measurement_indices = np.nonzero(validation_matrix[:, i])[0] # Find indices with True values
associations = [0] + (valid_measurement_indices + 1).tolist() # Add 1 for measurement indexing
tracks_possible_associations.append(associations)
return tracks_possible_associations
import itertools
def generate_hypotheses(tracks, measurements, validation_matrix):
"""
Generates all possible association hypotheses.
Args:
tracks: A list of TrackObject instances.
measurements: A NumPy array of measurements.
validation_matrix: A NumPy array indicating valid track-measurement associations.
Returns:
A list of lists, where each inner list represents a hypothesis.
Example: [[0, 1], [2, 0]] -> Track 1 has no association,
Track 2 is associated with measurement 2.
"""
tracks_possible_associations = calculate_track_associations(tracks, measurements, validation_matrix)
all_hypotheses = list(itertools.product(*tracks_possible_associations))
return all_hypotheses
def calculate_jpda_probabilities(tracks, measurements, validation_matrix, pd, pg, beta_fa):
all_hypotheses = generate_hypotheses(tracks, measurements, validation_matrix)
def calculate_hypothesis_probability(tracks, measurements, hypothesis, pd, pg, beta_fa):
probability_factors = []
for i, track_index in enumerate(hypothesis):
track = tracks[i]
if track_index > 0: # Track is associated with a measurement
meas_index = track_index - 1
measurement = measurements[meas_index]
innovation = measurement - track.get_output()
S = track.get_innovation_covariance()
rv = multivariate_normal(mean=np.zeros(S.shape[0]), cov=S) # Assuming Gaussian
probability_factors.append(pd * rv.pdf(innovation) / (1 - pd * pg))
else: # Track is not associated with a measurement
probability_factors.append(1 - pd * pg)
num_false_alarms = 0
for track_index in hypothesis:
if track_index == 0: # Check for no association
num_false_alarms += 1
return np.prod(probability_factors) * (beta_fa ** num_false_alarms)
hypothesis_probabilities = [calculate_hypothesis_probability(tracks, measurements, h, pd, pg, beta_fa) for h in all_hypotheses]
num_tracks = len(tracks)
num_meas = len(measurements)
jpda_probs = np.zeros((num_tracks, num_meas + 1))
tracks_possible_associations = calculate_track_associations(tracks, measurements, validation_matrix)
for i, track in enumerate(tracks):
for j, meas_index in enumerate(tracks_possible_associations[i]):
if meas_index > 0:
jpda_probs[i, meas_index - 1] = sum(hypothesis_probabilities[h_idx] for h_idx, h in enumerate(all_hypotheses) if h[i] == j)
else:
jpda_probs[i, -1] = sum(hypothesis_probabilities[h_idx] for h_idx, h in enumerate(all_hypotheses) if h[i] == j)
return jpda_probs
def main(measurements):
global time
confirmed_tracks = track_holder.get_confirmed_tracks()
candidate_tracks = track_holder.get_candidate_tracks()
MakeTimeUpdate(confirmed_tracks)
MakeTimeUpdate(candidate_tracks)
validation_matrix = create_validation_matrix(confirmed_tracks + candidate_tracks, measurements, GATE_THRESHOLD)
# hypotheses = generate_hypotheses(confirmed_tracks + candidate_tracks, measurements, validation_matrix)
# print(hypotheses)
jpda_probs = calculate_jpda_probabilities(confirmed_tracks + candidate_tracks, measurements, validation_matrix, P_DET, P_GATE, BETA_FA)
print(jpda_probs)
if len(measurements) > 0:
InitializeTracks(candidate_tracks, measurements)
time += 1
if time % 3 == 0:
track_holder.kill_all_tracks()
# track_holder.kill_confirmed_tracks()
# track_holder.kill_candidate_tracks()
# track_holder.confirm_candidate_tracks()
print("Candidate Tracks : ",len(candidate_tracks))
print("Confirmed Tracks : ",len(confirmed_tracks))
track_holder = TrackHolder()
tracks_hist = []
for i in range(2):
main(all_measurements[i])
tracks = track_holder.get_old_tracks()
for track in tracks:
history = track.get_history()
tracks_hist.append(history)
for track in track_holder.get_confirmed_tracks():
history = track.get_history()
tracks_hist.append(history)
tracks_hist = np.array(tracks_hist, dtype=object)
np.save('tracks',tracks_hist)
# plt.plot(history[:,0], history[:,1], markersize = 5)
# plt.grid(True)
# plt.xlim((-3000,3000)),plt.ylim((-3000,3000))
# plt.title("GNN Tracker")
# plt.xlabel("X Position")
# plt.ylabel("Y Position")
# plt.legend(["Track 1", "Track 2", "Track 3", "Track 4", "Track 5"])
# plt.show()