all: sync with master; upd chlog
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
package querylog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
@@ -134,84 +136,112 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
|
||||
return entries, oldest
|
||||
}
|
||||
|
||||
// searchFiles looks up log records from all log files. It optionally uses the
|
||||
// client cache, if provided. searchFiles does not scan more than
|
||||
// maxFileScanEntries so callers may need to call it several times to get all
|
||||
// results. oldest and total are the time of the oldest processed entry and the
|
||||
// total number of processed entries, including discarded ones, correspondingly.
|
||||
func (l *queryLog) searchFiles(
|
||||
params *searchParams,
|
||||
cache clientCache,
|
||||
) (entries []*logEntry, oldest time.Time, total int) {
|
||||
// seekRecord changes the current position to the next record older than the
|
||||
// provided parameter.
|
||||
func (r *qLogReader) seekRecord(olderThan time.Time) (err error) {
|
||||
if olderThan.IsZero() {
|
||||
return r.SeekStart()
|
||||
}
|
||||
|
||||
err = r.seekTS(olderThan.UnixNano())
|
||||
if err == nil {
|
||||
// Read to the next record, because we only need the one that goes
|
||||
// after it.
|
||||
_, err = r.ReadNext()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// setQLogReader creates a reader with the specified files and sets the
|
||||
// position to the next record older than the provided parameter.
|
||||
func (l *queryLog) setQLogReader(olderThan time.Time) (qr *qLogReader, err error) {
|
||||
files := []string{
|
||||
l.logFile + ".1",
|
||||
l.logFile,
|
||||
}
|
||||
|
||||
r, err := NewQLogReader(files)
|
||||
r, err := newQLogReader(files)
|
||||
if err != nil {
|
||||
log.Error("querylog: opening qlog reader: %s", err)
|
||||
|
||||
return entries, oldest, 0
|
||||
}
|
||||
|
||||
defer func() {
|
||||
closeErr := r.Close()
|
||||
if closeErr != nil {
|
||||
log.Error("querylog: closing file: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if params.olderThan.IsZero() {
|
||||
err = r.SeekStart()
|
||||
} else {
|
||||
err = r.seekTS(params.olderThan.UnixNano())
|
||||
if err == nil {
|
||||
// Read to the next record, because we only need the one that goes
|
||||
// after it.
|
||||
_, err = r.ReadNext()
|
||||
}
|
||||
return nil, fmt.Errorf("opening qlog reader: %s", err)
|
||||
}
|
||||
|
||||
err = r.seekRecord(olderThan)
|
||||
if err != nil {
|
||||
log.Debug("querylog: cannot seek to %s: %s", params.olderThan, err)
|
||||
defer func() { err = errors.WithDeferred(err, r.Close()) }()
|
||||
log.Debug("querylog: cannot seek to %s: %s", olderThan, err)
|
||||
|
||||
return entries, oldest, 0
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
totalLimit := params.offset + params.limit
|
||||
oldestNano := int64(0)
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// By default, we do not scan more than maxFileScanEntries at once. The
|
||||
// idea is to make search calls faster so that the UI could handle it and
|
||||
// show something quicker. This behavior can be overridden if
|
||||
// maxFileScanEntries is set to 0.
|
||||
// readEntries reads entries from the reader to totalLimit. By default, we do
|
||||
// not scan more than maxFileScanEntries at once. The idea is to make search
|
||||
// calls faster so that the UI could handle it and show something quicker.
|
||||
// This behavior can be overridden if maxFileScanEntries is set to 0.
|
||||
func (l *queryLog) readEntries(
|
||||
r *qLogReader,
|
||||
params *searchParams,
|
||||
cache clientCache,
|
||||
totalLimit int,
|
||||
) (entries []*logEntry, oldestNano int64, total int) {
|
||||
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
|
||||
var e *logEntry
|
||||
var ts int64
|
||||
|
||||
e, ts, err = l.readNextEntry(r, params, cache)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
ent, ts, rErr := l.readNextEntry(r, params, cache)
|
||||
if rErr != nil {
|
||||
if rErr == io.EOF {
|
||||
oldestNano = 0
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
log.Error("querylog: reading next entry: %s", err)
|
||||
log.Error("querylog: reading next entry: %s", rErr)
|
||||
}
|
||||
|
||||
oldestNano = ts
|
||||
total++
|
||||
|
||||
if e != nil {
|
||||
entries = append(entries, e)
|
||||
if len(entries) == totalLimit {
|
||||
break
|
||||
}
|
||||
if ent == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
entries = append(entries, ent)
|
||||
if len(entries) == totalLimit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return entries, oldestNano, total
|
||||
}
|
||||
|
||||
// searchFiles looks up log records from all log files. It optionally uses the
|
||||
// client cache, if provided. searchFiles does not scan more than
|
||||
// maxFileScanEntries so callers may need to call it several times to get all
|
||||
// the results. oldest and total are the time of the oldest processed entry
|
||||
// and the total number of processed entries, including discarded ones,
|
||||
// correspondingly.
|
||||
func (l *queryLog) searchFiles(
|
||||
params *searchParams,
|
||||
cache clientCache,
|
||||
) (entries []*logEntry, oldest time.Time, total int) {
|
||||
r, err := l.setQLogReader(params.olderThan)
|
||||
if err != nil {
|
||||
log.Error("querylog: %s", err)
|
||||
}
|
||||
|
||||
if r == nil {
|
||||
return entries, oldest, 0
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if closeErr := r.Close(); closeErr != nil {
|
||||
log.Error("querylog: closing file: %s", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
totalLimit := params.offset + params.limit
|
||||
entries, oldestNano, total := l.readEntries(r, params, cache, totalLimit)
|
||||
if oldestNano != 0 {
|
||||
oldest = time.Unix(0, oldestNano)
|
||||
}
|
||||
@@ -243,11 +273,11 @@ func (f quickMatchClientFinder) findClient(clientID, ip string) (c *Client) {
|
||||
}
|
||||
|
||||
// readNextEntry reads the next log entry and checks if it matches the search
|
||||
// criteria. It optionally uses the client cache, if provided. e is nil if the
|
||||
// entry doesn't match the search criteria. ts is the timestamp of the
|
||||
// criteria. It optionally uses the client cache, if provided. e is nil if
|
||||
// the entry doesn't match the search criteria. ts is the timestamp of the
|
||||
// processed entry.
|
||||
func (l *queryLog) readNextEntry(
|
||||
r *QLogReader,
|
||||
r *qLogReader,
|
||||
params *searchParams,
|
||||
cache clientCache,
|
||||
) (e *logEntry, ts int64, err error) {
|
||||
|
||||
Reference in New Issue
Block a user