all: sync with master
This commit is contained in:
@@ -196,7 +196,7 @@ func newLogEntry(params *AddParams) (entry *logEntry) {
|
||||
// Add implements the [QueryLog] interface for *queryLog.
|
||||
func (l *queryLog) Add(params *AddParams) {
|
||||
var isEnabled, fileIsEnabled bool
|
||||
var memSize int
|
||||
var memSize uint
|
||||
func() {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
@@ -205,7 +205,7 @@ func (l *queryLog) Add(params *AddParams) {
|
||||
memSize = l.conf.MemSize
|
||||
}()
|
||||
|
||||
if !isEnabled || memSize == 0 {
|
||||
if !isEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -230,6 +230,7 @@ func (l *queryLog) Add(params *AddParams) {
|
||||
if !l.flushPending && fileIsEnabled && l.buffer.Len() >= memSize {
|
||||
l.flushPending = true
|
||||
|
||||
// TODO(s.chzhen): Fix occasional rewrite of entires.
|
||||
go func() {
|
||||
flushErr := l.flushLogBuffer()
|
||||
if flushErr != nil {
|
||||
|
||||
@@ -131,7 +131,6 @@ func (r *qLogReader) ReadNext() (string, error) {
|
||||
|
||||
// Set its position to the start right away.
|
||||
_, err = q.SeekStart()
|
||||
|
||||
// This is unexpected, return an error right away.
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
||||
@@ -63,7 +63,7 @@ type Config struct {
|
||||
|
||||
// MemSize is the number of entries kept in a memory buffer before they are
|
||||
// flushed to disk.
|
||||
MemSize int
|
||||
MemSize uint
|
||||
|
||||
// Enabled tells if the query log is enabled.
|
||||
Enabled bool
|
||||
@@ -143,14 +143,17 @@ func newQueryLog(conf Config) (l *queryLog, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
if conf.MemSize < 0 {
|
||||
return nil, errors.Error("memory size must be greater or equal to zero")
|
||||
memSize := conf.MemSize
|
||||
if memSize == 0 {
|
||||
// If query log is enabled, we still need to write entries to a file.
|
||||
// And all writing goes through a buffer.
|
||||
memSize = 1
|
||||
}
|
||||
|
||||
l = &queryLog{
|
||||
findClient: findClient,
|
||||
|
||||
buffer: aghalg.NewRingBuffer[*logEntry](conf.MemSize),
|
||||
buffer: aghalg.NewRingBuffer[*logEntry](memSize),
|
||||
|
||||
conf: &Config{},
|
||||
confMu: &sync.RWMutex{},
|
||||
|
||||
@@ -47,7 +47,14 @@ func (l *queryLog) client(clientID, ip string, cache clientCache) (c *Client, er
|
||||
// searchMemory looks up log records which are currently in the in-memory
|
||||
// buffer. It optionally uses the client cache, if provided. It also returns
|
||||
// the total amount of records in the buffer at the moment of searching.
|
||||
// l.confMu is expected to be locked.
|
||||
func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entries []*logEntry, total int) {
|
||||
// We use this configuration check because a buffer can contain a single log
|
||||
// record. See [newQueryLog].
|
||||
if l.conf.MemSize == 0 {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
l.bufferLock.Lock()
|
||||
defer l.bufferLock.Unlock()
|
||||
|
||||
@@ -73,11 +80,12 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie
|
||||
return true
|
||||
})
|
||||
|
||||
return entries, l.buffer.Len()
|
||||
return entries, int(l.buffer.Len())
|
||||
}
|
||||
|
||||
// search - searches log entries in the query log using specified parameters
|
||||
// returns the list of entries found + time of the oldest entry
|
||||
// search searches log entries in memory buffer and log file using specified
|
||||
// parameters and returns the list of entries found and the time of the oldest
|
||||
// entry. l.confMu is expected to be locked.
|
||||
func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest time.Time) {
|
||||
start := time.Now()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user