diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 0ea13f4e0a..7fc49e7eb3 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -123,12 +123,14 @@ func (tb *TelemetryBuffer) StartServer() error { tb.connections = remove(tb.connections, index) } }() - + reader := bufio.NewReader(conn) for { - reportStr, err := read(conn) - if err != nil { + reportStr, readErr := reader.ReadBytes(Delimiter) + if readErr != nil { return } + reportStr = reportStr[:len(reportStr)-1] + var tmp map[string]interface{} err = json.Unmarshal(reportStr, &tmp) if err != nil { @@ -195,16 +197,6 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) { } } -// read - read from the file descriptor -func read(conn net.Conn) (b []byte, err error) { - b, err = bufio.NewReader(conn).ReadBytes(Delimiter) - if err == nil { - b = b[:len(b)-1] - } - - return -} - // Write - write to the file descriptor. func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { buf := make([]byte, len(b)) diff --git a/telemetry/telemetrybuffer_test.go b/telemetry/telemetrybuffer_test.go index f226c6a87b..cdb79849f7 100644 --- a/telemetry/telemetrybuffer_test.go +++ b/telemetry/telemetrybuffer_test.go @@ -67,6 +67,36 @@ func TestClientConnClose(t *testing.T) { tbClient.Close() } +func TestCloseOnWriteError(t *testing.T) { + tbServer, closeTBServer := createTBServer(t) + defer closeTBServer() + + tbClient := NewTelemetryBuffer() + err := tbClient.Connect() + require.NoError(t, err) + defer tbClient.Close() + + data := []byte("{\"good\":1}") + _, err = tbClient.Write(data) + require.NoError(t, err) + // need to wait for connection to populate in server + time.Sleep(1 * time.Second) + tbServer.mutex.Lock() + conns := tbServer.connections + tbServer.mutex.Unlock() + require.Len(t, conns, 1) + + // the connection should be automatically closed on failure + badData := []byte("} malformed json }}}") + _, err = tbClient.Write(badData) + require.NoError(t, err) + time.Sleep(1 * time.Second) + tbServer.mutex.Lock() + conns = tbServer.connections + tbServer.mutex.Unlock() + require.Empty(t, conns) +} + func TestWrite(t *testing.T) { _, closeTBServer := createTBServer(t) defer closeTBServer() @@ -84,8 +114,8 @@ func TestWrite(t *testing.T) { }{ { name: "write", - data: []byte("testdata"), - want: len("testdata") + 1, // +1 due to Delimiter('\n) + data: []byte("{\"testdata\":1}"), + want: len("{\"testdata\":1}") + 1, // +1 due to Delimiter('\n) wantErr: false, }, {