Skip to content

Commit 9e5daf8

Browse files
committed
Also fix concurrency for connection control
1 parent dae8ea2 commit 9e5daf8

File tree

2 files changed

+68
-58
lines changed

2 files changed

+68
-58
lines changed

archive/archive.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) {
284284

285285
start := time.Now()
286286
for archive.Control.State.state != ControlStateConnected && archive.Control.State.err == nil {
287-
fragments := archive.Control.Poll(ConnectionControlFragmentHandler, 1)
287+
fragments := archive.Control.PollWithContext(ConnectionControlFragmentHandler, correlationID, 1)
288288
if fragments > 0 {
289289
logger.Debugf("Read %d fragment(s)\n", fragments)
290290
}

archive/control.go

+67-57
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ func init() {
102102

103103
// The current subscription handler doesn't provide a mechanism for passing context
104104
// so we return data via the control's Results
105-
func controlFragmentHandler(context int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
106-
logger.Debugf("controlFragmentHandler: correlationID:%d offset:%d length:%d header:%#v", context, offset, length, header)
105+
func controlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
106+
logger.Debugf("controlFragmentHandler: correlationID:%d offset:%d length:%d header:%#v", correlationID, offset, length, header)
107107
var hdr codecs.SbeGoMessageHeader
108108

109109
buf := new(bytes.Buffer)
@@ -120,14 +120,14 @@ func controlFragmentHandler(context int64, buffer *atomic.Buffer, offset int32,
120120
return
121121
}
122122

123-
// Look up our control via the context
124-
c, ok := correlations.Load(context)
123+
// Look up our control
124+
c, ok := correlations.Load(correlationID)
125125
if !ok {
126126
// something has gone horribly wrong and we can't correlate
127127
if Listeners.ErrorListener != nil {
128-
Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", context))
128+
Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", correlationID))
129129
}
130-
logger.Debugf("failed to locate control via correlationID %d", context)
130+
logger.Debugf("failed to locate control via correlationID %d", correlationID)
131131
return
132132
}
133133
control := c.(*Control)
@@ -147,7 +147,7 @@ func controlFragmentHandler(context int64, buffer *atomic.Buffer, offset int32,
147147
}
148148

149149
// Check this was for us
150-
if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == context {
150+
if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == correlationID {
151151
// Set our state to let the caller of Poll() which triggered this know they have something
152152
// We're basically finished so prepare our OOB return values and log some info if we can
153153
logger.Debugf("controlFragmentHandler/controlResponse: received for sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId)
@@ -185,8 +185,8 @@ func controlFragmentHandler(context int64, buffer *atomic.Buffer, offset int32,
185185

186186
// ConnectionControlFragmentHandler is the connection handling specific fragment handler.
187187
// This mechanism only alows us to pass results back via global state which we do in control.State
188-
func ConnectionControlFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
189-
logger.Debugf("ControlSubscriptionHandler: offset:%d length: %d header: %#v", offset, length, header)
188+
func ConnectionControlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
189+
logger.Debugf("ConnectionControlFragmentHandler: correlationID:%d offset:%d length: %d header: %#v", correlationID, offset, length, header)
190190

191191
var hdr codecs.SbeGoMessageHeader
192192

@@ -213,42 +213,48 @@ func ConnectionControlFragmentHandler(buffer *atomic.Buffer, offset int32, lengt
213213
if Listeners.ErrorListener != nil {
214214
Listeners.ErrorListener(err2)
215215
}
216+
logger.Debugf("ConnectionControlFragmentHandler failed to decode control response: %w", err)
216217
return
217218
}
218219

219220
// Look it up
220221
c, ok := correlations.Load(controlResponse.CorrelationId)
221222
if !ok {
222-
// Must have been for someone else which can happen if two or more clients have
223-
// use the same channel/stream
224223
logger.Debugf("connectionControlFragmentHandler/controlResponse: ignoring correlationID=%d [%s]\n%#v", controlResponse.CorrelationId, string(controlResponse.ErrorMessage), controlResponse)
225224
return
226225
}
227226
control := c.(*Control)
228227

229-
// Check result
230-
if controlResponse.Code != codecs.ControlResponseCode.OK {
231-
control.State.state = ControlStateError
232-
control.State.err = fmt.Errorf("Control Response failure: %s", controlResponse.ErrorMessage)
233-
if Listeners.ErrorListener != nil {
234-
Listeners.ErrorListener(control.State.err)
228+
// Check this was for us
229+
if controlResponse.CorrelationId == correlationID {
230+
// Check result
231+
if controlResponse.Code != codecs.ControlResponseCode.OK {
232+
control.State.state = ControlStateError
233+
control.State.err = fmt.Errorf("Control Response failure: %s", controlResponse.ErrorMessage)
234+
if Listeners.ErrorListener != nil {
235+
Listeners.ErrorListener(control.State.err)
236+
}
237+
return
235238
}
236-
return
237-
}
238239

239-
// assert state change
240-
if control.State.state != ControlStateConnectRequestSent {
241-
control.State.state = ControlStateError
242-
control.State.err = fmt.Errorf("Control Response not expecting response")
243-
if Listeners.ErrorListener != nil {
244-
Listeners.ErrorListener(control.State.err)
240+
// assert state change
241+
if control.State.state != ControlStateConnectRequestSent {
242+
control.State.state = ControlStateError
243+
control.State.err = fmt.Errorf("Control Response not expecting response")
244+
if Listeners.ErrorListener != nil {
245+
Listeners.ErrorListener(control.State.err)
246+
}
245247
}
246-
}
247248

248-
// Looking good, so update state and store the SessionID
249-
control.State.state = ControlStateConnected
250-
control.State.err = nil
251-
control.archive.SessionID = controlResponse.ControlSessionId
249+
// Looking good, so update state and store the SessionID
250+
control.State.state = ControlStateConnected
251+
control.State.err = nil
252+
control.archive.SessionID = controlResponse.ControlSessionId
253+
} else {
254+
// It's conceivable if the same application is making concurrent connection attempts using
255+
// the same channel/stream that we can reach here which is our parent's problem
256+
logger.Debugf("connectionControlFragmentHandler/controlResponse: ignoring correlationID=%d", controlResponse.CorrelationId)
257+
}
252258

253259
case codecIds.challenge:
254260
var challenge = new(codecs.Challenge)
@@ -266,29 +272,33 @@ func ConnectionControlFragmentHandler(buffer *atomic.Buffer, offset int32, lengt
266272
// Look it up
267273
c, ok := correlations.Load(challenge.CorrelationId)
268274
if !ok {
269-
// Not much to be done here as we can't correlate
270-
err := fmt.Errorf("connectionControlFragmentHandler: ignoring uncorrelated correlationID=%d", challenge.CorrelationId)
271-
if Listeners.ErrorListener != nil {
272-
Listeners.ErrorListener(err)
273-
}
275+
logger.Debugf("connectionControlFragmentHandler/controlResponse: ignoring correlationID=%d", challenge.CorrelationId)
274276
return
275277
}
276278
control := c.(*Control)
277279

278-
// Check the challenge is expected iff our option for this is not nil
279-
if control.archive.Options.AuthChallenge != nil {
280-
if !bytes.Equal(control.archive.Options.AuthChallenge, challenge.EncodedChallenge) {
281-
control.State.err = fmt.Errorf("ChallengeResponse Unexpected: expected:%v received:%v", control.archive.Options.AuthChallenge, challenge.EncodedChallenge)
282-
return
280+
// Check this was for us
281+
if challenge.CorrelationId == correlationID {
282+
283+
// Check the challenge is expected iff our option for this is not nil
284+
if control.archive.Options.AuthChallenge != nil {
285+
if !bytes.Equal(control.archive.Options.AuthChallenge, challenge.EncodedChallenge) {
286+
control.State.err = fmt.Errorf("ChallengeResponse Unexpected: expected:%v received:%v", control.archive.Options.AuthChallenge, challenge.EncodedChallenge)
287+
return
288+
}
283289
}
284-
}
285290

286-
// Send the response
287-
// Looking good, so update state and store the SessionID
288-
control.State.state = ControlStateChallenged
289-
control.State.err = nil
290-
control.archive.SessionID = challenge.ControlSessionId
291-
control.archive.Proxy.ChallengeResponse(challenge.CorrelationId, control.archive.Options.AuthResponse)
291+
// Send the response
292+
// Looking good, so update state and store the SessionID
293+
control.State.state = ControlStateChallenged
294+
control.State.err = nil
295+
control.archive.SessionID = challenge.ControlSessionId
296+
control.archive.Proxy.ChallengeResponse(challenge.CorrelationId, control.archive.Options.AuthResponse)
297+
} else {
298+
// It's conceivable if the same application is making concurrent connection attempts using
299+
// the same channel/stream that we can reach here which is our parent's problem
300+
logger.Debugf("connectionControlFragmentHandler/challengr: ignoring correlationID=%d", challenge.CorrelationId)
301+
}
292302

293303
// These can happen when testing/reconnecting or if multiple clients are on the same channel/stream
294304
case codecIds.recordingDescriptor:
@@ -316,7 +326,7 @@ func (control *Control) Poll(handler term.FragmentHandler, fragmentLimit int) in
316326
return control.Subscription.Poll(handler, fragmentLimit)
317327
}
318328

319-
// PollWithContext provides a Poll with context via the context argument
329+
// PollWithContext provides a Poll() with a context argument
320330
func (control *Control) PollWithContext(handler term.FragmentHandlerWithContext, context int64, fragmentLimit int) int {
321331

322332
// Update our globals in case they've changed so we use the current state in our callback
@@ -380,8 +390,8 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i
380390
// DescriptorFragmentHandler is used to poll for descriptors (both recording and subscription)
381391
// The current subscription handler doesn't provide a mechanism for passing a context
382392
// so we return data via the control's Results
383-
func DescriptorFragmentHandler(context int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
384-
// logger.Debugf("DescriptorFragmentHandler: context:%d offset:%d length: %d header: %#v\n", context, offset, length, header)
393+
func DescriptorFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
394+
// logger.Debugf("DescriptorFragmentHandler: correlationID:%d offset:%d length: %d header: %#v\n", correlationID, offset, length, header)
385395

386396
var hdr codecs.SbeGoMessageHeader
387397

@@ -399,14 +409,14 @@ func DescriptorFragmentHandler(context int64, buffer *atomic.Buffer, offset int3
399409
return
400410
}
401411

402-
// Look up our control via the context
403-
c, ok := correlations.Load(context)
412+
// Look up our control
413+
c, ok := correlations.Load(correlationID)
404414
if !ok {
405415
// something has gone horribly wrong and we can't correlate
406416
if Listeners.ErrorListener != nil {
407-
Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", context))
417+
Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", correlationID))
408418
}
409-
logger.Debugf("failed to locate control via correlationID %d", context)
419+
logger.Debugf("failed to locate control via correlationID %d", correlationID)
410420
return
411421
}
412422
control := c.(*Control)
@@ -426,7 +436,7 @@ func DescriptorFragmentHandler(context int64, buffer *atomic.Buffer, offset int3
426436
logger.Debugf("RecordingDescriptor: %#v", recordingDescriptor)
427437

428438
// Check this was for us
429-
if recordingDescriptor.ControlSessionId == control.archive.SessionID && recordingDescriptor.CorrelationId == context {
439+
if recordingDescriptor.ControlSessionId == control.archive.SessionID && recordingDescriptor.CorrelationId == correlationID {
430440
// Set our state to let the caller of Poll() which triggered this know they have something
431441
control.Results.RecordingDescriptors = append(control.Results.RecordingDescriptors, recordingDescriptor)
432442
control.Results.FragmentsReceived++
@@ -447,7 +457,7 @@ func DescriptorFragmentHandler(context int64, buffer *atomic.Buffer, offset int3
447457
}
448458

449459
// Check this was for us
450-
if recordingSubscriptionDescriptor.ControlSessionId == control.archive.SessionID && recordingSubscriptionDescriptor.CorrelationId == context {
460+
if recordingSubscriptionDescriptor.ControlSessionId == control.archive.SessionID && recordingSubscriptionDescriptor.CorrelationId == correlationID {
451461
// Set our state to let the caller of Poll() which triggered this know they have something
452462
control.Results.RecordingSubscriptionDescriptors = append(control.Results.RecordingSubscriptionDescriptors, recordingSubscriptionDescriptor)
453463
control.Results.FragmentsReceived++
@@ -468,7 +478,7 @@ func DescriptorFragmentHandler(context int64, buffer *atomic.Buffer, offset int3
468478
}
469479

470480
// Check this was for us
471-
if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == context {
481+
if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == correlationID {
472482
// Set our state to let the caller of Poll() which triggered this know they have something
473483
// We're basically finished so prepare our OOB return values and log some info if we can
474484
logger.Debugf("descriptorFragmentHandler/controlResponse: received for sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId)

0 commit comments

Comments
 (0)