Pull request: querylog: search clients by name, enrich http resp
Updates #1273.

Squashed commit of the following:

commit 55b78153b1b775c855e759011141bbbe6d4b962c
Author: Artem Baskal <a.baskal@adguard.com>
Date:   Fri Apr 2 16:55:39 2021 +0300

    Update client_info in case of null

commit 5c80c1438ed9d961af11617831b704d6ae15cc34
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Apr 2 16:24:14 2021 +0300

    querylog: always set client_info

commit b48efd64d757cc0bcf5b34de22fdd0b0464d98a6
Merge: 4ed7eab5 23c9f528
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Apr 2 16:22:08 2021 +0300

    Merge branch 'master' into 1273-querylog-client-name

commit 4ed7eab52b6b5b0c0ddb5aa5a3225a62d1f9265b
Merge: dbf990eb 70d4c70e
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Apr 2 12:57:17 2021 +0300

    Merge branch 'master' into 1273-querylog-client-name

commit dbf990eb881116754554270e7b691b5db8e9ee34
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Apr 2 12:56:13 2021 +0300

    home: imp names

commit c2cfdef494ca26fff62b9fa008f1b389d9d4d46b
Author: Artem Baskal <a.baskal@adguard.com>
Date:   Thu Apr 1 19:26:04 2021 +0300

    Rename to whois

commit e3cc4a68ee576770b1922680155308e33bed31e8
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Thu Apr 1 19:03:42 2021 +0300

    home: imp whois more

commit 3b8ef8691c298aff35946b35923ef2e5b1f9bbbe
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Thu Apr 1 18:51:14 2021 +0300

    home: imp whois resp

commit fb97e0d74976723a512d6ff4c69e830fe59c8df8
Author: Artem Baskal <a.baskal@adguard.com>
Date:   Thu Apr 1 18:00:03 2021 +0300

    Fix client_info ids prop types

commit 298005189e372651ceff453e88aca19ee925a138
Author: Artem Baskal <a.baskal@adguard.com>
Date:   Thu Apr 1 17:58:14 2021 +0300

    Adapt changes on client

commit aa1769f64197d865478a66271da483babfc5dfd0
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Thu Apr 1 17:18:36 2021 +0300

    all: add more fields to querylog client

commit 4b2a2dbd380ec410f3068d15ea16430912e03e33
Merge: cda92c3f 2e4e2f62
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Thu Apr 1 16:57:26 2021 +0300

    Merge branch 'master' into 1273-querylog-client-name

commit cda92c3f0331cbac252f3163d31457f716bc7f2c
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Mon Mar 29 18:03:51 2021 +0300

    querylog: fix windows tests

commit 5a56f0a32608869ed93a38f18f63ea3a20f7bde2
Merge: 627e4958 e710ce11
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Mon Mar 29 17:45:53 2021 +0300

    Merge branch 'master' into 1273-querylog-client-name

commit 627e495828e82d44cc77aa393536479f23cc68b7
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Mon Mar 29 17:44:49 2021 +0300

    querylog: add tests, imp code, docs

commit 6dec468a2f0c29357875ff99458e0e8f8e580e6d
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Mar 26 16:10:47 2021 +0300

    querylog: search clients by name, enrich http resp
