* querylog: refactor: move HTTP handlers to querylog/
This commit is contained in:
@@ -16,7 +16,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
logBufferCap = 5000 // maximum capacity of logBuffer before it's flushed to disk
|
||||
logBufferCap = 5000 // maximum capacity of buffer before it's flushed to disk
|
||||
queryLogFileName = "querylog.json" // .gz added during compression
|
||||
getDataLimit = 500 // GetData(): maximum log entries to return
|
||||
|
||||
@@ -29,10 +29,11 @@ type queryLog struct {
|
||||
conf Config
|
||||
logFile string // path to the log file
|
||||
|
||||
logBufferLock sync.RWMutex
|
||||
logBuffer []*logEntry
|
||||
bufferLock sync.RWMutex
|
||||
buffer []*logEntry
|
||||
fileFlushLock sync.Mutex // synchronize a file-flushing goroutine and main thread
|
||||
flushPending bool // don't start another goroutine while the previous one is still running
|
||||
fileWriteLock sync.Mutex
|
||||
}
|
||||
|
||||
// create a new instance of the query log
|
||||
@@ -40,7 +41,13 @@ func newQueryLog(conf Config) *queryLog {
|
||||
l := queryLog{}
|
||||
l.logFile = filepath.Join(conf.BaseDir, queryLogFileName)
|
||||
l.conf = conf
|
||||
go l.periodicQueryLogRotate()
|
||||
if !checkInterval(l.conf.Interval) {
|
||||
l.conf.Interval = 1
|
||||
}
|
||||
if l.conf.HTTPRegister != nil {
|
||||
l.initWeb()
|
||||
}
|
||||
go l.periodicRotate()
|
||||
return &l
|
||||
}
|
||||
|
||||
@@ -48,18 +55,30 @@ func (l *queryLog) Close() {
|
||||
_ = l.flushLogBuffer(true)
|
||||
}
|
||||
|
||||
func (l *queryLog) Configure(conf Config) {
|
||||
l.conf = conf
|
||||
func checkInterval(days uint32) bool {
|
||||
return days == 1 || days == 7 || days == 30 || days == 90
|
||||
}
|
||||
|
||||
func (l *queryLog) Clear() {
|
||||
// Set new configuration at runtime
|
||||
func (l *queryLog) configure(conf Config) {
|
||||
l.conf.Enabled = conf.Enabled
|
||||
l.conf.Interval = conf.Interval
|
||||
}
|
||||
|
||||
func (l *queryLog) WriteDiskConfig(dc *DiskConfig) {
|
||||
dc.Enabled = l.conf.Enabled
|
||||
dc.Interval = l.conf.Interval
|
||||
}
|
||||
|
||||
// Clear memory buffer and remove log files
|
||||
func (l *queryLog) clear() {
|
||||
l.fileFlushLock.Lock()
|
||||
defer l.fileFlushLock.Unlock()
|
||||
|
||||
l.logBufferLock.Lock()
|
||||
l.logBuffer = nil
|
||||
l.bufferLock.Lock()
|
||||
l.buffer = nil
|
||||
l.flushPending = false
|
||||
l.logBufferLock.Unlock()
|
||||
l.bufferLock.Unlock()
|
||||
|
||||
err := os.Remove(l.logFile + ".1")
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
@@ -96,6 +115,10 @@ func getIPString(addr net.Addr) string {
|
||||
}
|
||||
|
||||
func (l *queryLog) Add(question *dns.Msg, answer *dns.Msg, result *dnsfilter.Result, elapsed time.Duration, addr net.Addr, upstream string) {
|
||||
if !l.conf.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
var q []byte
|
||||
var a []byte
|
||||
var err error
|
||||
@@ -132,16 +155,16 @@ func (l *queryLog) Add(question *dns.Msg, answer *dns.Msg, result *dnsfilter.Res
|
||||
Upstream: upstream,
|
||||
}
|
||||
|
||||
l.logBufferLock.Lock()
|
||||
l.logBuffer = append(l.logBuffer, &entry)
|
||||
l.bufferLock.Lock()
|
||||
l.buffer = append(l.buffer, &entry)
|
||||
needFlush := false
|
||||
if !l.flushPending {
|
||||
needFlush = len(l.logBuffer) >= logBufferCap
|
||||
needFlush = len(l.buffer) >= logBufferCap
|
||||
if needFlush {
|
||||
l.flushPending = true
|
||||
}
|
||||
}
|
||||
l.logBufferLock.Unlock()
|
||||
l.bufferLock.Unlock()
|
||||
|
||||
// if buffer needs to be flushed to disk, do it now
|
||||
if needFlush {
|
||||
@@ -152,11 +175,9 @@ func (l *queryLog) Add(question *dns.Msg, answer *dns.Msg, result *dnsfilter.Res
|
||||
}
|
||||
|
||||
// Return TRUE if this entry is needed
|
||||
func isNeeded(entry *logEntry, params GetDataParams) bool {
|
||||
if params.ResponseStatus != 0 {
|
||||
if params.ResponseStatus == ResponseStatusFiltered && !entry.Result.IsFiltered {
|
||||
return false
|
||||
}
|
||||
func isNeeded(entry *logEntry, params getDataParams) bool {
|
||||
if params.ResponseStatus == responseStatusFiltered && !entry.Result.IsFiltered {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(params.Domain) != 0 || params.QuestionType != 0 {
|
||||
@@ -193,7 +214,7 @@ func isNeeded(entry *logEntry, params GetDataParams) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (l *queryLog) readFromFile(params GetDataParams) ([]*logEntry, int) {
|
||||
func (l *queryLog) readFromFile(params getDataParams) ([]*logEntry, int) {
|
||||
entries := []*logEntry{}
|
||||
olderThan := params.OlderThan
|
||||
totalChunks := 0
|
||||
@@ -247,7 +268,28 @@ func (l *queryLog) readFromFile(params GetDataParams) ([]*logEntry, int) {
|
||||
return entries, total
|
||||
}
|
||||
|
||||
func (l *queryLog) GetData(params GetDataParams) []map[string]interface{} {
|
||||
// Parameters for getData()
|
||||
type getDataParams struct {
|
||||
OlderThan time.Time // return entries that are older than this value
|
||||
Domain string // filter by domain name in question
|
||||
Client string // filter by client IP
|
||||
QuestionType uint16 // filter by question type
|
||||
ResponseStatus responseStatusType // filter by response status
|
||||
StrictMatchDomain bool // if Domain value must be matched strictly
|
||||
StrictMatchClient bool // if Client value must be matched strictly
|
||||
}
|
||||
|
||||
// Response status
|
||||
type responseStatusType int32
|
||||
|
||||
// Response status constants
|
||||
const (
|
||||
responseStatusAll responseStatusType = iota + 1
|
||||
responseStatusFiltered
|
||||
)
|
||||
|
||||
// Get log entries
|
||||
func (l *queryLog) getData(params getDataParams) []map[string]interface{} {
|
||||
var data = []map[string]interface{}{}
|
||||
|
||||
if len(params.Domain) != 0 && params.StrictMatchDomain {
|
||||
@@ -266,9 +308,9 @@ func (l *queryLog) GetData(params GetDataParams) []map[string]interface{} {
|
||||
}
|
||||
|
||||
// add from memory buffer
|
||||
l.logBufferLock.Lock()
|
||||
total += len(l.logBuffer)
|
||||
for _, entry := range l.logBuffer {
|
||||
l.bufferLock.Lock()
|
||||
total += len(l.buffer)
|
||||
for _, entry := range l.buffer {
|
||||
|
||||
if !isNeeded(entry, params) {
|
||||
continue
|
||||
@@ -283,7 +325,7 @@ func (l *queryLog) GetData(params GetDataParams) []map[string]interface{} {
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
l.logBufferLock.Unlock()
|
||||
l.bufferLock.Unlock()
|
||||
|
||||
// process the elements from latest to oldest
|
||||
for i := len(entries) - 1; i >= 0; i-- {
|
||||
|
||||
Reference in New Issue
Block a user