|
|
|
|
@@ -1,11 +1,13 @@
|
|
|
|
|
package querylog
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bufio"
|
|
|
|
|
"bytes"
|
|
|
|
|
"compress/gzip"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"io"
|
|
|
|
|
"os"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/AdguardTeam/golibs/log"
|
|
|
|
|
@@ -146,7 +148,7 @@ type Reader struct {
|
|
|
|
|
ql *queryLog
|
|
|
|
|
|
|
|
|
|
f *os.File
|
|
|
|
|
jd *json.Decoder
|
|
|
|
|
reader *bufio.Reader // reads file line by line
|
|
|
|
|
now time.Time
|
|
|
|
|
validFrom int64 // UNIX time (ns)
|
|
|
|
|
olderThan int64 // UNIX time (ns)
|
|
|
|
|
@@ -215,7 +217,6 @@ func (r *Reader) BeginRead(olderThan time.Time, count uint64) {
|
|
|
|
|
}
|
|
|
|
|
r.filePrepared = false
|
|
|
|
|
r.searching = false
|
|
|
|
|
r.jd = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// BeginReadPrev - start reading the previous data chunk
|
|
|
|
|
@@ -245,7 +246,6 @@ func (r *Reader) BeginReadPrev(olderThan time.Time, count uint64) {
|
|
|
|
|
|
|
|
|
|
r.filePrepared = true
|
|
|
|
|
r.searching = false
|
|
|
|
|
r.jd = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Perform binary seek
|
|
|
|
|
@@ -279,27 +279,17 @@ func (fs *fileSeeker) seekBinary(cur uint64) int32 {
|
|
|
|
|
|
|
|
|
|
// Seek to a new line
|
|
|
|
|
func (r *Reader) seekToNewLine() bool {
|
|
|
|
|
b := make([]byte, maxEntrySize*2)
|
|
|
|
|
|
|
|
|
|
_, err := r.f.Read(b)
|
|
|
|
|
r.reader = bufio.NewReader(r.f)
|
|
|
|
|
b, err := r.reader.ReadBytes('\n')
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.reader = nil
|
|
|
|
|
log.Error("QueryLog: file.Read: %s: %s", r.files[r.ifile], err)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
off := bytes.IndexByte(b, '\n') + 1
|
|
|
|
|
if off == 0 {
|
|
|
|
|
log.Error("QueryLog: Can't find a new line: %s", r.files[r.ifile])
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
off := len(b)
|
|
|
|
|
r.fpos += uint64(off)
|
|
|
|
|
log.Debug("QueryLog: seek: %x (+%d)", r.fpos, off)
|
|
|
|
|
_, err = r.f.Seek(int64(r.fpos), io.SeekStart)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("QueryLog: file.Seek: %s: %s", r.files[r.ifile], err)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -370,7 +360,6 @@ func (r *Reader) prepareRead() bool {
|
|
|
|
|
|
|
|
|
|
// Next - return the next entry or nil if reading is finished
|
|
|
|
|
func (r *Reader) Next() *logEntry { // nolint
|
|
|
|
|
var err error
|
|
|
|
|
for {
|
|
|
|
|
// open file if needed
|
|
|
|
|
if r.f == nil {
|
|
|
|
|
@@ -390,30 +379,26 @@ func (r *Reader) Next() *logEntry { // nolint
|
|
|
|
|
r.filePrepared = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// open decoder if needed
|
|
|
|
|
if r.jd == nil {
|
|
|
|
|
r.jd = json.NewDecoder(r.f)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check if there's data
|
|
|
|
|
if !r.jd.More() {
|
|
|
|
|
r.jd = nil
|
|
|
|
|
// open decoder
|
|
|
|
|
b, err := r.reader.ReadBytes('\n')
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
strReader := strings.NewReader(string(b))
|
|
|
|
|
jd := json.NewDecoder(strReader)
|
|
|
|
|
|
|
|
|
|
// read data
|
|
|
|
|
var entry logEntry
|
|
|
|
|
err = r.jd.Decode(&entry)
|
|
|
|
|
err = jd.Decode(&entry)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("QueryLog: Failed to decode: %s", err)
|
|
|
|
|
r.jd = nil
|
|
|
|
|
return nil
|
|
|
|
|
log.Debug("QueryLog: Failed to decode: %s", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t := entry.Time.UnixNano()
|
|
|
|
|
if r.searching {
|
|
|
|
|
r.jd = nil
|
|
|
|
|
|
|
|
|
|
r.reader = nil
|
|
|
|
|
rr := r.fseeker.seekBinary(uint64(t))
|
|
|
|
|
r.fpos = r.fseeker.pos
|
|
|
|
|
if rr < 0 {
|
|
|
|
|
|