This commit is contained in:
33
internal/querylog/client.go
Normal file
33
internal/querylog/client.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package querylog
|
||||
|
||||
// Client is the information required by the query log to match against clients
|
||||
// during searches.
|
||||
type Client struct {
|
||||
Name string `json:"name"`
|
||||
DisallowedRule string `json:"disallowed_rule"`
|
||||
Whois *ClientWhois `json:"whois,omitempty"`
|
||||
IDs []string `json:"ids"`
|
||||
Disallowed bool `json:"disallowed"`
|
||||
}
|
||||
|
||||
// ClientWhois is the filtered WHOIS data for the client.  Every field is
// optional and is left out of the JSON output when it is empty.
//
// TODO(a.garipov): Merge with home.RuntimeClientWhoisInfo after the
// refactoring is done.
type ClientWhois struct {
	// City is serialized as "city".
	City string `json:"city,omitempty"`

	// Country is serialized as "country".
	Country string `json:"country,omitempty"`

	// Orgname is serialized as "orgname".
	Orgname string `json:"orgname,omitempty"`
}
|
||||
|
||||
// clientCacheKey identifies a cached client lookup.  Both parts take part in
// the comparison, so the same client ID seen from two different addresses
// produces two distinct keys.
type clientCacheKey struct {
	// clientID is the client ID of the looked-up record; may be empty.
	clientID string

	// ip is the textual form of the IP address of the looked-up record; may
	// be empty.
	ip string
}
|
||||
|
||||
// clientCache is the cache of client information found throughout a request to
|
||||
// the query log API. It is used both to speed up the lookup, as well as to
|
||||
// make sure that changes in client data between two lookups don't create
|
||||
// discrepancies in our response.
|
||||
type clientCache map[clientCacheKey]*Client
|
||||
@@ -68,7 +68,7 @@ func (l *queryLog) handleQueryLogClear(_ http.ResponseWriter, _ *http.Request) {
|
||||
func (l *queryLog) handleQueryLogInfo(w http.ResponseWriter, r *http.Request) {
|
||||
resp := qlogConfig{}
|
||||
resp.Enabled = l.conf.Enabled
|
||||
resp.Interval = l.conf.Interval
|
||||
resp.Interval = l.conf.RotationIvl
|
||||
resp.AnonymizeClientIP = l.conf.AnonymizeClientIP
|
||||
|
||||
jsonVal, err := json.Marshal(resp)
|
||||
@@ -104,7 +104,7 @@ func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request)
|
||||
conf.Enabled = d.Enabled
|
||||
}
|
||||
if req.Exists("interval") {
|
||||
conf.Interval = d.Interval
|
||||
conf.RotationIvl = d.Interval
|
||||
}
|
||||
if req.Exists("anonymize_client_ip") {
|
||||
conf.AnonymizeClientIP = d.AnonymizeClientIP
|
||||
|
||||
@@ -71,6 +71,7 @@ func (l *queryLog) logEntryToJSONEntry(entry *logEntry) (jsonEntry jobject) {
|
||||
"elapsedMs": strconv.FormatFloat(entry.Elapsed.Seconds()*1000, 'f', -1, 64),
|
||||
"time": entry.Time.Format(time.RFC3339Nano),
|
||||
"client": l.getClientIP(entry.IP),
|
||||
"client_info": entry.client,
|
||||
"client_proto": entry.ClientProto,
|
||||
"upstream": entry.Upstream,
|
||||
"question": jobject{
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -22,12 +21,17 @@ const (
|
||||
|
||||
// queryLog is a structure that writes and reads the DNS query log
|
||||
type queryLog struct {
|
||||
findClient func(ids []string) (c *Client, err error)
|
||||
|
||||
conf *Config
|
||||
lock sync.Mutex
|
||||
logFile string // path to the log file
|
||||
|
||||
bufferLock sync.RWMutex
|
||||
buffer []*logEntry
|
||||
// bufferLock protects buffer.
|
||||
bufferLock sync.RWMutex
|
||||
// buffer contains recent log entries.
|
||||
buffer []*logEntry
|
||||
|
||||
fileFlushLock sync.Mutex // synchronize a file-flushing goroutine and main thread
|
||||
flushPending bool // don't start another goroutine while the previous one is still running
|
||||
fileWriteLock sync.Mutex
|
||||
@@ -64,6 +68,9 @@ func NewClientProto(s string) (cp ClientProto, err error) {
|
||||
|
||||
// logEntry - represents a single log entry
|
||||
type logEntry struct {
|
||||
// client is the found client information, if any.
|
||||
client *Client
|
||||
|
||||
IP net.IP `json:"IP"` // Client IP
|
||||
Time time.Time `json:"T"`
|
||||
|
||||
@@ -82,18 +89,6 @@ type logEntry struct {
|
||||
Upstream string `json:",omitempty"` // if empty, means it was cached
|
||||
}
|
||||
|
||||
// create a new instance of the query log
|
||||
func newQueryLog(conf Config) *queryLog {
|
||||
l := queryLog{}
|
||||
l.logFile = filepath.Join(conf.BaseDir, queryLogFileName)
|
||||
l.conf = &Config{}
|
||||
*l.conf = conf
|
||||
if !checkInterval(l.conf.Interval) {
|
||||
l.conf.Interval = 1
|
||||
}
|
||||
return &l
|
||||
}
|
||||
|
||||
func (l *queryLog) Start() {
|
||||
if l.conf.HTTPRegister != nil {
|
||||
l.initWeb()
|
||||
@@ -138,12 +133,16 @@ func (l *queryLog) clear() {
|
||||
}
|
||||
|
||||
func (l *queryLog) Add(params AddParams) {
|
||||
var err error
|
||||
|
||||
if !l.conf.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
if params.Question == nil || len(params.Question.Question) != 1 || len(params.Question.Question[0].Name) == 0 ||
|
||||
params.ClientIP == nil {
|
||||
err = params.validate()
|
||||
if err != nil {
|
||||
log.Error("querylog: adding record: %s, skipping", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -168,20 +167,26 @@ func (l *queryLog) Add(params AddParams) {
|
||||
entry.QClass = dns.Class(q.Qclass).String()
|
||||
|
||||
if params.Answer != nil {
|
||||
a, err := params.Answer.Pack()
|
||||
var a []byte
|
||||
a, err = params.Answer.Pack()
|
||||
if err != nil {
|
||||
log.Info("Querylog: Answer.Pack(): %s", err)
|
||||
log.Error("querylog: Answer.Pack(): %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
entry.Answer = a
|
||||
}
|
||||
|
||||
if params.OrigAnswer != nil {
|
||||
a, err := params.OrigAnswer.Pack()
|
||||
var a []byte
|
||||
a, err = params.OrigAnswer.Pack()
|
||||
if err != nil {
|
||||
log.Info("Querylog: OrigAnswer.Pack(): %s", err)
|
||||
log.Error("querylog: OrigAnswer.Pack(): %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
entry.OrigAnswer = a
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ func TestQueryLog(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
FileEnabled: true,
|
||||
Interval: 1,
|
||||
RotationIvl: 1,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
@@ -127,10 +127,10 @@ func TestQueryLog(t *testing.T) {
|
||||
|
||||
func TestQueryLogOffsetLimit(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
Interval: 1,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
Enabled: true,
|
||||
RotationIvl: 1,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
|
||||
const (
|
||||
@@ -202,7 +202,7 @@ func TestQueryLogMaxFileScanEntries(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
FileEnabled: true,
|
||||
Interval: 1,
|
||||
RotationIvl: 1,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
@@ -230,7 +230,7 @@ func TestQueryLogFileDisabled(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
FileEnabled: false,
|
||||
Interval: 1,
|
||||
RotationIvl: 1,
|
||||
MemSize: 2,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
|
||||
@@ -3,9 +3,12 @@ package querylog
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/agherr"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/dnsfilter"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"github.com/miekg/dns"
|
||||
)
|
||||
|
||||
@@ -25,18 +28,37 @@ type QueryLog interface {
|
||||
|
||||
// Config - configuration object
|
||||
type Config struct {
|
||||
Enabled bool // enable the module
|
||||
FileEnabled bool // write logs to file
|
||||
BaseDir string // directory where log file is stored
|
||||
Interval uint32 // interval to rotate logs (in days)
|
||||
MemSize uint32 // number of entries kept in memory before they are flushed to disk
|
||||
AnonymizeClientIP bool // anonymize clients' IP addresses
|
||||
|
||||
// Called when the configuration is changed by HTTP request
|
||||
// ConfigModified is called when the configuration is changed, for
|
||||
// example by HTTP requests.
|
||||
ConfigModified func()
|
||||
|
||||
// Register an HTTP handler
|
||||
// HTTPRegister registers an HTTP handler.
|
||||
HTTPRegister func(string, string, func(http.ResponseWriter, *http.Request))
|
||||
|
||||
// FindClient returns client information by their IDs.
|
||||
FindClient func(ids []string) (c *Client, err error)
|
||||
|
||||
// BaseDir is the base directory for log files.
|
||||
BaseDir string
|
||||
|
||||
// RotationIvl is the interval for log rotation, in days. After that
|
||||
// period, the old log file will be renamed, NOT deleted, so the actual
|
||||
// log retention time is twice the interval.
|
||||
RotationIvl uint32
|
||||
|
||||
// MemSize is the number of entries kept in a memory buffer before they
|
||||
// are flushed to disk.
|
||||
MemSize uint32
|
||||
|
||||
// Enabled tells if the query log is enabled.
|
||||
Enabled bool
|
||||
|
||||
// FileEnabled tells if the query log writes logs to files.
|
||||
FileEnabled bool
|
||||
|
||||
// AnonymizeClientIP tells if the query log should anonymize clients' IP
|
||||
// addresses.
|
||||
AnonymizeClientIP bool
|
||||
}
|
||||
|
||||
// AddParams - parameters for Add()
|
||||
@@ -52,7 +74,52 @@ type AddParams struct {
|
||||
ClientProto ClientProto
|
||||
}
|
||||
|
||||
// New - create a new instance of the query log
|
||||
func New(conf Config) QueryLog {
|
||||
// validate returns an error if the parameters aren't valid.
|
||||
func (p *AddParams) validate() (err error) {
|
||||
switch {
|
||||
case p.Question == nil:
|
||||
return agherr.Error("question is nil")
|
||||
case len(p.Question.Question) != 1:
|
||||
return agherr.Error("more than one question")
|
||||
case len(p.Question.Question[0].Name) == 0:
|
||||
return agherr.Error("no host in question")
|
||||
case p.ClientIP == nil:
|
||||
return agherr.Error("no client ip")
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new instance of the query log.
|
||||
func New(conf Config) (ql QueryLog) {
|
||||
return newQueryLog(conf)
|
||||
}
|
||||
|
||||
// newQueryLog crates a new queryLog.
|
||||
func newQueryLog(conf Config) (l *queryLog) {
|
||||
findClient := conf.FindClient
|
||||
if findClient == nil {
|
||||
findClient = func(_ []string) (_ *Client, _ error) {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
l = &queryLog{
|
||||
findClient: findClient,
|
||||
|
||||
logFile: filepath.Join(conf.BaseDir, queryLogFileName),
|
||||
}
|
||||
|
||||
l.conf = &Config{}
|
||||
*l.conf = conf
|
||||
|
||||
if !checkInterval(conf.RotationIvl) {
|
||||
log.Info(
|
||||
"querylog: warning: unsupported rotation interval %d, setting to 1 day",
|
||||
conf.RotationIvl,
|
||||
)
|
||||
l.conf.RotationIvl = 1
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ func (l *queryLog) readFileFirstTimeValue() int64 {
|
||||
}
|
||||
|
||||
func (l *queryLog) periodicRotate() {
|
||||
intervalSeconds := uint64(l.conf.Interval) * 24 * 60 * 60
|
||||
intervalSeconds := uint64(l.conf.RotationIvl) * 24 * 60 * 60
|
||||
for {
|
||||
oldest := l.readFileFirstTimeValue()
|
||||
if uint64(oldest)+intervalSeconds <= uint64(time.Now().Unix()) {
|
||||
|
||||
@@ -8,6 +8,67 @@ import (
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
)
|
||||
|
||||
// client finds the client info, if any, by its client ID and IP address,
|
||||
// optionally checking the provided cache. It will use the IP address
|
||||
// regardless of if the IP anonymization is enabled now, because the
|
||||
// anonymization could have been disabled in the past, and client will try to
|
||||
// find those records as well.
|
||||
func (l *queryLog) client(clientID, ip string, cache clientCache) (c *Client, err error) {
|
||||
cck := clientCacheKey{clientID: clientID, ip: ip}
|
||||
if c = cache[cck]; c != nil {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
var ids []string
|
||||
if clientID != "" {
|
||||
ids = append(ids, clientID)
|
||||
}
|
||||
|
||||
if ip != "" {
|
||||
ids = append(ids, ip)
|
||||
}
|
||||
|
||||
c, err = l.findClient(ids)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cache != nil {
|
||||
cache[cck] = c
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// searchMemory looks up log records which are currently in the in-memory
|
||||
// buffer. It optionally uses the client cache, if provided. It also returns
|
||||
// the total amount of records in the buffer at the moment of searching.
|
||||
func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entries []*logEntry, total int) {
|
||||
l.bufferLock.Lock()
|
||||
defer l.bufferLock.Unlock()
|
||||
|
||||
// Go through the buffer in the reverse order, from newer to older.
|
||||
var err error
|
||||
for i := len(l.buffer) - 1; i >= 0; i-- {
|
||||
e := l.buffer[i]
|
||||
|
||||
e.client, err = l.client(e.ClientID, e.IP.String(), cache)
|
||||
if err != nil {
|
||||
msg := "querylog: enriching memory record at time %s" +
|
||||
" for client %q (client id %q): %s"
|
||||
log.Error(msg, e.Time, e.IP, e.ClientID, err)
|
||||
|
||||
// Go on and try to match anyway.
|
||||
}
|
||||
|
||||
if params.match(e) {
|
||||
entries = append(entries, e)
|
||||
}
|
||||
}
|
||||
|
||||
return entries, len(l.buffer)
|
||||
}
|
||||
|
||||
// search - searches log entries in the query log using specified parameters
|
||||
// returns the list of entries found + time of the oldest entry
|
||||
func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
|
||||
@@ -17,26 +78,11 @@ func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
|
||||
return []*logEntry{}, time.Time{}
|
||||
}
|
||||
|
||||
// add from file
|
||||
fileEntries, oldest, total := l.searchFiles(params)
|
||||
cache := clientCache{}
|
||||
fileEntries, oldest, total := l.searchFiles(params, cache)
|
||||
memoryEntries, bufLen := l.searchMemory(params, cache)
|
||||
total += bufLen
|
||||
|
||||
// add from memory buffer
|
||||
l.bufferLock.Lock()
|
||||
total += len(l.buffer)
|
||||
memoryEntries := make([]*logEntry, 0)
|
||||
|
||||
// go through the buffer in the reverse order
|
||||
// from NEWER to OLDER
|
||||
for i := len(l.buffer) - 1; i >= 0; i-- {
|
||||
entry := l.buffer[i]
|
||||
if !params.match(entry) {
|
||||
continue
|
||||
}
|
||||
memoryEntries = append(memoryEntries, entry)
|
||||
}
|
||||
l.bufferLock.Unlock()
|
||||
|
||||
// limits
|
||||
totalLimit := params.offset + params.limit
|
||||
|
||||
// now let's get a unified collection
|
||||
@@ -74,18 +120,15 @@ func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
|
||||
return entries, oldest
|
||||
}
|
||||
|
||||
// searchFiles reads log entries from all log files and applies the specified search criteria.
|
||||
// IMPORTANT: this method does not scan more than "maxSearchEntries" so you
|
||||
// may need to call it many times.
|
||||
//
|
||||
// it returns:
|
||||
// * an array of log entries that we have read
|
||||
// * time of the oldest processed entry (even if it was discarded)
|
||||
// * total number of processed entries (including discarded).
|
||||
func (l *queryLog) searchFiles(params *searchParams) ([]*logEntry, time.Time, int) {
|
||||
entries := make([]*logEntry, 0)
|
||||
oldest := time.Time{}
|
||||
|
||||
// searchFiles looks up log records from all log files. It optionally uses the
|
||||
// client cache, if provided. searchFiles does not scan more than
|
||||
// maxFileScanEntries so callers may need to call it several times to get all
|
||||
// results. oldest and total are the time of the oldest processed entry and the
|
||||
// total number of processed entries, including discarded ones, correspondingly.
|
||||
func (l *queryLog) searchFiles(
|
||||
params *searchParams,
|
||||
cache clientCache,
|
||||
) (entries []*logEntry, oldest time.Time, total int) {
|
||||
files := []string{
|
||||
l.logFile + ".1",
|
||||
l.logFile,
|
||||
@@ -104,40 +147,43 @@ func (l *queryLog) searchFiles(params *searchParams) ([]*logEntry, time.Time, in
|
||||
} else {
|
||||
err = r.SeekTS(params.olderThan.UnixNano())
|
||||
if err == nil {
|
||||
// Read to the next record right away
|
||||
// The one that was specified in the "oldest" param is not needed,
|
||||
// we need only the one next to it
|
||||
// Read to the next record, because we only need the one
|
||||
// that goes after it.
|
||||
_, err = r.ReadNext()
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Debug("Cannot SeekTS() to %v: %v", params.olderThan, err)
|
||||
log.Debug("querylog: cannot seek to %s: %s", params.olderThan, err)
|
||||
|
||||
return entries, oldest, 0
|
||||
}
|
||||
|
||||
totalLimit := params.offset + params.limit
|
||||
total := 0
|
||||
oldestNano := int64(0)
|
||||
// By default, we do not scan more than "maxFileScanEntries" at once
|
||||
// The idea is to make search calls faster so that the UI could handle it and show something
|
||||
// This behavior can be overridden if "maxFileScanEntries" is set to 0
|
||||
|
||||
// By default, we do not scan more than maxFileScanEntries at once.
|
||||
// The idea is to make search calls faster so that the UI could handle
|
||||
// it and show something quicker. This behavior can be overridden if
|
||||
// maxFileScanEntries is set to 0.
|
||||
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
|
||||
var entry *logEntry
|
||||
var e *logEntry
|
||||
var ts int64
|
||||
entry, ts, err = l.readNextEntry(r, params)
|
||||
if err == io.EOF {
|
||||
// there's nothing to read anymore
|
||||
break
|
||||
e, ts, err = l.readNextEntry(r, params, cache)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
log.Error("querylog: reading next entry: %s", err)
|
||||
}
|
||||
|
||||
oldestNano = ts
|
||||
total++
|
||||
|
||||
if entry != nil {
|
||||
entries = append(entries, entry)
|
||||
if e != nil {
|
||||
entries = append(entries, e)
|
||||
if len(entries) == totalLimit {
|
||||
// Do not read more than "totalLimit" records at once
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -146,36 +192,46 @@ func (l *queryLog) searchFiles(params *searchParams) ([]*logEntry, time.Time, in
|
||||
if oldestNano != 0 {
|
||||
oldest = time.Unix(0, oldestNano)
|
||||
}
|
||||
|
||||
return entries, oldest, total
|
||||
}
|
||||
|
||||
// readNextEntry - reads the next log entry and checks if it matches the search criteria (getDataParams)
|
||||
//
|
||||
// returns:
|
||||
// * log entry that matches search criteria or null if it was discarded (or if there's nothing to read)
|
||||
// * timestamp of the processed log entry
|
||||
// * error if we can't read anymore
|
||||
func (l *queryLog) readNextEntry(r *QLogReader, params *searchParams) (*logEntry, int64, error) {
|
||||
line, err := r.ReadNext()
|
||||
// readNextEntry reads the next log entry and checks if it matches the search
|
||||
// criteria. It optionally uses the client cache, if provided. e is nil if the
|
||||
// entry doesn't match the search criteria. ts is the timestamp of the
|
||||
// processed entry.
|
||||
func (l *queryLog) readNextEntry(
|
||||
r *QLogReader,
|
||||
params *searchParams,
|
||||
cache clientCache,
|
||||
) (e *logEntry, ts int64, err error) {
|
||||
var line string
|
||||
line, err = r.ReadNext()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Read the log record timestamp right away
|
||||
timestamp := readQLogTimestamp(line)
|
||||
e = &logEntry{}
|
||||
decodeLogEntry(e, line)
|
||||
|
||||
// Quick check without deserializing log entry
|
||||
if !params.quickMatch(line) {
|
||||
return nil, timestamp, nil
|
||||
e.client, err = l.client(e.ClientID, e.IP.String(), cache)
|
||||
if err != nil {
|
||||
log.Error(
|
||||
"querylog: enriching file record at time %s"+
|
||||
" for client %q (client id %q): %s",
|
||||
e.Time,
|
||||
e.IP,
|
||||
e.ClientID,
|
||||
err,
|
||||
)
|
||||
|
||||
// Go on and try to match anyway.
|
||||
}
|
||||
|
||||
entry := logEntry{}
|
||||
decodeLogEntry(&entry, line)
|
||||
|
||||
// Full check of the deserialized log entry
|
||||
if !params.match(&entry) {
|
||||
return nil, timestamp, nil
|
||||
ts = e.Time.UnixNano()
|
||||
if !params.match(e) {
|
||||
return nil, ts, nil
|
||||
}
|
||||
|
||||
return &entry, timestamp, nil
|
||||
return e, ts, nil
|
||||
}
|
||||
|
||||
95
internal/querylog/search_test.go
Normal file
95
internal/querylog/search_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package querylog
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestQueryLog_Search_findClient(t *testing.T) {
|
||||
const knownClientID = "client-1"
|
||||
const knownClientName = "Known Client 1"
|
||||
const unknownClientID = "client-2"
|
||||
|
||||
knownClient := &Client{
|
||||
IDs: []string{knownClientID},
|
||||
Name: knownClientName,
|
||||
}
|
||||
|
||||
findClientCalls := 0
|
||||
findClient := func(ids []string) (c *Client, _ error) {
|
||||
defer func() { findClientCalls++ }()
|
||||
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if ids[0] == knownClientID {
|
||||
return knownClient, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
l := newQueryLog(Config{
|
||||
FindClient: findClient,
|
||||
BaseDir: t.TempDir(),
|
||||
RotationIvl: 1,
|
||||
MemSize: 100,
|
||||
Enabled: true,
|
||||
FileEnabled: true,
|
||||
AnonymizeClientIP: false,
|
||||
})
|
||||
t.Cleanup(l.Close)
|
||||
|
||||
q := &dns.Msg{
|
||||
Question: []dns.Question{{
|
||||
Name: "example.com",
|
||||
}},
|
||||
}
|
||||
|
||||
l.Add(AddParams{
|
||||
Question: q,
|
||||
ClientID: knownClientID,
|
||||
ClientIP: net.IP{1, 2, 3, 4},
|
||||
})
|
||||
|
||||
// Add the same thing again to test the cache.
|
||||
l.Add(AddParams{
|
||||
Question: q,
|
||||
ClientID: knownClientID,
|
||||
ClientIP: net.IP{1, 2, 3, 4},
|
||||
})
|
||||
|
||||
l.Add(AddParams{
|
||||
Question: q,
|
||||
ClientID: unknownClientID,
|
||||
ClientIP: net.IP{1, 2, 3, 5},
|
||||
})
|
||||
|
||||
sp := &searchParams{
|
||||
// Add some time to the "current" one to protect against
|
||||
// low-resolution timers on some Windows machines.
|
||||
//
|
||||
// TODO(a.garipov): Use some kind of timeSource interface
|
||||
// instead of relying on time.Now() in tests.
|
||||
olderThan: time.Now().Add(10 * time.Second),
|
||||
limit: 3,
|
||||
}
|
||||
entries, _ := l.search(sp)
|
||||
assert.Equal(t, 2, findClientCalls)
|
||||
|
||||
require.Len(t, entries, 3)
|
||||
|
||||
assert.Nil(t, entries[0].client)
|
||||
|
||||
gotClient := entries[2].client
|
||||
require.NotNil(t, gotClient)
|
||||
|
||||
assert.Equal(t, knownClientName, gotClient.Name)
|
||||
assert.Equal(t, []string{knownClientID}, gotClient.IDs)
|
||||
}
|
||||
@@ -48,40 +48,6 @@ type searchCriteria struct {
|
||||
strict bool // should we strictly match (equality) or not (indexOf)
|
||||
}
|
||||
|
||||
// quickMatch - quickly checks if the log entry matches this search criteria
|
||||
// the reason is to do it as quickly as possible without de-serializing the entry
|
||||
func (c *searchCriteria) quickMatch(line string) bool {
|
||||
// note that we do this only for a limited set of criteria
|
||||
|
||||
switch c.criteriaType {
|
||||
case ctDomainOrClient:
|
||||
return c.quickMatchJSONValue(line, "QH") ||
|
||||
c.quickMatchJSONValue(line, "IP") ||
|
||||
c.quickMatchJSONValue(line, "CID")
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// quickMatchJSONValue - helper used by quickMatch
|
||||
func (c *searchCriteria) quickMatchJSONValue(line, propertyName string) bool {
|
||||
val := readJSONValue(line, propertyName)
|
||||
if len(val) == 0 {
|
||||
return false
|
||||
}
|
||||
val = strings.ToLower(val)
|
||||
searchVal := strings.ToLower(c.value)
|
||||
|
||||
if c.strict && searchVal == val {
|
||||
return true
|
||||
}
|
||||
if !c.strict && strings.Contains(val, searchVal) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// match - checks if the log entry matches this search criteria
|
||||
func (c *searchCriteria) match(entry *logEntry) bool {
|
||||
switch c.criteriaType {
|
||||
@@ -94,28 +60,41 @@ func (c *searchCriteria) match(entry *logEntry) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *searchCriteria) ctDomainOrClientCase(entry *logEntry) bool {
|
||||
clientID := strings.ToLower(entry.ClientID)
|
||||
qhost := strings.ToLower(entry.QHost)
|
||||
searchVal := strings.ToLower(c.value)
|
||||
if c.strict && (qhost == searchVal || clientID == searchVal) {
|
||||
return true
|
||||
func (c *searchCriteria) ctDomainOrClientCaseStrict(term, clientID, name, host, ip string) bool {
|
||||
return strings.EqualFold(host, term) ||
|
||||
strings.EqualFold(clientID, term) ||
|
||||
strings.EqualFold(ip, term) ||
|
||||
strings.EqualFold(name, term)
|
||||
}
|
||||
|
||||
func (c *searchCriteria) ctDomainOrClientCase(e *logEntry) bool {
|
||||
clientID := e.ClientID
|
||||
host := e.QHost
|
||||
|
||||
var name string
|
||||
if e.client != nil {
|
||||
name = e.client.Name
|
||||
}
|
||||
|
||||
if !c.strict && (strings.Contains(qhost, searchVal) || strings.Contains(clientID, searchVal)) {
|
||||
return true
|
||||
ip := e.IP.String()
|
||||
term := strings.ToLower(c.value)
|
||||
if c.strict {
|
||||
return c.ctDomainOrClientCaseStrict(term, clientID, name, host, ip)
|
||||
}
|
||||
|
||||
ipStr := entry.IP.String()
|
||||
if c.strict && ipStr == c.value {
|
||||
return true
|
||||
}
|
||||
// TODO(a.garipov): Write a case-insensitive version of strings.Contains
|
||||
// instead of generating garbage. Or, perhaps in the future, use
|
||||
// a locale-appropriate matcher from golang.org/x/text.
|
||||
clientID = strings.ToLower(clientID)
|
||||
host = strings.ToLower(host)
|
||||
ip = strings.ToLower(ip)
|
||||
name = strings.ToLower(name)
|
||||
term = strings.ToLower(term)
|
||||
|
||||
if !c.strict && strings.Contains(ipStr, c.value) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
return strings.Contains(clientID, term) ||
|
||||
strings.Contains(host, term) ||
|
||||
strings.Contains(ip, term) ||
|
||||
strings.Contains(name, term)
|
||||
}
|
||||
|
||||
func (c *searchCriteria) ctFilteringStatusCase(res dnsfilter.Result) bool {
|
||||
|
||||
@@ -27,19 +27,6 @@ func newSearchParams() *searchParams {
|
||||
}
|
||||
}
|
||||
|
||||
// quickMatchesGetDataParams - quickly checks if the line matches the searchParams
|
||||
// this method does not guarantee anything and the reason is to do a quick check
|
||||
// without deserializing anything
|
||||
func (s *searchParams) quickMatch(line string) bool {
|
||||
for _, c := range s.searchCriteria {
|
||||
if !c.quickMatch(line) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// match - checks if the logEntry matches the searchParams
|
||||
func (s *searchParams) match(entry *logEntry) bool {
|
||||
if !s.olderThan.IsZero() && entry.Time.UnixNano() >= s.olderThan.UnixNano() {
|
||||
|
||||
Reference in New Issue
Block a user