Skip to content

Commit

Permalink
feat: add context support for opensearch log.
Browse files Browse the repository at this point in the history
  • Loading branch information
coghost committed Jan 2, 2025
1 parent 8e08d91 commit 595e42c
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 59 deletions.
147 changes: 105 additions & 42 deletions opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -18,11 +19,22 @@ import (
"github.com/opensearch-project/opensearch-go/opensearchutil"
)

// CleanUp flushes and releases logger resources, honoring ctx for
// cancellation/deadline. Returned by MustNewZapLoggerWithOpenSearch.
type CleanUp func(context.Context) error

const (
	// writerCtxTimeout bounds each bulk-indexer Add call in Write.
	writerCtxTimeout = 5 * time.Second
	// Bulk-indexer tuning: worker count, byte threshold, and the interval
	// at which buffered documents are flushed to OpenSearch.
	numberOfWorkers = 2
	flushBytes = 256 * 1024
	flushInterval = 10 * time.Second
)

// ErrCreateOpensearchCore is the sentinel for failures while building the
// OpenSearch-backed zap core.
var ErrCreateOpensearchCore = errors.New("failed to create OpenSearch core")

// DefaultOpenSearchConfig returns an opensearch.Config targeting url.
// When insecure is true, TLS certificate verification is disabled — intended
// for local/dev clusters only, never production.
func DefaultOpenSearchConfig(url string, insecure bool) opensearch.Config {
	return opensearch.Config{
		Addresses: []string{url},
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, //nolint:gosec // opt-in, dev use only
		},
	}
}
Expand All @@ -47,7 +59,7 @@ func DefaultOpenSearchConfig(url string, insecure bool) opensearch.Config {
//
// The function supports both console and OpenSearch output. When OpenSearch is enabled,
// both openSearchConfig and openSearchIndex must be provided through the options.
func MustNewZapLoggerWithOpenSearch(opts ...LogOptFunc) (*zap.Logger, func() error) {
func MustNewZapLoggerWithOpenSearch(opts ...LogOptFunc) (*zap.Logger, CleanUp) {
opt := &LogOpts{
level: zapcore.InfoLevel,
withConsole: false,
Expand Down Expand Up @@ -94,9 +106,11 @@ func MustNewZapLoggerWithOpenSearch(opts ...LogOptFunc) (*zap.Logger, func() err
opt.internalLogger,
)
if err != nil {
return nil, fmt.Errorf("failed to create OpenSearch core: %v", err)
return nil, fmt.Errorf("failed to create OpenSearch core: %w", err)
}

openSearchWriter = writer

return core, nil
}

Expand All @@ -114,24 +128,27 @@ func MustNewZapLoggerWithOpenSearch(opts ...LogOptFunc) (*zap.Logger, func() err
coreTee := zapcore.NewTee(cores...)
logger := zap.New(coreTee, zap.AddCaller())

flushFunc := func() error {
flushFunc := func(ctx context.Context) error {
if openSearchWriter != nil {
err := openSearchWriter.Flush()
if err != nil {
return err
if err := openSearchWriter.FlushWithContext(ctx); err != nil {
return fmt.Errorf("flush error: %w", err)
}

newCore, err := createOpenSearchCore()
if err != nil {
return err
}

for i, core := range cores {
if core == openSearchCore {
cores[i] = newCore
openSearchCore = newCore

break
}
}
}

return nil
}

Expand All @@ -140,23 +157,17 @@ func MustNewZapLoggerWithOpenSearch(opts ...LogOptFunc) (*zap.Logger, func() err

// FlushLogsWithTimeout attempts to flush logs with a timeout.
// It returns a function suitable for use with defer.
func FlushLogsWithTimeout(flushFunc func() error, timeout time.Duration, logger *zap.Logger) func() {
func FlushLogsWithTimeout(flushFunc CleanUp, timeout time.Duration, logger *zap.Logger) func() {
return func() {
flushDone := make(chan error, 1)
go func() {
flushDone <- flushFunc()
}()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

select {
case err := <-flushDone:
if err != nil {
logger.Error("Error during flush", zap.Error(err))
} else {
logger.Info("Logs flushed successfully")
}
case <-time.After(timeout):
logger.Error("Flush operation timed out", zap.Duration("timeout", timeout))
if err := flushFunc(ctx); err != nil {
logger.Error("Error during flush", zap.Error(err))
return
}

logger.Info("Logs flushed successfully")
}
}

Expand All @@ -169,14 +180,21 @@ type openSearchWriter struct {
logger *zap.Logger

indexNameGenerator *IndexGenerator

stopChan chan struct{}
}

func (w *openSearchWriter) Write(p []byte) (n int, err error) {
func (w *openSearchWriter) Write(buffer []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()

if w.closed {
return 0, ErrWriterClosed
}

var logEntry map[string]interface{}
err = json.Unmarshal(p, &logEntry)

err = json.Unmarshal(buffer, &logEntry)
if err != nil {
return 0, fmt.Errorf("failed to parse log entry: %w", err)
}
Expand All @@ -186,29 +204,73 @@ func (w *openSearchWriter) Write(p []byte) (n int, err error) {
return 0, fmt.Errorf("failed to re-encode log entry: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), writerCtxTimeout)
defer cancel()

err = w.indexer.Add(
ctx,
opensearchutil.BulkIndexerItem{
Action: "index",
Index: w.indexNameGenerator.GetIndexName(), // Use dynamic index name
Body: bytes.NewReader(encodedEntry),
},
)
if err != nil {
return 0, fmt.Errorf("failed to add document to bulk indexer: %w", err)
select {
case <-w.stopChan:
return 0, ErrWriterIsStopping
default:
err = w.indexer.Add(
ctx,
opensearchutil.BulkIndexerItem{
Action: "index",
Index: w.indexNameGenerator.GetIndexName(),
Body: bytes.NewReader(encodedEntry),
},
)
if err != nil {
return 0, fmt.Errorf("failed to add document to bulk indexer: %w", err)
}
}

return len(buffer), nil
}

var (
	// ErrWriterClosed is returned once the writer has been shut down.
	ErrWriterClosed = errors.New("writer already closed")
	// ErrWriterIsStopping is returned by Write while shutdown is in progress.
	ErrWriterIsStopping = errors.New("writer is stopping")
)

// FlushWithContext drains buffered log entries and shuts the writer down,
// honoring ctx for cancellation/deadline. It is one-shot: after a successful
// call the writer is closed, and later Write/FlushWithContext calls return
// ErrWriterClosed. Indexer statistics are logged before and after the drain.
func (w *openSearchWriter) FlushWithContext(ctx context.Context) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.closed {
		return ErrWriterClosed
	}

	// Signal in-flight Write calls to stop accepting new documents.
	close(w.stopChan)
	w.closed = true

	stats := w.indexer.Stats()
	w.logger.Info("Starting flush",
		zap.Uint64("added", stats.NumAdded),
		zap.Uint64("flushed", stats.NumFlushed),
		zap.Uint64("failed", stats.NumFailed))

	// Close drains pending batches; ctx bounds how long we wait for them.
	if err := w.indexer.Close(ctx); err != nil {
		w.logger.Error("Error closing bulk indexer", zap.Error(err))
		return fmt.Errorf("error closing bulk indexer: %w", err)
	}

	finalStats := w.indexer.Stats()
	w.logger.Info("Flush completed",
		zap.Uint64("added", finalStats.NumAdded),
		zap.Uint64("flushed", finalStats.NumFlushed),
		zap.Uint64("failed", finalStats.NumFailed))

	return nil
}

func (w *openSearchWriter) Flush() error {
w.mu.Lock()
defer w.mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) //nolint:mnd
defer cancel()

stats := w.indexer.Stats()
Expand Down Expand Up @@ -256,23 +318,23 @@ func (w *openSearchWriter) Flush() error {
func newOpenSearchCore(config *opensearch.Config, indexNameGenerator *IndexGenerator, level zapcore.Level, logger *zap.Logger) (zapcore.Core, *openSearchWriter, error) {
client, err := opensearch.NewClient(*config)
if err != nil {
return nil, nil, fmt.Errorf("failed to create OpenSearch client: %v", err)
return nil, nil, fmt.Errorf("failed to create OpenSearch client: %w", err)
}

indexerConfig := opensearchutil.BulkIndexerConfig{
Client: client,
Index: indexNameGenerator.GetIndexName(),
NumWorkers: 2,
FlushBytes: 256 * 1024,
FlushInterval: 10 * time.Second,
NumWorkers: numberOfWorkers,
FlushBytes: flushBytes,
FlushInterval: flushInterval,
OnError: func(ctx context.Context, err error) {
logger.Error("Bulk indexer error", zap.Error(err))
},
}

indexer, err := opensearchutil.NewBulkIndexer(indexerConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create bulk indexer: %v", err)
return nil, nil, fmt.Errorf("failed to create bulk indexer: %w", err)
}

writer := &openSearchWriter{
Expand All @@ -282,6 +344,7 @@ func newOpenSearchCore(config *opensearch.Config, indexNameGenerator *IndexGener
logger: logger,
// dynamically generate index name
indexNameGenerator: indexNameGenerator,
stopChan: make(chan struct{}),
}

encoderConfig := zap.NewProductionEncoderConfig()
Expand All @@ -306,7 +369,7 @@ func IsOpenSearchReady(url string, timeout time.Duration, insecure bool) bool {
}

transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, //nolint:gosec
}
client := &http.Client{Transport: transport}

Expand Down
4 changes: 2 additions & 2 deletions zlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ func MustNewZapLogger(opts ...LogOptFunc) *zap.Logger {
return logger
}

func genProdEncoder() zapcore.Encoder { //nolint
func genProdEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder

return zapcore.NewConsoleEncoder(encoderConfig)
}

func genDevEncoder(isConsole bool) zapcore.Encoder { //nolint
func genDevEncoder(isConsole bool) zapcore.Encoder {
encoderConfig := zap.NewDevelopmentEncoderConfig()
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("15:04:05")
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
Expand Down
35 changes: 20 additions & 15 deletions zlog_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zlog

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -9,19 +10,23 @@ import (
"go.uber.org/zap/zapcore"
)

func TestMustNewZapLoggerWithOpenSearch(t *testing.T) {
opensearchURL := "https://localhost:9200"
insecure := true // Set to false if you have valid certificates
const (
_testOpensearchFlushTimeout = 5 * time.Second
_testOpensearchURL = "http://localhost:9200"

_testIsInsecure = true // Set to false if you have valid certificates
)

func TestMustNewZapLoggerWithOpenSearch(t *testing.T) {
// Check if OpenSearch is ready
if !IsOpenSearchReady(opensearchURL, 5*time.Second, insecure) {
if !IsOpenSearchReady(_testOpensearchURL, 5*time.Second, _testIsInsecure) {
msg := "OpenSearch is not ready. Skipping test."
fmt.Println(msg)
t.Skip(msg)
}

// Create a logger with OpenSearch enabled
defaultConfig := DefaultOpenSearchConfig(opensearchURL, insecure)
defaultConfig := DefaultOpenSearchConfig(_testOpensearchURL, _testIsInsecure)
logger, flushFunc := MustNewZapLoggerWithOpenSearch(
WithOpenSearchConfig(&defaultConfig),
WithOpenSearchIndex("zlog-test", string(DateFormatDot)),
Expand Down Expand Up @@ -59,38 +64,38 @@ func TestMustNewZapLoggerWithOpenSearch(t *testing.T) {
}

func TestLogToOpenSearch(t *testing.T) {
opensearchURL := "https://localhost:9200"
insecure := true // Set to false if you have valid certificates

// Check if OpenSearch is ready
if !IsOpenSearchReady(opensearchURL, 5*time.Second, insecure) {
if !IsOpenSearchReady(_testOpensearchURL, 5*time.Second, _testIsInsecure) {
msg := "OpenSearch is not ready. Skipping test."
fmt.Println(msg)
t.Skip(msg)
}

defaultConfig := DefaultOpenSearchConfig(opensearchURL, insecure)
defaultConfig := DefaultOpenSearchConfig(_testOpensearchURL, _testIsInsecure)
logger, flushFunc := MustNewZapLoggerWithOpenSearch(
WithOpenSearchConfig(&defaultConfig),
WithOpenSearchIndex("zlog-test", string(DateFormatDot)),
WithLogLevel(zapcore.InfoLevel),
WithConsole(true),
)

ctx, cancel := context.WithTimeout(context.Background(), _testOpensearchFlushTimeout)
defer cancel()

// Log some test messages with timestamps and flush after each
now := time.Now()
logger.Info("Test info message v4", zap.Time("timestamp", now))
if err := flushFunc(); err != nil {
if err := flushFunc(ctx); err != nil {
t.Errorf("Failed to flush logs: %v", err)
}

logger.Warn("Test warning message v4", zap.Time("timestamp", now.Add(time.Second)))
if err := flushFunc(); err != nil {
if err := flushFunc(ctx); err != nil {
t.Errorf("Failed to flush logs: %v", err)
}

logger.Error("Test error message v4", zap.Time("timestamp", now.Add(2*time.Second)))
if err := flushFunc(); err != nil {
if err := flushFunc(ctx); err != nil {
t.Errorf("Failed to flush logs: %v", err)
}

Expand All @@ -101,7 +106,7 @@ func TestLogToOpenSearch(t *testing.T) {
zap.Int("attempt", 3),
zap.Duration("backoff", time.Second*5),
)
if err := flushFunc(); err != nil {
if err := flushFunc(ctx); err != nil {
t.Errorf("Failed to flush logs: %v", err)
}

Expand All @@ -111,7 +116,7 @@ func TestLogToOpenSearch(t *testing.T) {
zap.Time("timestamp", now.Add(time.Duration(4+i)*time.Second)),
zap.Int("iteration", i+1),
)
if err := flushFunc(); err != nil {
if err := flushFunc(ctx); err != nil {
t.Errorf("Failed to flush logs: %v", err)
}
time.Sleep(100 * time.Millisecond) // Small delay between logs
Expand Down

0 comments on commit 595e42c

Please sign in to comment.