Pull request 2304: AGDNS-2374-slog-querylog

Squashed commit of the following:

commit e98e5efaaf5388551322933321df0707ad7b2a9c
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Nov 21 13:15:51 2024 +0300

    all: imp code

commit fbe728c9aa03a325c2733c214412f9071faba5ed
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Nov 18 20:57:15 2024 +0300

    all: imp code

commit ef715c58cb6621236424f55268390aa3f997f883
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Nov 18 16:39:35 2024 +0300

    all: imp code

commit cbb993f7ae4311b2a73ace7066a5dabf190291be
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Nov 18 14:03:42 2024 +0300

    all: imp code

commit 8d88d799303c7e3d15322fee87780fedb408ea13
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Fri Nov 15 15:57:07 2024 +0300

    all: slog querylog
This commit is contained in:
Stanislav Chzhen
2024-11-21 20:19:39 +03:00
parent 1d6d85cff4
commit d96e65cb0c
18 changed files with 524 additions and 236 deletions

View File

@@ -1,13 +1,15 @@
package querylog
import (
"context"
"fmt"
"io"
"log/slog"
"slices"
"time"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/logutil/slogutil"
)
// client finds the client info, if any, by its ClientID and IP address,
@@ -48,7 +50,11 @@ func (l *queryLog) client(clientID, ip string, cache clientCache) (c *Client, er
// buffer. It optionally uses the client cache, if provided. It also returns
// the total amount of records in the buffer at the moment of searching.
// l.confMu is expected to be locked.
func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entries []*logEntry, total int) {
func (l *queryLog) searchMemory(
ctx context.Context,
params *searchParams,
cache clientCache,
) (entries []*logEntry, total int) {
// Check memory size, as the buffer can contain a single log record. See
// [newQueryLog].
if l.conf.MemSize == 0 {
@@ -66,9 +72,14 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie
var err error
e.client, err = l.client(e.ClientID, e.IP.String(), cache)
if err != nil {
msg := "querylog: enriching memory record at time %s" +
" for client %q (clientid %q): %s"
log.Error(msg, e.Time, e.IP, e.ClientID, err)
l.logger.ErrorContext(
ctx,
"enriching memory record",
"at", e.Time,
"client_ip", e.IP,
"client_id", e.ClientID,
slogutil.KeyError, err,
)
// Go on and try to match anyway.
}
@@ -86,7 +97,10 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie
// search searches log entries in memory buffer and log file using specified
// parameters and returns the list of entries found and the time of the oldest
// entry. l.confMu is expected to be locked.
func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest time.Time) {
func (l *queryLog) search(
ctx context.Context,
params *searchParams,
) (entries []*logEntry, oldest time.Time) {
start := time.Now()
if params.limit == 0 {
@@ -95,11 +109,11 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
cache := clientCache{}
memoryEntries, bufLen := l.searchMemory(params, cache)
log.Debug("querylog: got %d entries from memory", len(memoryEntries))
memoryEntries, bufLen := l.searchMemory(ctx, params, cache)
l.logger.DebugContext(ctx, "got entries from memory", "count", len(memoryEntries))
fileEntries, oldest, total := l.searchFiles(params, cache)
log.Debug("querylog: got %d entries from files", len(fileEntries))
fileEntries, oldest, total := l.searchFiles(ctx, params, cache)
l.logger.DebugContext(ctx, "got entries from files", "count", len(fileEntries))
total += bufLen
@@ -134,12 +148,13 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
oldest = entries[len(entries)-1].Time
}
log.Debug(
"querylog: prepared data (%d/%d) older than %s in %s",
len(entries),
total,
params.olderThan,
time.Since(start),
l.logger.DebugContext(
ctx,
"prepared data",
"count", len(entries),
"total", total,
"older_than", params.olderThan,
"elapsed", time.Since(start),
)
return entries, oldest
@@ -147,12 +162,12 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
// seekRecord changes the current position to the next record older than the
// provided parameter.
func (r *qLogReader) seekRecord(olderThan time.Time) (err error) {
func (r *qLogReader) seekRecord(ctx context.Context, olderThan time.Time) (err error) {
if olderThan.IsZero() {
return r.SeekStart()
}
err = r.seekTS(olderThan.UnixNano())
err = r.seekTS(ctx, olderThan.UnixNano())
if err == nil {
// Read to the next record, because we only need the one that goes
// after it.
@@ -164,21 +179,24 @@ func (r *qLogReader) seekRecord(olderThan time.Time) (err error) {
// setQLogReader creates a reader with the specified files and sets the
// position to the next record older than the provided parameter.
func (l *queryLog) setQLogReader(olderThan time.Time) (qr *qLogReader, err error) {
func (l *queryLog) setQLogReader(
ctx context.Context,
olderThan time.Time,
) (qr *qLogReader, err error) {
files := []string{
l.logFile + ".1",
l.logFile,
}
r, err := newQLogReader(files)
r, err := newQLogReader(ctx, l.logger, files)
if err != nil {
return nil, fmt.Errorf("opening qlog reader: %s", err)
return nil, fmt.Errorf("opening qlog reader: %w", err)
}
err = r.seekRecord(olderThan)
err = r.seekRecord(ctx, olderThan)
if err != nil {
defer func() { err = errors.WithDeferred(err, r.Close()) }()
log.Debug("querylog: cannot seek to %s: %s", olderThan, err)
l.logger.DebugContext(ctx, "cannot seek", "older_than", olderThan, slogutil.KeyError, err)
return nil, nil
}
@@ -191,13 +209,14 @@ func (l *queryLog) setQLogReader(olderThan time.Time) (qr *qLogReader, err error
// calls faster so that the UI could handle it and show something quicker.
// This behavior can be overridden if maxFileScanEntries is set to 0.
func (l *queryLog) readEntries(
ctx context.Context,
r *qLogReader,
params *searchParams,
cache clientCache,
totalLimit int,
) (entries []*logEntry, oldestNano int64, total int) {
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
ent, ts, rErr := l.readNextEntry(r, params, cache)
ent, ts, rErr := l.readNextEntry(ctx, r, params, cache)
if rErr != nil {
if rErr == io.EOF {
oldestNano = 0
@@ -205,7 +224,7 @@ func (l *queryLog) readEntries(
break
}
log.Error("querylog: reading next entry: %s", rErr)
l.logger.ErrorContext(ctx, "reading next entry", slogutil.KeyError, rErr)
}
oldestNano = ts
@@ -231,12 +250,13 @@ func (l *queryLog) readEntries(
// and the total number of processed entries, including discarded ones,
// correspondingly.
func (l *queryLog) searchFiles(
ctx context.Context,
params *searchParams,
cache clientCache,
) (entries []*logEntry, oldest time.Time, total int) {
r, err := l.setQLogReader(params.olderThan)
r, err := l.setQLogReader(ctx, params.olderThan)
if err != nil {
log.Error("querylog: %s", err)
l.logger.ErrorContext(ctx, "searching files", slogutil.KeyError, err)
}
if r == nil {
@@ -245,12 +265,12 @@ func (l *queryLog) searchFiles(
defer func() {
if closeErr := r.Close(); closeErr != nil {
log.Error("querylog: closing file: %s", closeErr)
l.logger.ErrorContext(ctx, "closing files", slogutil.KeyError, closeErr)
}
}()
totalLimit := params.offset + params.limit
entries, oldestNano, total := l.readEntries(r, params, cache, totalLimit)
entries, oldestNano, total := l.readEntries(ctx, r, params, cache, totalLimit)
if oldestNano != 0 {
oldest = time.Unix(0, oldestNano)
}
@@ -266,15 +286,21 @@ type quickMatchClientFinder struct {
}
// findClient is a method that can be used as a quickMatchClientFinder.
func (f quickMatchClientFinder) findClient(clientID, ip string) (c *Client) {
func (f quickMatchClientFinder) findClient(
ctx context.Context,
logger *slog.Logger,
clientID string,
ip string,
) (c *Client) {
var err error
c, err = f.client(clientID, ip, f.cache)
if err != nil {
log.Error(
"querylog: enriching file record for quick search: for client %q (clientid %q): %s",
ip,
clientID,
err,
logger.ErrorContext(
ctx,
"enriching file record for quick search",
"client_ip", ip,
"client_id", clientID,
slogutil.KeyError, err,
)
}
@@ -286,6 +312,7 @@ func (f quickMatchClientFinder) findClient(clientID, ip string) (c *Client) {
// the entry doesn't match the search criteria. ts is the timestamp of the
// processed entry.
func (l *queryLog) readNextEntry(
ctx context.Context,
r *qLogReader,
params *searchParams,
cache clientCache,
@@ -301,14 +328,14 @@ func (l *queryLog) readNextEntry(
cache: cache,
}
if !params.quickMatch(line, clientFinder.findClient) {
ts = readQLogTimestamp(line)
if !params.quickMatch(ctx, l.logger, line, clientFinder.findClient) {
ts = readQLogTimestamp(ctx, l.logger, line)
return nil, ts, nil
}
e = &logEntry{}
decodeLogEntry(e, line)
l.decodeLogEntry(ctx, e, line)
if l.isIgnored(e.QHost) {
return nil, ts, nil
@@ -316,12 +343,13 @@ func (l *queryLog) readNextEntry(
e.client, err = l.client(e.ClientID, e.IP.String(), cache)
if err != nil {
log.Error(
"querylog: enriching file record at time %s for client %q (clientid %q): %s",
e.Time,
e.IP,
e.ClientID,
err,
l.logger.ErrorContext(
ctx,
"enriching file record",
"at", e.Time,
"client_ip", e.IP,
"client_id", e.ClientID,
slogutil.KeyError, err,
)
// Go on and try to match anyway.