Brijesh's Git Server — argus_client @ 7b93aa77b6802acd952901f91a3cb618ab2a67cd

feat: send logs in batches

either when there are 30 logs in the buffer or every 5 seconds
Brijesh Wawdhane ops@brijesh.dev
Sat, 28 Dec 2024 16:29:10 +0530
commit

7b93aa77b6802acd952901f91a3cb618ab2a67cd

parent

e16080a7a532ac6c4d7a6c951736f31c8a185941

1 files changed, 71 insertions(+), 7 deletions(-)

jump to
M client.goclient.go

@@ -6,6 +6,10 @@ "errors"

"log" "log/slog" "net/http" + "os" + "os/signal" + "sync" + "syscall" "time" "github.com/google/uuid"

@@ -72,13 +76,55 @@

return err } -// for slog handler +// Custom slog.Handler implementation with batching and graceful shutdown type ArgusHandler struct { - client *Client + client *Client + logBuffer []LogEntry + bufferMutex sync.Mutex + flushTicker *time.Ticker + stopChan chan struct{} } func NewArgusHandler(client *Client) *ArgusHandler { - return &ArgusHandler{client: client} + handler := &ArgusHandler{ + client: client, + logBuffer: make([]LogEntry, 0, 30), + flushTicker: time.NewTicker(5 * time.Second), // Adjust the interval as needed + stopChan: make(chan struct{}), + } + + go handler.startBatching() + handler.setupSignalHandler() + + return handler +} + +func (h *ArgusHandler) startBatching() { + for { + select { + case <-h.flushTicker.C: + h.flushLogs() + case <-h.stopChan: + h.flushLogs() + return + } + } +} + +func (h *ArgusHandler) flushLogs() { + h.bufferMutex.Lock() + defer h.bufferMutex.Unlock() + + if len(h.logBuffer) == 0 { + return + } + + err := h.client.SendLogs(h.logBuffer) + if err != nil { + log.Println("Failed to send logs: ", err) + } + + h.logBuffer = h.logBuffer[:0] // Reset the buffer } func (h *ArgusHandler) Handle(ctx context.Context, record slog.Record) error {

@@ -94,12 +140,14 @@ Level: level,

Message: record.Message, } - err := h.client.SendLogs([]LogEntry{entry}) - if err != nil { - log.Println("Failed to send log: ", err) + h.bufferMutex.Lock() + h.logBuffer = append(h.logBuffer, entry) + if len(h.logBuffer) >= 30 { + go h.flushLogs() } + h.bufferMutex.Unlock() - return err + return nil } func (h *ArgusHandler) Enabled(ctx context.Context, level slog.Level) bool {

@@ -113,3 +161,19 @@

func (h *ArgusHandler) WithGroup(name string) slog.Handler { return h } + +func (h *ArgusHandler) setupSignalHandler() { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-signalChan + h.flushTicker.Stop() + close(h.stopChan) + }() +} + +func (h *ArgusHandler) Flush() { + h.flushTicker.Stop() + h.flushLogs() +}