*(querylog): added offset/limit parameters

Actually, this is a serious refactoring of the query log module. The
rest API part is refactored, it's now more clear how the search is
conducted. Split the logic into more files and added more tests.

Closes: https://github.com/AdguardTeam/AdGuardHome/issues/1559
This commit is contained in:
Andrey Meshkov
2020-05-26 15:37:37 +03:00
parent 6f3cd4e7eb
commit ceedc2d1ff
12 changed files with 1013 additions and 776 deletions

View File

@@ -1,18 +1,72 @@
package querylog
import (
"encoding/base64"
"io"
"strconv"
"strings"
"time"
"github.com/AdguardTeam/AdGuardHome/dnsfilter"
"github.com/AdguardTeam/AdGuardHome/util"
"github.com/AdguardTeam/golibs/log"
"github.com/miekg/dns"
)
// search - searches log entries in the query log using specified parameters
// returns the list of entries found + time of the oldest entry
func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
now := time.Now()
if params.limit == 0 {
return []*logEntry{}, time.Time{}
}
// add from file
fileEntries, oldest, total := l.searchFiles(params)
// add from memory buffer
l.bufferLock.Lock()
total += len(l.buffer)
memoryEntries := make([]*logEntry, 0)
// go through the buffer in the reverse order
// from NEWER to OLDER
for i := len(l.buffer) - 1; i >= 0; i-- {
entry := l.buffer[i]
if !params.match(entry) {
continue
}
memoryEntries = append(memoryEntries, entry)
}
l.bufferLock.Unlock()
// limits
totalLimit := params.offset + params.limit
// now let's get a unified collection
entries := append(memoryEntries, fileEntries...)
if len(entries) > totalLimit {
// remove extra records
entries = entries[:totalLimit]
}
if params.offset > 0 {
if len(entries) > params.offset {
entries = entries[params.offset:]
} else {
entries = make([]*logEntry, 0)
oldest = time.Time{}
}
}
if len(entries) == totalLimit {
// change the "oldest" value here.
// we cannot use the "oldest" we got from "searchFiles" anymore
// because after adding in-memory records and removing extra records
// the situation has changed
oldest = entries[len(entries)-1].Time
}
log.Debug("QueryLog: prepared data (%d/%d) older than %s in %s",
len(entries), total, params.olderThan, time.Since(now))
return entries, oldest
}
// searchFiles reads log entries from all log files and applies the specified search criteria.
// IMPORTANT: this method does not scan more than "maxSearchEntries" so you
// may need to call it many times.
@@ -21,7 +75,7 @@ import (
// * an array of log entries that we have read
// * time of the oldest processed entry (even if it was discarded)
// * total number of processed entries (including discarded).
func (l *queryLog) searchFiles(params getDataParams) ([]*logEntry, time.Time, int) {
func (l *queryLog) searchFiles(params *searchParams) ([]*logEntry, time.Time, int) {
entries := make([]*logEntry, 0)
oldest := time.Time{}
@@ -32,10 +86,10 @@ func (l *queryLog) searchFiles(params getDataParams) ([]*logEntry, time.Time, in
}
defer r.Close()
if params.OlderThan.IsZero() {
if params.olderThan.IsZero() {
err = r.SeekStart()
} else {
err = r.Seek(params.OlderThan.UnixNano())
err = r.Seek(params.olderThan.UnixNano())
if err == nil {
// Read to the next record right away
// The one that was specified in the "oldest" param is not needed,
@@ -45,14 +99,17 @@ func (l *queryLog) searchFiles(params getDataParams) ([]*logEntry, time.Time, in
}
if err != nil {
log.Debug("Cannot Seek() to %v: %v", params.OlderThan, err)
log.Debug("Cannot Seek() to %v: %v", params.olderThan, err)
return entries, oldest, 0
}
totalLimit := params.offset + params.limit
total := 0
oldestNano := int64(0)
// Do not scan more than 50k at once
for total <= maxSearchEntries {
// By default, we do not scan more than "maxFileScanEntries" at once
// The idea is to make search calls faster so that the UI could handle it and show something
// This behavior can be overridden if "maxFileScanEntries" is set to 0
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
entry, ts, err := l.readNextEntry(r, params)
if err == io.EOF {
@@ -65,8 +122,8 @@ func (l *queryLog) searchFiles(params getDataParams) ([]*logEntry, time.Time, in
if entry != nil {
entries = append(entries, entry)
if len(entries) == getDataLimit {
// Do not read more than "getDataLimit" records at once
if len(entries) == totalLimit {
// Do not read more than "totalLimit" records at once
break
}
}
@@ -82,7 +139,7 @@ func (l *queryLog) searchFiles(params getDataParams) ([]*logEntry, time.Time, in
// * log entry that matches search criteria or null if it was discarded (or if there's nothing to read)
// * timestamp of the processed log entry
// * error if we can't read anymore
func (l *queryLog) readNextEntry(r *QLogReader, params getDataParams) (*logEntry, int64, error) {
func (l *queryLog) readNextEntry(r *QLogReader, params *searchParams) (*logEntry, int64, error) {
line, err := r.ReadNext()
if err != nil {
return nil, 0, err
@@ -92,7 +149,7 @@ func (l *queryLog) readNextEntry(r *QLogReader, params getDataParams) (*logEntry
timestamp := readQLogTimestamp(line)
// Quick check without deserializing log entry
if !quickMatchesGetDataParams(line, params) {
if !params.quickMatch(line) {
return nil, timestamp, nil
}
@@ -100,7 +157,7 @@ func (l *queryLog) readNextEntry(r *QLogReader, params getDataParams) (*logEntry
decodeLogEntry(&entry, line)
// Full check of the deserialized log entry
if !matchesGetDataParams(&entry, params) {
if !params.match(&entry) {
return nil, timestamp, nil
}
@@ -120,257 +177,3 @@ func (l *queryLog) openReader() (*QLogReader, error) {
return NewQLogReader(files)
}
// quickMatchesGetDataParams - quickly checks if the line matches getDataParams
// this method does not guarantee anything and the reason is to do a quick check
// without deserializing anything
func quickMatchesGetDataParams(line string, params getDataParams) bool {
if params.ResponseStatus == responseStatusFiltered {
boolVal, ok := readJSONBool(line, "IsFiltered")
if !ok || !boolVal {
return false
}
}
if len(params.Domain) != 0 {
val := readJSONValue(line, "QH")
if len(val) == 0 {
return false
}
if (params.StrictMatchDomain && val != params.Domain) ||
(!params.StrictMatchDomain && strings.Index(val, params.Domain) == -1) {
return false
}
}
if len(params.QuestionType) != 0 {
val := readJSONValue(line, "QT")
if val != params.QuestionType {
return false
}
}
if len(params.Client) != 0 {
val := readJSONValue(line, "IP")
if len(val) == 0 {
log.Debug("QueryLog: failed to decodeLogEntry")
return false
}
if (params.StrictMatchClient && val != params.Client) ||
(!params.StrictMatchClient && strings.Index(val, params.Client) == -1) {
return false
}
}
return true
}
// matchesGetDataParams - returns true if the entry matches the search parameters
func matchesGetDataParams(entry *logEntry, params getDataParams) bool {
if params.ResponseStatus == responseStatusFiltered && !entry.Result.IsFiltered {
return false
}
if len(params.QuestionType) != 0 {
if entry.QType != params.QuestionType {
return false
}
}
if len(params.Domain) != 0 {
if (params.StrictMatchDomain && entry.QHost != params.Domain) ||
(!params.StrictMatchDomain && strings.Index(entry.QHost, params.Domain) == -1) {
return false
}
}
if len(params.Client) != 0 {
if (params.StrictMatchClient && entry.IP != params.Client) ||
(!params.StrictMatchClient && strings.Index(entry.IP, params.Client) == -1) {
return false
}
}
return true
}
// decodeLogEntry - decodes query log entry from a line
// nolint (gocyclo)
func decodeLogEntry(ent *logEntry, str string) {
var b bool
var i int
var err error
for {
k, v, t := readJSON(&str)
if t == jsonTErr {
break
}
switch k {
case "IP":
if len(ent.IP) == 0 {
ent.IP = v
}
case "T":
ent.Time, err = time.Parse(time.RFC3339, v)
case "QH":
ent.QHost = v
case "QT":
ent.QType = v
case "QC":
ent.QClass = v
case "Answer":
ent.Answer, err = base64.StdEncoding.DecodeString(v)
case "OrigAnswer":
ent.OrigAnswer, err = base64.StdEncoding.DecodeString(v)
case "IsFiltered":
b, err = strconv.ParseBool(v)
ent.Result.IsFiltered = b
case "Rule":
ent.Result.Rule = v
case "FilterID":
i, err = strconv.Atoi(v)
ent.Result.FilterID = int64(i)
case "Reason":
i, err = strconv.Atoi(v)
ent.Result.Reason = dnsfilter.Reason(i)
case "Upstream":
ent.Upstream = v
case "Elapsed":
i, err = strconv.Atoi(v)
ent.Elapsed = time.Duration(i)
// pre-v0.99.3 compatibility:
case "Question":
var qstr []byte
qstr, err = base64.StdEncoding.DecodeString(v)
if err != nil {
break
}
q := new(dns.Msg)
err = q.Unpack(qstr)
if err != nil {
break
}
ent.QHost = q.Question[0].Name
if len(ent.QHost) == 0 {
break
}
ent.QHost = ent.QHost[:len(ent.QHost)-1]
ent.QType = dns.TypeToString[q.Question[0].Qtype]
ent.QClass = dns.ClassToString[q.Question[0].Qclass]
case "Time":
ent.Time, err = time.Parse(time.RFC3339, v)
}
if err != nil {
log.Debug("decodeLogEntry err: %s", err)
break
}
}
}
// Get bool value from "key":bool
func readJSONBool(s, name string) (bool, bool) {
i := strings.Index(s, "\""+name+"\":")
if i == -1 {
return false, false
}
start := i + 1 + len(name) + 2
b := false
if strings.HasPrefix(s[start:], "true") {
b = true
} else if !strings.HasPrefix(s[start:], "false") {
return false, false
}
return b, true
}
// Get value from "key":"value"
func readJSONValue(s, name string) string {
i := strings.Index(s, "\""+name+"\":\"")
if i == -1 {
return ""
}
start := i + 1 + len(name) + 3
i = strings.IndexByte(s[start:], '"')
if i == -1 {
return ""
}
end := start + i
return s[start:end]
}
const (
jsonTErr = iota
jsonTObj
jsonTStr
jsonTNum
jsonTBool
)
// Parse JSON key-value pair
// e.g.: "key":VALUE where VALUE is "string", true|false (boolean), or 123.456 (number)
// Note the limitations:
// . doesn't support whitespace
// . doesn't support "null"
// . doesn't validate boolean or number
// . no proper handling of {} braces
// . no handling of [] brackets
// Return (key, value, type)
func readJSON(ps *string) (string, string, int32) {
s := *ps
k := ""
v := ""
t := int32(jsonTErr)
q1 := strings.IndexByte(s, '"')
if q1 == -1 {
return k, v, t
}
q2 := strings.IndexByte(s[q1+1:], '"')
if q2 == -1 {
return k, v, t
}
k = s[q1+1 : q1+1+q2]
s = s[q1+1+q2+1:]
if len(s) < 2 || s[0] != ':' {
return k, v, t
}
if s[1] == '"' {
q2 = strings.IndexByte(s[2:], '"')
if q2 == -1 {
return k, v, t
}
v = s[2 : 2+q2]
t = jsonTStr
s = s[2+q2+1:]
} else if s[1] == '{' {
t = jsonTObj
s = s[1+1:]
} else {
sep := strings.IndexAny(s[1:], ",}")
if sep == -1 {
return k, v, t
}
v = s[1 : 1+sep]
if s[1] == 't' || s[1] == 'f' {
t = jsonTBool
} else if s[1] == '.' || (s[1] >= '0' && s[1] <= '9') {
t = jsonTNum
}
s = s[1+sep+1:]
}
*ps = s
return k, v, t
}