all: sync with master; upd chlog
This commit is contained in:
@@ -7,6 +7,7 @@ type Client struct {
|
||||
Name string `json:"name"`
|
||||
DisallowedRule string `json:"disallowed_rule"`
|
||||
Disallowed bool `json:"disallowed"`
|
||||
IgnoreQueryLog bool `json:"-"`
|
||||
}
|
||||
|
||||
// ClientWHOIS is the filtered WHOIS data for the client.
|
||||
|
||||
@@ -166,86 +166,6 @@ var logEntryHandlers = map[string]logEntryHandler{
|
||||
},
|
||||
}
|
||||
|
||||
var resultHandlers = map[string]logEntryHandler{
|
||||
"IsFiltered": func(t json.Token, ent *logEntry) error {
|
||||
v, ok := t.(bool)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
ent.Result.IsFiltered = v
|
||||
return nil
|
||||
},
|
||||
"Rule": func(t json.Token, ent *logEntry) error {
|
||||
s, ok := t.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
l := len(ent.Result.Rules)
|
||||
if l == 0 {
|
||||
ent.Result.Rules = []*filtering.ResultRule{{}}
|
||||
l++
|
||||
}
|
||||
|
||||
ent.Result.Rules[l-1].Text = s
|
||||
|
||||
return nil
|
||||
},
|
||||
"FilterID": func(t json.Token, ent *logEntry) error {
|
||||
n, ok := t.(json.Number)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
i, err := n.Int64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l := len(ent.Result.Rules)
|
||||
if l == 0 {
|
||||
ent.Result.Rules = []*filtering.ResultRule{{}}
|
||||
l++
|
||||
}
|
||||
|
||||
ent.Result.Rules[l-1].FilterListID = i
|
||||
|
||||
return nil
|
||||
},
|
||||
"Reason": func(t json.Token, ent *logEntry) error {
|
||||
v, ok := t.(json.Number)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
i, err := v.Int64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ent.Result.Reason = filtering.Reason(i)
|
||||
return nil
|
||||
},
|
||||
"ServiceName": func(t json.Token, ent *logEntry) error {
|
||||
s, ok := t.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ent.Result.ServiceName = s
|
||||
|
||||
return nil
|
||||
},
|
||||
"CanonName": func(t json.Token, ent *logEntry) error {
|
||||
s, ok := t.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ent.Result.CanonName = s
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func decodeResultRuleKey(key string, i int, dec *json.Decoder, ent *logEntry) {
|
||||
var vToken json.Token
|
||||
switch key {
|
||||
@@ -582,25 +502,11 @@ func decodeResult(dec *json.Decoder, ent *logEntry) {
|
||||
return
|
||||
}
|
||||
|
||||
switch key {
|
||||
case "ReverseHosts":
|
||||
decodeResultReverseHosts(dec, ent)
|
||||
decHandler, ok := resultDecHandlers[key]
|
||||
if ok {
|
||||
decHandler(dec, ent)
|
||||
|
||||
continue
|
||||
case "IPList":
|
||||
decodeResultIPList(dec, ent)
|
||||
|
||||
continue
|
||||
case "Rules":
|
||||
decodeResultRules(dec, ent)
|
||||
|
||||
continue
|
||||
case "DNSRewriteResult":
|
||||
decodeResultDNSRewriteResult(dec, ent)
|
||||
|
||||
continue
|
||||
default:
|
||||
// Go on.
|
||||
}
|
||||
|
||||
handler, ok := resultHandlers[key]
|
||||
@@ -621,6 +527,93 @@ func decodeResult(dec *json.Decoder, ent *logEntry) {
|
||||
}
|
||||
}
|
||||
|
||||
var resultHandlers = map[string]logEntryHandler{
|
||||
"IsFiltered": func(t json.Token, ent *logEntry) error {
|
||||
v, ok := t.(bool)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
ent.Result.IsFiltered = v
|
||||
return nil
|
||||
},
|
||||
"Rule": func(t json.Token, ent *logEntry) error {
|
||||
s, ok := t.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
l := len(ent.Result.Rules)
|
||||
if l == 0 {
|
||||
ent.Result.Rules = []*filtering.ResultRule{{}}
|
||||
l++
|
||||
}
|
||||
|
||||
ent.Result.Rules[l-1].Text = s
|
||||
|
||||
return nil
|
||||
},
|
||||
"FilterID": func(t json.Token, ent *logEntry) error {
|
||||
n, ok := t.(json.Number)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
i, err := n.Int64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l := len(ent.Result.Rules)
|
||||
if l == 0 {
|
||||
ent.Result.Rules = []*filtering.ResultRule{{}}
|
||||
l++
|
||||
}
|
||||
|
||||
ent.Result.Rules[l-1].FilterListID = i
|
||||
|
||||
return nil
|
||||
},
|
||||
"Reason": func(t json.Token, ent *logEntry) error {
|
||||
v, ok := t.(json.Number)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
i, err := v.Int64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ent.Result.Reason = filtering.Reason(i)
|
||||
return nil
|
||||
},
|
||||
"ServiceName": func(t json.Token, ent *logEntry) error {
|
||||
s, ok := t.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ent.Result.ServiceName = s
|
||||
|
||||
return nil
|
||||
},
|
||||
"CanonName": func(t json.Token, ent *logEntry) error {
|
||||
s, ok := t.(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ent.Result.CanonName = s
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var resultDecHandlers = map[string]func(dec *json.Decoder, ent *logEntry){
|
||||
"ReverseHosts": decodeResultReverseHosts,
|
||||
"IPList": decodeResultIPList,
|
||||
"Rules": decodeResultRules,
|
||||
"DNSRewriteResult": decodeResultDNSRewriteResult,
|
||||
}
|
||||
|
||||
func decodeLogEntry(ent *logEntry, str string) {
|
||||
dec := json.NewDecoder(strings.NewReader(str))
|
||||
dec.UseNumber()
|
||||
|
||||
70
internal/querylog/entry.go
Normal file
70
internal/querylog/entry.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package querylog
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/filtering"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"github.com/miekg/dns"
|
||||
)
|
||||
|
||||
// logEntry represents a single entry in the file.
|
||||
type logEntry struct {
|
||||
// client is the found client information, if any.
|
||||
client *Client
|
||||
|
||||
Time time.Time `json:"T"`
|
||||
|
||||
QHost string `json:"QH"`
|
||||
QType string `json:"QT"`
|
||||
QClass string `json:"QC"`
|
||||
|
||||
ReqECS string `json:"ECS,omitempty"`
|
||||
|
||||
ClientID string `json:"CID,omitempty"`
|
||||
ClientProto ClientProto `json:"CP"`
|
||||
|
||||
Upstream string `json:",omitempty"`
|
||||
|
||||
Answer []byte `json:",omitempty"`
|
||||
OrigAnswer []byte `json:",omitempty"`
|
||||
|
||||
IP net.IP `json:"IP"`
|
||||
|
||||
Result filtering.Result
|
||||
|
||||
Elapsed time.Duration
|
||||
|
||||
Cached bool `json:",omitempty"`
|
||||
AuthenticatedData bool `json:"AD,omitempty"`
|
||||
}
|
||||
|
||||
// shallowClone returns a shallow clone of e.
|
||||
func (e *logEntry) shallowClone() (clone *logEntry) {
|
||||
cloneVal := *e
|
||||
|
||||
return &cloneVal
|
||||
}
|
||||
|
||||
// addResponse adds data from resp to e.Answer if resp is not nil. If isOrig is
|
||||
// true, addResponse sets the e.OrigAnswer field instead of e.Answer. Any
|
||||
// errors are logged.
|
||||
func (e *logEntry) addResponse(resp *dns.Msg, isOrig bool) {
|
||||
if resp == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
if isOrig {
|
||||
e.Answer, err = resp.Pack()
|
||||
err = errors.Annotate(err, "packing answer: %w")
|
||||
} else {
|
||||
e.OrigAnswer, err = resp.Pack()
|
||||
err = errors.Annotate(err, "packing orig answer: %w")
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("querylog: %s", err)
|
||||
}
|
||||
}
|
||||
@@ -13,9 +13,11 @@ import (
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghalg"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghnet"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"github.com/AdguardTeam/golibs/stringutil"
|
||||
"github.com/AdguardTeam/golibs/timeutil"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/net/idna"
|
||||
)
|
||||
|
||||
@@ -25,8 +27,8 @@ type configJSON struct {
|
||||
// fractional numbers and not mess the API users by changing the units.
|
||||
Interval float64 `json:"interval"`
|
||||
|
||||
// Enabled shows if the querylog is enabled. It is an [aghalg.NullBool]
|
||||
// to be able to tell when it's set without using pointers.
|
||||
// Enabled shows if the querylog is enabled. It is an aghalg.NullBool to
|
||||
// be able to tell when it's set without using pointers.
|
||||
Enabled aghalg.NullBool `json:"enabled"`
|
||||
|
||||
// AnonymizeClientIP shows if the clients' IP addresses must be anonymized.
|
||||
@@ -35,44 +37,115 @@ type configJSON struct {
|
||||
AnonymizeClientIP aghalg.NullBool `json:"anonymize_client_ip"`
|
||||
}
|
||||
|
||||
// getConfigResp is the JSON structure for the querylog configuration.
|
||||
type getConfigResp struct {
|
||||
// Ignored is the list of host names, which should not be written to log.
|
||||
Ignored []string `json:"ignored"`
|
||||
|
||||
// Interval is the querylog rotation interval in milliseconds.
|
||||
Interval float64 `json:"interval"`
|
||||
|
||||
// Enabled shows if the querylog is enabled. It is an aghalg.NullBool to
|
||||
// be able to tell when it's set without using pointers.
|
||||
Enabled aghalg.NullBool `json:"enabled"`
|
||||
|
||||
// AnonymizeClientIP shows if the clients' IP addresses must be anonymized.
|
||||
// It is an aghalg.NullBool to be able to tell when it's set without using
|
||||
// pointers.
|
||||
//
|
||||
// TODO(a.garipov): Consider using separate setting for statistics.
|
||||
AnonymizeClientIP aghalg.NullBool `json:"anonymize_client_ip"`
|
||||
}
|
||||
|
||||
// Register web handlers
|
||||
func (l *queryLog) initWeb() {
|
||||
l.conf.HTTPRegister(http.MethodGet, "/control/querylog", l.handleQueryLog)
|
||||
l.conf.HTTPRegister(http.MethodGet, "/control/querylog_info", l.handleQueryLogInfo)
|
||||
l.conf.HTTPRegister(http.MethodPost, "/control/querylog_clear", l.handleQueryLogClear)
|
||||
l.conf.HTTPRegister(http.MethodGet, "/control/querylog/config", l.handleGetQueryLogConfig)
|
||||
l.conf.HTTPRegister(
|
||||
http.MethodPut,
|
||||
"/control/querylog/config/update",
|
||||
l.handlePutQueryLogConfig,
|
||||
)
|
||||
|
||||
// Deprecated handlers.
|
||||
l.conf.HTTPRegister(http.MethodGet, "/control/querylog_info", l.handleQueryLogInfo)
|
||||
l.conf.HTTPRegister(http.MethodPost, "/control/querylog_config", l.handleQueryLogConfig)
|
||||
}
|
||||
|
||||
// handleQueryLog is the handler for the GET /control/querylog HTTP API.
|
||||
func (l *queryLog) handleQueryLog(w http.ResponseWriter, r *http.Request) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
params, err := l.parseSearchParams(r)
|
||||
params, err := parseSearchParams(r)
|
||||
if err != nil {
|
||||
aghhttp.Error(r, w, http.StatusBadRequest, "failed to parse params: %s", err)
|
||||
aghhttp.Error(r, w, http.StatusBadRequest, "parsing params: %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
entries, oldest := l.search(params)
|
||||
data := l.entriesToJSON(entries, oldest)
|
||||
var entries []*logEntry
|
||||
var oldest time.Time
|
||||
func() {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
_ = aghhttp.WriteJSONResponse(w, r, data)
|
||||
entries, oldest = l.search(params)
|
||||
}()
|
||||
|
||||
resp := entriesToJSON(entries, oldest, l.anonymizer.Load())
|
||||
|
||||
_ = aghhttp.WriteJSONResponse(w, r, resp)
|
||||
}
|
||||
|
||||
// handleQueryLogClear is the handler for the POST /control/querylog/clear HTTP
|
||||
// API.
|
||||
func (l *queryLog) handleQueryLogClear(_ http.ResponseWriter, _ *http.Request) {
|
||||
l.clear()
|
||||
}
|
||||
|
||||
// Get configuration
|
||||
// handleQueryLogInfo is the handler for the GET /control/querylog_info HTTP
|
||||
// API.
|
||||
//
|
||||
// Deprecated: Remove it when migration to the new API is over.
|
||||
func (l *queryLog) handleQueryLogInfo(w http.ResponseWriter, r *http.Request) {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
ivl := l.conf.RotationIvl
|
||||
|
||||
if !checkInterval(ivl) {
|
||||
// NOTE: If interval is custom we set it to 90 days for compatibility
|
||||
// with old API.
|
||||
ivl = timeutil.Day * 90
|
||||
}
|
||||
|
||||
_ = aghhttp.WriteJSONResponse(w, r, configJSON{
|
||||
Enabled: aghalg.BoolToNullBool(l.conf.Enabled),
|
||||
Interval: l.conf.RotationIvl.Hours() / 24,
|
||||
Interval: ivl.Hours() / 24,
|
||||
AnonymizeClientIP: aghalg.BoolToNullBool(l.conf.AnonymizeClientIP),
|
||||
})
|
||||
}
|
||||
|
||||
// handleGetQueryLogConfig is the handler for the GET /control/querylog/config
|
||||
// HTTP API.
|
||||
func (l *queryLog) handleGetQueryLogConfig(w http.ResponseWriter, r *http.Request) {
|
||||
var resp *getConfigResp
|
||||
func() {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
resp = &getConfigResp{
|
||||
Interval: float64(l.conf.RotationIvl.Milliseconds()),
|
||||
Enabled: aghalg.BoolToNullBool(l.conf.Enabled),
|
||||
AnonymizeClientIP: aghalg.BoolToNullBool(l.conf.AnonymizeClientIP),
|
||||
Ignored: l.conf.Ignored.Values(),
|
||||
}
|
||||
}()
|
||||
|
||||
slices.Sort(resp.Ignored)
|
||||
|
||||
_ = aghhttp.WriteJSONResponse(w, r, resp)
|
||||
}
|
||||
|
||||
// AnonymizeIP masks ip to anonymize the client if the ip is a valid one.
|
||||
func AnonymizeIP(ip net.IP) {
|
||||
// zeroes is a slice of zero bytes from which the IP address tail is copied.
|
||||
@@ -87,7 +160,10 @@ func AnonymizeIP(ip net.IP) {
|
||||
}
|
||||
}
|
||||
|
||||
// handleQueryLogConfig handles the POST /control/querylog_config queries.
|
||||
// handleQueryLogConfig is the handler for the POST /control/querylog_config
|
||||
// HTTP API.
|
||||
//
|
||||
// Deprecated: Remove it when migration to the new API is over.
|
||||
func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request) {
|
||||
// Set NaN as initial value to be able to know if it changed later by
|
||||
// comparing it to NaN.
|
||||
@@ -103,6 +179,7 @@ func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
ivl := time.Duration(float64(timeutil.Day) * newConf.Interval)
|
||||
|
||||
hasIvl := !math.IsNaN(newConf.Interval)
|
||||
if hasIvl && !checkInterval(ivl) {
|
||||
aghhttp.Error(r, w, http.StatusBadRequest, "unsupported interval")
|
||||
@@ -112,11 +189,9 @@ func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
defer l.conf.ConfigModified()
|
||||
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
l.confMu.Lock()
|
||||
defer l.confMu.Unlock()
|
||||
|
||||
// Copy data, modify it, then activate. Other threads (readers) don't need
|
||||
// to use this lock.
|
||||
conf := *l.conf
|
||||
if newConf.Enabled != aghalg.NBNull {
|
||||
conf.Enabled = newConf.Enabled == aghalg.NBTrue
|
||||
@@ -138,6 +213,65 @@ func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request)
|
||||
l.conf = &conf
|
||||
}
|
||||
|
||||
// handlePutQueryLogConfig is the handler for the PUT
|
||||
// /control/querylog/config/update HTTP API.
|
||||
func (l *queryLog) handlePutQueryLogConfig(w http.ResponseWriter, r *http.Request) {
|
||||
newConf := &getConfigResp{}
|
||||
err := json.NewDecoder(r.Body).Decode(newConf)
|
||||
if err != nil {
|
||||
aghhttp.Error(r, w, http.StatusBadRequest, "%s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
set, err := aghnet.NewDomainNameSet(newConf.Ignored)
|
||||
if err != nil {
|
||||
aghhttp.Error(r, w, http.StatusUnprocessableEntity, "ignored: %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ivl := time.Duration(newConf.Interval) * time.Millisecond
|
||||
err = validateIvl(ivl)
|
||||
if err != nil {
|
||||
aghhttp.Error(r, w, http.StatusUnprocessableEntity, "unsupported interval: %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if newConf.Enabled == aghalg.NBNull {
|
||||
aghhttp.Error(r, w, http.StatusUnprocessableEntity, "enabled is null")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if newConf.AnonymizeClientIP == aghalg.NBNull {
|
||||
aghhttp.Error(r, w, http.StatusUnprocessableEntity, "anonymize_client_ip is null")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer l.conf.ConfigModified()
|
||||
|
||||
l.confMu.Lock()
|
||||
defer l.confMu.Unlock()
|
||||
|
||||
conf := *l.conf
|
||||
|
||||
conf.Ignored = set
|
||||
conf.RotationIvl = ivl
|
||||
conf.Enabled = newConf.Enabled == aghalg.NBTrue
|
||||
|
||||
conf.AnonymizeClientIP = newConf.AnonymizeClientIP == aghalg.NBTrue
|
||||
if conf.AnonymizeClientIP {
|
||||
l.anonymizer.Store(AnonymizeIP)
|
||||
} else {
|
||||
l.anonymizer.Store(nil)
|
||||
}
|
||||
|
||||
l.conf = &conf
|
||||
}
|
||||
|
||||
// "value" -> value, return TRUE
|
||||
func getDoubleQuotesEnclosedValue(s *string) bool {
|
||||
t := *s
|
||||
@@ -149,7 +283,7 @@ func getDoubleQuotesEnclosedValue(s *string) bool {
|
||||
}
|
||||
|
||||
// parseSearchCriterion parses a search criterion from the query parameter.
|
||||
func (l *queryLog) parseSearchCriterion(q url.Values, name string, ct criterionType) (
|
||||
func parseSearchCriterion(q url.Values, name string, ct criterionType) (
|
||||
ok bool,
|
||||
sc searchCriterion,
|
||||
err error,
|
||||
@@ -198,8 +332,9 @@ func (l *queryLog) parseSearchCriterion(q url.Values, name string, ct criterionT
|
||||
return true, sc, nil
|
||||
}
|
||||
|
||||
// parseSearchParams - parses "searchParams" from the HTTP request's query string
|
||||
func (l *queryLog) parseSearchParams(r *http.Request) (p *searchParams, err error) {
|
||||
// parseSearchParams parses search parameters from the HTTP request's query
|
||||
// string.
|
||||
func parseSearchParams(r *http.Request) (p *searchParams, err error) {
|
||||
p = newSearchParams()
|
||||
|
||||
q := r.URL.Query()
|
||||
@@ -237,7 +372,7 @@ func (l *queryLog) parseSearchParams(r *http.Request) (p *searchParams, err erro
|
||||
}} {
|
||||
var ok bool
|
||||
var c searchCriterion
|
||||
ok, c, err = l.parseSearchCriterion(q, v.urlField, v.ct)
|
||||
ok, c, err = parseSearchCriterion(q, v.urlField, v.ct)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -19,12 +19,16 @@ import (
|
||||
type jobject = map[string]any
|
||||
|
||||
// entriesToJSON converts query log entries to JSON.
|
||||
func (l *queryLog) entriesToJSON(entries []*logEntry, oldest time.Time) (res jobject) {
|
||||
func entriesToJSON(
|
||||
entries []*logEntry,
|
||||
oldest time.Time,
|
||||
anonFunc aghnet.IPMutFunc,
|
||||
) (res jobject) {
|
||||
data := make([]jobject, 0, len(entries))
|
||||
|
||||
// The elements order is already reversed to be from newer to older.
|
||||
for _, entry := range entries {
|
||||
jsonEntry := l.entryToJSON(entry, l.anonymizer.Load())
|
||||
jsonEntry := entryToJSON(entry, anonFunc)
|
||||
data = append(data, jsonEntry)
|
||||
}
|
||||
|
||||
@@ -40,7 +44,7 @@ func (l *queryLog) entriesToJSON(entries []*logEntry, oldest time.Time) (res job
|
||||
}
|
||||
|
||||
// entryToJSON converts a log entry's data into an entry for the JSON API.
|
||||
func (l *queryLog) entryToJSON(entry *logEntry, anonFunc aghnet.IPMutFunc) (jsonEntry jobject) {
|
||||
func entryToJSON(entry *logEntry, anonFunc aghnet.IPMutFunc) (jsonEntry jobject) {
|
||||
hostname := entry.QHost
|
||||
question := jobject{
|
||||
"type": entry.QType,
|
||||
@@ -92,14 +96,14 @@ func (l *queryLog) entryToJSON(entry *logEntry, anonFunc aghnet.IPMutFunc) (json
|
||||
jsonEntry["service_name"] = entry.Result.ServiceName
|
||||
}
|
||||
|
||||
l.setMsgData(entry, jsonEntry)
|
||||
l.setOrigAns(entry, jsonEntry)
|
||||
setMsgData(entry, jsonEntry)
|
||||
setOrigAns(entry, jsonEntry)
|
||||
|
||||
return jsonEntry
|
||||
}
|
||||
|
||||
// setMsgData sets the message data in jsonEntry.
|
||||
func (l *queryLog) setMsgData(entry *logEntry, jsonEntry jobject) {
|
||||
func setMsgData(entry *logEntry, jsonEntry jobject) {
|
||||
if len(entry.Answer) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -122,7 +126,7 @@ func (l *queryLog) setMsgData(entry *logEntry, jsonEntry jobject) {
|
||||
}
|
||||
|
||||
// setOrigAns sets the original answer data in jsonEntry.
|
||||
func (l *queryLog) setOrigAns(entry *logEntry, jsonEntry jobject) {
|
||||
func setOrigAns(entry *logEntry, jsonEntry jobject) {
|
||||
if len(entry.OrigAnswer) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package querylog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -25,9 +24,12 @@ const (
|
||||
type queryLog struct {
|
||||
findClient func(ids []string) (c *Client, err error)
|
||||
|
||||
conf *Config
|
||||
lock sync.Mutex
|
||||
logFile string // path to the log file
|
||||
// confMu protects conf.
|
||||
confMu *sync.RWMutex
|
||||
conf *Config
|
||||
|
||||
// logFile is the path to the log file.
|
||||
logFile string
|
||||
|
||||
// bufferLock protects buffer.
|
||||
bufferLock sync.RWMutex
|
||||
@@ -71,52 +73,24 @@ func NewClientProto(s string) (cp ClientProto, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// logEntry - represents a single log entry
|
||||
type logEntry struct {
|
||||
// client is the found client information, if any.
|
||||
client *Client
|
||||
|
||||
Time time.Time `json:"T"`
|
||||
|
||||
QHost string `json:"QH"`
|
||||
QType string `json:"QT"`
|
||||
QClass string `json:"QC"`
|
||||
|
||||
ReqECS string `json:"ECS,omitempty"`
|
||||
|
||||
ClientID string `json:"CID,omitempty"`
|
||||
ClientProto ClientProto `json:"CP"`
|
||||
|
||||
Answer []byte `json:",omitempty"` // sometimes empty answers happen like binerdunt.top or rev2.globalrootservers.net
|
||||
OrigAnswer []byte `json:",omitempty"`
|
||||
|
||||
Result filtering.Result
|
||||
Upstream string `json:",omitempty"`
|
||||
|
||||
IP net.IP `json:"IP"`
|
||||
|
||||
Elapsed time.Duration
|
||||
|
||||
Cached bool `json:",omitempty"`
|
||||
AuthenticatedData bool `json:"AD,omitempty"`
|
||||
}
|
||||
|
||||
// shallowClone returns a shallow clone of e.
|
||||
func (e *logEntry) shallowClone() (clone *logEntry) {
|
||||
cloneVal := *e
|
||||
|
||||
return &cloneVal
|
||||
}
|
||||
|
||||
func (l *queryLog) Start() {
|
||||
if l.conf.HTTPRegister != nil {
|
||||
l.initWeb()
|
||||
}
|
||||
|
||||
go l.periodicRotate()
|
||||
}
|
||||
|
||||
func (l *queryLog) Close() {
|
||||
_ = l.flushLogBuffer(true)
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
if l.conf.FileEnabled {
|
||||
err := l.flushLogBuffer()
|
||||
if err != nil {
|
||||
log.Error("querylog: closing: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkInterval(ivl time.Duration) (ok bool) {
|
||||
@@ -132,8 +106,26 @@ func checkInterval(ivl time.Duration) (ok bool) {
|
||||
return ivl == quarterDay || ivl == day || ivl == week || ivl == month || ivl == threeMonths
|
||||
}
|
||||
|
||||
// validateIvl returns an error if ivl is less than an hour or more than a
|
||||
// year.
|
||||
func validateIvl(ivl time.Duration) (err error) {
|
||||
if ivl < time.Hour {
|
||||
return errors.Error("less than an hour")
|
||||
}
|
||||
|
||||
if ivl > timeutil.Day*365 {
|
||||
return errors.Error("more than a year")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *queryLog) WriteDiskConfig(c *Config) {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
*c = *l.conf
|
||||
c.Ignored = l.conf.Ignored.Clone()
|
||||
}
|
||||
|
||||
// Clear memory buffer and remove log files
|
||||
@@ -141,10 +133,13 @@ func (l *queryLog) clear() {
|
||||
l.fileFlushLock.Lock()
|
||||
defer l.fileFlushLock.Unlock()
|
||||
|
||||
l.bufferLock.Lock()
|
||||
l.buffer = nil
|
||||
l.flushPending = false
|
||||
l.bufferLock.Unlock()
|
||||
func() {
|
||||
l.bufferLock.Lock()
|
||||
defer l.bufferLock.Unlock()
|
||||
|
||||
l.buffer = nil
|
||||
l.flushPending = false
|
||||
}()
|
||||
|
||||
oldLogFile := l.logFile + ".1"
|
||||
err := os.Remove(oldLogFile)
|
||||
@@ -161,7 +156,17 @@ func (l *queryLog) clear() {
|
||||
}
|
||||
|
||||
func (l *queryLog) Add(params *AddParams) {
|
||||
if !l.conf.Enabled {
|
||||
var isEnabled, fileIsEnabled bool
|
||||
var memSize uint32
|
||||
func() {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
isEnabled, fileIsEnabled = l.conf.Enabled, l.conf.FileEnabled
|
||||
memSize = l.conf.MemSize
|
||||
}()
|
||||
|
||||
if !isEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -178,7 +183,7 @@ func (l *queryLog) Add(params *AddParams) {
|
||||
|
||||
now := time.Now()
|
||||
q := params.Question.Question[0]
|
||||
entry := logEntry{
|
||||
entry := &logEntry{
|
||||
Time: now,
|
||||
|
||||
QHost: strings.ToLower(q.Name[:len(q.Name)-1]),
|
||||
@@ -203,65 +208,63 @@ func (l *queryLog) Add(params *AddParams) {
|
||||
entry.ReqECS = params.ReqECS.String()
|
||||
}
|
||||
|
||||
if params.Answer != nil {
|
||||
var a []byte
|
||||
a, err = params.Answer.Pack()
|
||||
if err != nil {
|
||||
log.Error("querylog: Answer.Pack(): %s", err)
|
||||
entry.addResponse(params.Answer, false)
|
||||
entry.addResponse(params.OrigAnswer, true)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
entry.Answer = a
|
||||
}
|
||||
|
||||
if params.OrigAnswer != nil {
|
||||
var a []byte
|
||||
a, err = params.OrigAnswer.Pack()
|
||||
if err != nil {
|
||||
log.Error("querylog: OrigAnswer.Pack(): %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
entry.OrigAnswer = a
|
||||
}
|
||||
|
||||
l.bufferLock.Lock()
|
||||
l.buffer = append(l.buffer, &entry)
|
||||
needFlush := false
|
||||
func() {
|
||||
l.bufferLock.Lock()
|
||||
defer l.bufferLock.Unlock()
|
||||
|
||||
if !l.conf.FileEnabled {
|
||||
if len(l.buffer) > int(l.conf.MemSize) {
|
||||
// writing to file is disabled - just remove the oldest entry from array
|
||||
//
|
||||
// TODO(a.garipov): This should be replaced by a proper ring buffer,
|
||||
// but it's currently difficult to do that.
|
||||
l.buffer[0] = nil
|
||||
l.buffer = l.buffer[1:]
|
||||
}
|
||||
} else if !l.flushPending {
|
||||
needFlush = len(l.buffer) >= int(l.conf.MemSize)
|
||||
if needFlush {
|
||||
l.flushPending = true
|
||||
}
|
||||
}
|
||||
l.bufferLock.Unlock()
|
||||
l.buffer = append(l.buffer, entry)
|
||||
|
||||
if !fileIsEnabled {
|
||||
if len(l.buffer) > int(memSize) {
|
||||
// Writing to file is disabled, so just remove the oldest entry
|
||||
// from the slices.
|
||||
//
|
||||
// TODO(a.garipov): This should be replaced by a proper ring
|
||||
// buffer, but it's currently difficult to do that.
|
||||
l.buffer[0] = nil
|
||||
l.buffer = l.buffer[1:]
|
||||
}
|
||||
} else if !l.flushPending {
|
||||
needFlush = len(l.buffer) >= int(memSize)
|
||||
if needFlush {
|
||||
l.flushPending = true
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// if buffer needs to be flushed to disk, do it now
|
||||
if needFlush {
|
||||
go func() {
|
||||
_ = l.flushLogBuffer(false)
|
||||
flushErr := l.flushLogBuffer()
|
||||
if flushErr != nil {
|
||||
log.Error("querylog: flushing after adding: %s", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// ShouldLog returns true if request for the host should be logged.
|
||||
func (l *queryLog) ShouldLog(host string, _, _ uint16) bool {
|
||||
func (l *queryLog) ShouldLog(host string, _, _ uint16, ids []string) bool {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
c, err := l.findClient(ids)
|
||||
if err != nil {
|
||||
log.Error("querylog: finding client: %s", err)
|
||||
}
|
||||
|
||||
if c != nil && c.IgnoreQueryLog {
|
||||
return false
|
||||
}
|
||||
|
||||
return !l.isIgnored(host)
|
||||
}
|
||||
|
||||
// isIgnored returns true if the host is in the Ignored list.
|
||||
// isIgnored returns true if the host is in the ignored domains list. It
|
||||
// assumes that l.confMu is locked for reading.
|
||||
func (l *queryLog) isIgnored(host string) bool {
|
||||
return l.conf.Ignored.Has(host)
|
||||
}
|
||||
|
||||
@@ -22,24 +22,25 @@ func TestMain(m *testing.M) {
|
||||
// TestQueryLog tests adding and loading (with filtering) entries from disk and
|
||||
// memory.
|
||||
func TestQueryLog(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
l, err := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
FileEnabled: true,
|
||||
RotationIvl: timeutil.Day,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Add disk entries.
|
||||
addEntry(l, "example.org", net.IPv4(1, 1, 1, 1), net.IPv4(2, 2, 2, 1))
|
||||
// Write to disk (first file).
|
||||
require.NoError(t, l.flushLogBuffer(true))
|
||||
require.NoError(t, l.flushLogBuffer())
|
||||
// Start writing to the second file.
|
||||
require.NoError(t, l.rotate())
|
||||
// Add disk entries.
|
||||
addEntry(l, "example.org", net.IPv4(1, 1, 1, 2), net.IPv4(2, 2, 2, 2))
|
||||
// Write to disk.
|
||||
require.NoError(t, l.flushLogBuffer(true))
|
||||
require.NoError(t, l.flushLogBuffer())
|
||||
// Add memory entries.
|
||||
addEntry(l, "test.example.org", net.IPv4(1, 1, 1, 3), net.IPv4(2, 2, 2, 3))
|
||||
addEntry(l, "example.com", net.IPv4(1, 1, 1, 4), net.IPv4(2, 2, 2, 4))
|
||||
@@ -125,12 +126,13 @@ func TestQueryLog(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestQueryLogOffsetLimit(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
l, err := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
RotationIvl: timeutil.Day,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
const (
|
||||
entNum = 10
|
||||
@@ -142,7 +144,7 @@ func TestQueryLogOffsetLimit(t *testing.T) {
|
||||
addEntry(l, secondPageDomain, net.IPv4(1, 1, 1, 1), net.IPv4(2, 2, 2, 1))
|
||||
}
|
||||
// Write them to the first file.
|
||||
require.NoError(t, l.flushLogBuffer(true))
|
||||
require.NoError(t, l.flushLogBuffer())
|
||||
// Add more to the in-memory part of log.
|
||||
for i := 0; i < entNum; i++ {
|
||||
addEntry(l, firstPageDomain, net.IPv4(1, 1, 1, 1), net.IPv4(2, 2, 2, 1))
|
||||
@@ -199,13 +201,14 @@ func TestQueryLogOffsetLimit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestQueryLogMaxFileScanEntries(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
l, err := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
FileEnabled: true,
|
||||
RotationIvl: timeutil.Day,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
const entNum = 10
|
||||
// Add entries to the log.
|
||||
@@ -213,7 +216,7 @@ func TestQueryLogMaxFileScanEntries(t *testing.T) {
|
||||
addEntry(l, "example.org", net.IPv4(1, 1, 1, 1), net.IPv4(2, 2, 2, 1))
|
||||
}
|
||||
// Write them to disk.
|
||||
require.NoError(t, l.flushLogBuffer(true))
|
||||
require.NoError(t, l.flushLogBuffer())
|
||||
|
||||
params := newSearchParams()
|
||||
|
||||
@@ -227,13 +230,14 @@ func TestQueryLogMaxFileScanEntries(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestQueryLogFileDisabled(t *testing.T) {
|
||||
l := newQueryLog(Config{
|
||||
l, err := newQueryLog(Config{
|
||||
Enabled: true,
|
||||
FileEnabled: false,
|
||||
RotationIvl: timeutil.Day,
|
||||
MemSize: 2,
|
||||
BaseDir: t.TempDir(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
addEntry(l, "example1.org", net.IPv4(1, 1, 1, 1), net.IPv4(2, 2, 2, 1))
|
||||
addEntry(l, "example2.org", net.IPv4(1, 1, 1, 1), net.IPv4(2, 2, 2, 1))
|
||||
@@ -254,35 +258,52 @@ func TestQueryLogShouldLog(t *testing.T) {
|
||||
)
|
||||
set := stringutil.NewSet(ignored1, ignored2)
|
||||
|
||||
l := newQueryLog(Config{
|
||||
findClient := func(ids []string) (c *Client, err error) {
|
||||
log := ids[0] == "no_log"
|
||||
|
||||
return &Client{IgnoreQueryLog: log}, nil
|
||||
}
|
||||
|
||||
l, err := newQueryLog(Config{
|
||||
Ignored: set,
|
||||
Enabled: true,
|
||||
RotationIvl: timeutil.Day,
|
||||
MemSize: 100,
|
||||
BaseDir: t.TempDir(),
|
||||
Ignored: set,
|
||||
FindClient: findClient,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
host string
|
||||
ids []string
|
||||
wantLog bool
|
||||
}{{
|
||||
name: "log",
|
||||
host: "example.com",
|
||||
ids: []string{"whatever"},
|
||||
wantLog: true,
|
||||
}, {
|
||||
name: "no_log_ignored_1",
|
||||
host: ignored1,
|
||||
ids: []string{"whatever"},
|
||||
wantLog: false,
|
||||
}, {
|
||||
name: "no_log_ignored_2",
|
||||
host: ignored2,
|
||||
ids: []string{"whatever"},
|
||||
wantLog: false,
|
||||
}, {
|
||||
name: "no_log_client_ignore",
|
||||
host: "example.com",
|
||||
ids: []string{"no_log"},
|
||||
wantLog: false,
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
res := l.ShouldLog(tc.host, dns.TypeA, dns.ClassINET)
|
||||
res := l.ShouldLog(tc.host, dns.TypeA, dns.ClassINET, tc.ids)
|
||||
|
||||
assert.Equal(t, tc.wantLog, res)
|
||||
})
|
||||
|
||||
@@ -106,6 +106,7 @@ func (r *QLogReader) SeekStart() error {
|
||||
|
||||
r.currentFile = len(r.qFiles) - 1
|
||||
_, err := r.qFiles[r.currentFile].SeekStart()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
package querylog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghnet"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/filtering"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"github.com/AdguardTeam/golibs/stringutil"
|
||||
"github.com/AdguardTeam/golibs/timeutil"
|
||||
"github.com/miekg/dns"
|
||||
)
|
||||
|
||||
@@ -29,16 +29,21 @@ type QueryLog interface {
|
||||
WriteDiskConfig(c *Config)
|
||||
|
||||
// ShouldLog returns true if request for the host should be logged.
|
||||
ShouldLog(host string, qType, qClass uint16) bool
|
||||
ShouldLog(host string, qType, qClass uint16, ids []string) bool
|
||||
}
|
||||
|
||||
// Config is the query log configuration structure.
|
||||
//
|
||||
// Do not alter any fields of this structure after using it.
|
||||
type Config struct {
|
||||
// Ignored is the list of host names, which should not be written to log.
|
||||
Ignored *stringutil.Set
|
||||
|
||||
// Anonymizer processes the IP addresses to anonymize those if needed.
|
||||
Anonymizer *aghnet.IPMut
|
||||
|
||||
// ConfigModified is called when the configuration is changed, for
|
||||
// example by HTTP requests.
|
||||
// ConfigModified is called when the configuration is changed, for example
|
||||
// by HTTP requests.
|
||||
ConfigModified func()
|
||||
|
||||
// HTTPRegister registers an HTTP handler.
|
||||
@@ -50,20 +55,13 @@ type Config struct {
|
||||
// BaseDir is the base directory for log files.
|
||||
BaseDir string
|
||||
|
||||
// RotationIvl is the interval for log rotation. After that period, the
|
||||
// old log file will be renamed, NOT deleted, so the actual log
|
||||
// retention time is twice the interval. The value must be one of:
|
||||
//
|
||||
// 6 * time.Hour
|
||||
// 1 * timeutil.Day
|
||||
// 7 * timeutil.Day
|
||||
// 30 * timeutil.Day
|
||||
// 90 * timeutil.Day
|
||||
//
|
||||
// RotationIvl is the interval for log rotation. After that period, the old
|
||||
// log file will be renamed, NOT deleted, so the actual log retention time
|
||||
// is twice the interval.
|
||||
RotationIvl time.Duration
|
||||
|
||||
// MemSize is the number of entries kept in a memory buffer before they
|
||||
// are flushed to disk.
|
||||
// MemSize is the number of entries kept in a memory buffer before they are
|
||||
// flushed to disk.
|
||||
MemSize uint32
|
||||
|
||||
// Enabled tells if the query log is enabled.
|
||||
@@ -75,10 +73,6 @@ type Config struct {
|
||||
// AnonymizeClientIP tells if the query log should anonymize clients' IP
|
||||
// addresses.
|
||||
AnonymizeClientIP bool
|
||||
|
||||
// Ignored is the list of host names, which should not be written to
|
||||
// log.
|
||||
Ignored *stringutil.Set
|
||||
}
|
||||
|
||||
// AddParams is the parameters for adding an entry.
|
||||
@@ -135,12 +129,12 @@ func (p *AddParams) validate() (err error) {
|
||||
}
|
||||
|
||||
// New creates a new instance of the query log.
|
||||
func New(conf Config) (ql QueryLog) {
|
||||
func New(conf Config) (ql QueryLog, err error) {
|
||||
return newQueryLog(conf)
|
||||
}
|
||||
|
||||
// newQueryLog crates a new queryLog.
|
||||
func newQueryLog(conf Config) (l *queryLog) {
|
||||
func newQueryLog(conf Config) (l *queryLog, err error) {
|
||||
findClient := conf.FindClient
|
||||
if findClient == nil {
|
||||
findClient = func(_ []string) (_ *Client, _ error) {
|
||||
@@ -151,20 +145,19 @@ func newQueryLog(conf Config) (l *queryLog) {
|
||||
l = &queryLog{
|
||||
findClient: findClient,
|
||||
|
||||
logFile: filepath.Join(conf.BaseDir, queryLogFileName),
|
||||
conf: &Config{},
|
||||
confMu: &sync.RWMutex{},
|
||||
logFile: filepath.Join(conf.BaseDir, queryLogFileName),
|
||||
|
||||
anonymizer: conf.Anonymizer,
|
||||
}
|
||||
|
||||
l.conf = &Config{}
|
||||
*l.conf = conf
|
||||
|
||||
if !checkInterval(conf.RotationIvl) {
|
||||
log.Info(
|
||||
"querylog: warning: unsupported rotation interval %s, setting to 1 day",
|
||||
conf.RotationIvl,
|
||||
)
|
||||
l.conf.RotationIvl = timeutil.Day
|
||||
err = validateIvl(conf.RotationIvl)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unsupported interval: %w", err)
|
||||
}
|
||||
|
||||
return l
|
||||
return l, nil
|
||||
}
|
||||
|
||||
@@ -11,40 +11,35 @@ import (
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
)
|
||||
|
||||
// flushLogBuffer flushes the current buffer to file and resets the current buffer
|
||||
func (l *queryLog) flushLogBuffer(fullFlush bool) error {
|
||||
if !l.conf.FileEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// flushLogBuffer flushes the current buffer to file and resets the current
|
||||
// buffer.
|
||||
func (l *queryLog) flushLogBuffer() (err error) {
|
||||
l.fileFlushLock.Lock()
|
||||
defer l.fileFlushLock.Unlock()
|
||||
|
||||
// flush remainder to file
|
||||
l.bufferLock.Lock()
|
||||
needFlush := len(l.buffer) >= int(l.conf.MemSize)
|
||||
if !needFlush && !fullFlush {
|
||||
l.bufferLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
flushBuffer := l.buffer
|
||||
l.buffer = nil
|
||||
l.flushPending = false
|
||||
l.bufferLock.Unlock()
|
||||
err := l.flushToFile(flushBuffer)
|
||||
if err != nil {
|
||||
log.Error("Saving querylog to file failed: %s", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
var flushBuffer []*logEntry
|
||||
func() {
|
||||
l.bufferLock.Lock()
|
||||
defer l.bufferLock.Unlock()
|
||||
|
||||
flushBuffer = l.buffer
|
||||
l.buffer = nil
|
||||
l.flushPending = false
|
||||
}()
|
||||
|
||||
err = l.flushToFile(flushBuffer)
|
||||
|
||||
return errors.Annotate(err, "writing to file: %w")
|
||||
}
|
||||
|
||||
// flushToFile saves the specified log entries to the query log file
|
||||
func (l *queryLog) flushToFile(buffer []*logEntry) (err error) {
|
||||
if len(buffer) == 0 {
|
||||
log.Debug("querylog: there's nothing to write to a file")
|
||||
log.Debug("querylog: nothing to write to a file")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
var b bytes.Buffer
|
||||
@@ -155,8 +150,13 @@ func (l *queryLog) periodicRotate() {
|
||||
// checkAndRotate rotates log files if those are older than the specified
|
||||
// rotation interval.
|
||||
func (l *queryLog) checkAndRotate() {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
var rotationIvl time.Duration
|
||||
func() {
|
||||
l.confMu.RLock()
|
||||
defer l.confMu.RUnlock()
|
||||
|
||||
rotationIvl = l.conf.RotationIvl
|
||||
}()
|
||||
|
||||
oldest, err := l.readFileFirstTimeValue()
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
@@ -165,11 +165,11 @@ func (l *queryLog) checkAndRotate() {
|
||||
return
|
||||
}
|
||||
|
||||
if rot, now := oldest.Add(l.conf.RotationIvl), time.Now(); rot.After(now) {
|
||||
if rotTime, now := oldest.Add(rotationIvl), time.Now(); rotTime.After(now) {
|
||||
log.Debug(
|
||||
"querylog: %s <= %s, not rotating",
|
||||
now.Format(time.RFC3339),
|
||||
rot.Format(time.RFC3339),
|
||||
rotTime.Format(time.RFC3339),
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
@@ -76,15 +76,20 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie
|
||||
// search - searches log entries in the query log using specified parameters
|
||||
// returns the list of entries found + time of the oldest entry
|
||||
func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest time.Time) {
|
||||
now := time.Now()
|
||||
start := time.Now()
|
||||
|
||||
if params.limit == 0 {
|
||||
return []*logEntry{}, time.Time{}
|
||||
}
|
||||
|
||||
cache := clientCache{}
|
||||
fileEntries, oldest, total := l.searchFiles(params, cache)
|
||||
|
||||
memoryEntries, bufLen := l.searchMemory(params, cache)
|
||||
log.Debug("querylog: got %d entries from memory", len(memoryEntries))
|
||||
|
||||
fileEntries, oldest, total := l.searchFiles(params, cache)
|
||||
log.Debug("querylog: got %d entries from files", len(fileEntries))
|
||||
|
||||
total += bufLen
|
||||
|
||||
totalLimit := params.offset + params.limit
|
||||
@@ -123,7 +128,7 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
|
||||
len(entries),
|
||||
total,
|
||||
params.olderThan,
|
||||
time.Since(now),
|
||||
time.Since(start),
|
||||
)
|
||||
|
||||
return entries, oldest
|
||||
@@ -145,13 +150,14 @@ func (l *queryLog) searchFiles(
|
||||
|
||||
r, err := NewQLogReader(files)
|
||||
if err != nil {
|
||||
log.Error("querylog: failed to open qlog reader: %s", err)
|
||||
log.Error("querylog: opening qlog reader: %s", err)
|
||||
|
||||
return entries, oldest, 0
|
||||
}
|
||||
|
||||
defer func() {
|
||||
derr := r.Close()
|
||||
if derr != nil {
|
||||
closeErr := r.Close()
|
||||
if closeErr != nil {
|
||||
log.Error("querylog: closing file: %s", err)
|
||||
}
|
||||
}()
|
||||
@@ -161,8 +167,8 @@ func (l *queryLog) searchFiles(
|
||||
} else {
|
||||
err = r.seekTS(params.olderThan.UnixNano())
|
||||
if err == nil {
|
||||
// Read to the next record, because we only need the one
|
||||
// that goes after it.
|
||||
// Read to the next record, because we only need the one that goes
|
||||
// after it.
|
||||
_, err = r.ReadNext()
|
||||
}
|
||||
}
|
||||
@@ -176,9 +182,9 @@ func (l *queryLog) searchFiles(
|
||||
totalLimit := params.offset + params.limit
|
||||
oldestNano := int64(0)
|
||||
|
||||
// By default, we do not scan more than maxFileScanEntries at once.
|
||||
// The idea is to make search calls faster so that the UI could handle
|
||||
// it and show something quicker. This behavior can be overridden if
|
||||
// By default, we do not scan more than maxFileScanEntries at once. The
|
||||
// idea is to make search calls faster so that the UI could handle it and
|
||||
// show something quicker. This behavior can be overridden if
|
||||
// maxFileScanEntries is set to 0.
|
||||
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
|
||||
var e *logEntry
|
||||
|
||||
@@ -35,7 +35,7 @@ func TestQueryLog_Search_findClient(t *testing.T) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
l := newQueryLog(Config{
|
||||
l, err := newQueryLog(Config{
|
||||
FindClient: findClient,
|
||||
BaseDir: t.TempDir(),
|
||||
RotationIvl: timeutil.Day,
|
||||
@@ -44,6 +44,7 @@ func TestQueryLog_Search_findClient(t *testing.T) {
|
||||
FileEnabled: true,
|
||||
AnonymizeClientIP: false,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(l.Close)
|
||||
|
||||
q := &dns.Msg{
|
||||
|
||||
Reference in New Issue
Block a user