cherry-pick: 4358 stats races
Merge in DNS/adguard-home from 4358-stats-races to master Updates #4358 Squashed commit of the following: commit 162d17b04d95adad21fb9b3c5a6fb64df2e037ec Merge: 17732cfad4c3a43bAuthor: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Wed Aug 17 14:04:20 2022 +0300 Merge branch 'master' into 4358-stats-races commit 17732cfa0f3b2589bf2c252697eee1d6b358a66c Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Wed Aug 17 13:53:42 2022 +0300 stats: imp docs, locking commit 4ee090869af0fa2b777c12027c3b77d5acd6e4de Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Tue Aug 16 20:26:19 2022 +0300 stats: revert const commit a7681a1b882cef04511fcd5d569f5abe2f955239 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Tue Aug 16 20:23:00 2022 +0300 stats: imp concurrency commit a6c6c1a0572e4201cd24644fd3f86f51fc27f633 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Tue Aug 16 19:51:30 2022 +0300 stats: imp code, tests, docs commit 954196b49f5ad91d91f445ff656e63c318e4124c Merge: 281e00da6e63757fAuthor: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Tue Aug 16 13:07:32 2022 +0300 Merge branch 'master' into 4358-stats-races commit 281e00daf781d045269584ce0158eed1d77918df Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Fri Aug 12 16:22:18 2022 +0300 stats: imp closing commit ed036d9aa7e25498869edfb866b6e923538970eb Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Fri Aug 12 16:11:12 2022 +0300 stats: imp tests more commit f848a12487ecd2afc8416e800510090cc1be7330 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Fri Aug 12 13:54:19 2022 +0300 stats: imp tests, code commit 60e11f042d51ec68850143129e61c701c5e4f3a4 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Thu Aug 11 16:36:07 2022 +0300 stats: fix test commit 6d97f1db093b5ce0d37984ff96a9ef6f4e02dba1 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Thu Aug 11 14:53:21 2022 +0300 stats: imp code, docs commit 20c70c2847b0de6c7f9271a8d9a831175ed0c499 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Wed Aug 10 20:53:36 2022 +0300 
stats: imp shared memory safety commit 8b3945670a190bab070171e6b4976edab1e3e2a2 Author: Eugene Burkov <E.Burkov@AdGuard.COM> Date: Wed Aug 10 17:22:55 2022 +0300 stats: imp code
This commit is contained in:
committed by
Ainar Garipov
parent
7bb9b2416b
commit
993a3fc42c
@@ -5,14 +5,9 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"go.etcd.io/bbolt"
|
||||
@@ -22,51 +17,65 @@ import (
|
||||
// inspection. Improve logging. Decrease complexity.
|
||||
|
||||
const (
|
||||
maxDomains = 100 // max number of top domains to store in file or return via Get()
|
||||
maxClients = 100 // max number of top clients to store in file or return via Get()
|
||||
// maxDomains is the max number of top domains to return.
|
||||
maxDomains = 100
|
||||
// maxClients is the max number of top clients to return.
|
||||
maxClients = 100
|
||||
)
|
||||
|
||||
// StatsCtx collects the statistics and flushes it to the database. Its default
|
||||
// flushing interval is one hour.
|
||||
// UnitIDGenFunc is the signature of a function that generates a unique ID for
|
||||
// the statistics unit.
|
||||
type UnitIDGenFunc func() (id uint32)
|
||||
|
||||
// TimeUnit is the unit of measuring time while aggregating the statistics.
|
||||
type TimeUnit int
|
||||
|
||||
// Supported TimeUnit values.
|
||||
const (
|
||||
Hours TimeUnit = iota
|
||||
Days
|
||||
)
|
||||
|
||||
// Result is the resulting code of processing the DNS request.
|
||||
type Result int
|
||||
|
||||
// Supported Result values.
|
||||
//
|
||||
// TODO(e.burkov): Use atomic.Pointer for accessing curr and db in go1.19.
|
||||
type StatsCtx struct {
|
||||
// currMu protects the current unit.
|
||||
currMu *sync.Mutex
|
||||
// curr is the actual statistics collection result.
|
||||
curr *unit
|
||||
// TODO(e.burkov): Think about better naming.
|
||||
const (
|
||||
RNotFiltered Result = iota + 1
|
||||
RFiltered
|
||||
RSafeBrowsing
|
||||
RSafeSearch
|
||||
RParental
|
||||
|
||||
// dbMu protects db.
|
||||
dbMu *sync.Mutex
|
||||
// db is the opened statistics database, if any.
|
||||
db *bbolt.DB
|
||||
resultLast = RParental + 1
|
||||
)
|
||||
|
||||
// unitIDGen is the function that generates an identifier for the current
|
||||
// unit. It's here for only testing purposes.
|
||||
unitIDGen UnitIDGenFunc
|
||||
// Entry is a statistics data entry.
|
||||
type Entry struct {
|
||||
// Client is the client's primary ID.
|
||||
//
|
||||
// TODO(a.garipov): Make this a {net.IP, string} enum?
|
||||
Client string
|
||||
|
||||
// httpRegister is used to set HTTP handlers.
|
||||
httpRegister aghhttp.RegisterFunc
|
||||
// Domain is the domain name requested.
|
||||
Domain string
|
||||
|
||||
// configModified is called whenever the configuration is modified via web
|
||||
// interface.
|
||||
configModified func()
|
||||
// Result is the result of processing the request.
|
||||
Result Result
|
||||
|
||||
// filename is the name of database file.
|
||||
filename string
|
||||
|
||||
// limitHours is the maximum number of hours to collect statistics into the
|
||||
// current unit.
|
||||
limitHours uint32
|
||||
// Time is the duration of the request processing in milliseconds.
|
||||
Time uint32
|
||||
}
|
||||
|
||||
// unit collects the statistics data for a specific period of time.
|
||||
type unit struct {
|
||||
// mu protects all the fields of a unit.
|
||||
mu *sync.RWMutex
|
||||
|
||||
// id is the unique unit's identifier. It's set to an absolute hour number
|
||||
// since the beginning of UNIX time by the default ID generating function.
|
||||
//
|
||||
// Must not be rewritten after creating to be accessed concurrently without
|
||||
// using mu.
|
||||
id uint32
|
||||
|
||||
// nTotal stores the total number of requests.
|
||||
@@ -86,44 +95,15 @@ type unit struct {
|
||||
clients map[string]uint64
|
||||
}
|
||||
|
||||
// ongoing returns the current unit. It's safe for concurrent use.
|
||||
//
|
||||
// Note that the unit itself should be locked before accessing.
|
||||
func (s *StatsCtx) ongoing() (u *unit) {
|
||||
s.currMu.Lock()
|
||||
defer s.currMu.Unlock()
|
||||
|
||||
return s.curr
|
||||
}
|
||||
|
||||
// swapCurrent swaps the current unit with another and returns it. It's safe
|
||||
// for concurrent use.
|
||||
func (s *StatsCtx) swapCurrent(with *unit) (old *unit) {
|
||||
s.currMu.Lock()
|
||||
defer s.currMu.Unlock()
|
||||
|
||||
old, s.curr = s.curr, with
|
||||
|
||||
return old
|
||||
}
|
||||
|
||||
// database returns the database if it's opened. It's safe for concurrent use.
|
||||
func (s *StatsCtx) database() (db *bbolt.DB) {
|
||||
s.dbMu.Lock()
|
||||
defer s.dbMu.Unlock()
|
||||
|
||||
return s.db
|
||||
}
|
||||
|
||||
// swapDatabase swaps the database with another one and returns it. It's safe
|
||||
// for concurrent use.
|
||||
func (s *StatsCtx) swapDatabase(with *bbolt.DB) (old *bbolt.DB) {
|
||||
s.dbMu.Lock()
|
||||
defer s.dbMu.Unlock()
|
||||
|
||||
old, s.db = s.db, with
|
||||
|
||||
return old
|
||||
// newUnit allocates the new *unit.
|
||||
func newUnit(id uint32) (u *unit) {
|
||||
return &unit{
|
||||
id: id,
|
||||
nResult: make([]uint64, resultLast),
|
||||
domains: make(map[string]uint64),
|
||||
blockedDomains: make(map[string]uint64),
|
||||
clients: make(map[string]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
// countPair is a single name-number pair for deserializing statistics data into
|
||||
@@ -133,7 +113,7 @@ type countPair struct {
|
||||
Count uint64
|
||||
}
|
||||
|
||||
// unitDB is the structure for deserializing statistics data into the database.
|
||||
// unitDB is the structure for serializing statistics data into the database.
|
||||
type unitDB struct {
|
||||
// NTotal is the total number of requests.
|
||||
NTotal uint64
|
||||
@@ -152,157 +132,6 @@ type unitDB struct {
|
||||
TimeAvg uint32
|
||||
}
|
||||
|
||||
// withRecovered turns the value recovered from panic if any into an error and
|
||||
// combines it with the one pointed by orig. orig must be non-nil.
|
||||
func withRecovered(orig *error) {
|
||||
p := recover()
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
switch p := p.(type) {
|
||||
case error:
|
||||
err = fmt.Errorf("panic: %w", p)
|
||||
default:
|
||||
err = fmt.Errorf("panic: recovered value of type %[1]T: %[1]v", p)
|
||||
}
|
||||
|
||||
*orig = errors.WithDeferred(*orig, err)
|
||||
}
|
||||
|
||||
// isEnabled is a helper that check if the statistics collecting is enabled.
|
||||
func (s *StatsCtx) isEnabled() (ok bool) {
|
||||
return atomic.LoadUint32(&s.limitHours) != 0
|
||||
}
|
||||
|
||||
// New creates s from conf and properly initializes it. Don't use s before
|
||||
// calling it's Start method.
|
||||
func New(conf Config) (s *StatsCtx, err error) {
|
||||
defer withRecovered(&err)
|
||||
|
||||
s = &StatsCtx{
|
||||
currMu: &sync.Mutex{},
|
||||
dbMu: &sync.Mutex{},
|
||||
filename: conf.Filename,
|
||||
configModified: conf.ConfigModified,
|
||||
httpRegister: conf.HTTPRegister,
|
||||
}
|
||||
if s.limitHours = conf.LimitDays * 24; !checkInterval(conf.LimitDays) {
|
||||
s.limitHours = 24
|
||||
}
|
||||
if s.unitIDGen = newUnitID; conf.UnitID != nil {
|
||||
s.unitIDGen = conf.UnitID
|
||||
}
|
||||
|
||||
if err = s.dbOpen(); err != nil {
|
||||
return nil, fmt.Errorf("opening database: %w", err)
|
||||
}
|
||||
|
||||
id := s.unitIDGen()
|
||||
tx := beginTxn(s.db, true)
|
||||
var udb *unitDB
|
||||
if tx != nil {
|
||||
log.Tracef("Deleting old units...")
|
||||
firstID := id - s.limitHours - 1
|
||||
unitDel := 0
|
||||
|
||||
err = tx.ForEach(newBucketWalker(tx, &unitDel, firstID))
|
||||
if err != nil && !errors.Is(err, errStop) {
|
||||
log.Debug("stats: deleting units: %s", err)
|
||||
}
|
||||
|
||||
udb = s.loadUnitFromDB(tx, id)
|
||||
|
||||
if unitDel != 0 {
|
||||
s.commitTxn(tx)
|
||||
} else {
|
||||
err = tx.Rollback()
|
||||
if err != nil {
|
||||
log.Debug("rolling back: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
u := newUnit(id)
|
||||
// This use of deserialize is safe since the accessed unit has just been
|
||||
// created.
|
||||
u.deserialize(udb)
|
||||
s.curr = u
|
||||
|
||||
log.Debug("stats: initialized")
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// TODO(a.garipov): See if this is actually necessary. Looks like a rather
|
||||
// bizarre solution.
|
||||
const errStop errors.Error = "stop iteration"
|
||||
|
||||
// newBucketWalker returns a new bucket walker that deletes old units. The
|
||||
// integer that unitDelPtr points to is incremented for every successful
|
||||
// deletion. If the bucket isn't deleted, f returns errStop.
|
||||
func newBucketWalker(
|
||||
tx *bbolt.Tx,
|
||||
unitDelPtr *int,
|
||||
firstID uint32,
|
||||
) (f func(name []byte, b *bbolt.Bucket) (err error)) {
|
||||
return func(name []byte, _ *bbolt.Bucket) (err error) {
|
||||
nameID, ok := unitNameToID(name)
|
||||
if !ok || nameID < firstID {
|
||||
err = tx.DeleteBucket(name)
|
||||
if err != nil {
|
||||
log.Debug("stats: tx.DeleteBucket: %s", err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debug("stats: deleted unit %d (name %x)", nameID, name)
|
||||
|
||||
*unitDelPtr++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return errStop
|
||||
}
|
||||
}
|
||||
|
||||
// Start makes s process the incoming data.
|
||||
func (s *StatsCtx) Start() {
|
||||
s.initWeb()
|
||||
go s.periodicFlush()
|
||||
}
|
||||
|
||||
// checkInterval reports whether days is an acceptable statistics retention
// interval.  Only 0, 1, 7, 30, and 90 are accepted.
func checkInterval(days uint32) (ok bool) {
	switch days {
	case 0, 1, 7, 30, 90:
		return true
	default:
		return false
	}
}
|
||||
|
||||
// dbOpen returns an error if the database can't be opened from the specified
|
||||
// file. It's safe for concurrent use.
|
||||
func (s *StatsCtx) dbOpen() (err error) {
|
||||
log.Tracef("db.Open...")
|
||||
|
||||
s.dbMu.Lock()
|
||||
defer s.dbMu.Unlock()
|
||||
|
||||
s.db, err = bbolt.Open(s.filename, 0o644, nil)
|
||||
if err != nil {
|
||||
log.Error("stats: open DB: %s: %s", s.filename, err)
|
||||
if err.Error() == "invalid argument" {
|
||||
log.Error("AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Tracef("db.Open")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// newUnitID is the default UnitIDGenFunc that generates the unique id hourly.
|
||||
func newUnitID() (id uint32) {
|
||||
const secsInHour = int64(time.Hour / time.Second)
|
||||
@@ -310,50 +139,14 @@ func newUnitID() (id uint32) {
|
||||
return uint32(time.Now().Unix() / secsInHour)
|
||||
}
|
||||
|
||||
// newUnit allocates the new *unit.
|
||||
func newUnit(id uint32) (u *unit) {
|
||||
return &unit{
|
||||
mu: &sync.RWMutex{},
|
||||
id: id,
|
||||
nResult: make([]uint64, resultLast),
|
||||
domains: make(map[string]uint64),
|
||||
blockedDomains: make(map[string]uint64),
|
||||
clients: make(map[string]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
// beginTxn opens a new database transaction. If writable is true, the
|
||||
// transaction will be opened for writing, and for reading otherwise. It
|
||||
// returns nil if the transaction can't be created.
|
||||
func beginTxn(db *bbolt.DB, writable bool) (tx *bbolt.Tx) {
|
||||
if db == nil {
|
||||
return nil
|
||||
func finishTxn(tx *bbolt.Tx, commit bool) (err error) {
|
||||
if commit {
|
||||
err = errors.Annotate(tx.Commit(), "committing: %w")
|
||||
} else {
|
||||
err = errors.Annotate(tx.Rollback(), "rolling back: %w")
|
||||
}
|
||||
|
||||
log.Tracef("opening a database transaction")
|
||||
|
||||
tx, err := db.Begin(writable)
|
||||
if err != nil {
|
||||
log.Error("stats: opening a transaction: %s", err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Tracef("transaction has been opened")
|
||||
|
||||
return tx
|
||||
}
|
||||
|
||||
// commitTxn applies the changes made in tx to the database.
|
||||
func (s *StatsCtx) commitTxn(tx *bbolt.Tx) {
|
||||
err := tx.Commit()
|
||||
if err != nil {
|
||||
log.Error("stats: committing a transaction: %s", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Tracef("transaction has been committed")
|
||||
return err
|
||||
}
|
||||
|
||||
// bucketNameLen is the length of a bucket, a 64-bit unsigned integer.
|
||||
@@ -380,88 +173,34 @@ func unitNameToID(name []byte) (id uint32, ok bool) {
|
||||
return uint32(binary.BigEndian.Uint64(name)), true
|
||||
}
|
||||
|
||||
// Flush the current unit to DB and delete an old unit when a new hour is started
|
||||
// If a unit must be flushed:
|
||||
// . lock DB
|
||||
// . atomically set a new empty unit as the current one and get the old unit
|
||||
// This is important to do it inside DB lock, so the reader won't get inconsistent results.
|
||||
// . write the unit to DB
|
||||
// . remove the stale unit from DB
|
||||
// . unlock DB
|
||||
func (s *StatsCtx) periodicFlush() {
|
||||
for ptr := s.ongoing(); ptr != nil; ptr = s.ongoing() {
|
||||
id := s.unitIDGen()
|
||||
// Access the unit's ID with atomic to avoid locking the whole unit.
|
||||
if !s.isEnabled() || atomic.LoadUint32(&ptr.id) == id {
|
||||
time.Sleep(time.Second)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
tx := beginTxn(s.database(), true)
|
||||
|
||||
nu := newUnit(id)
|
||||
u := s.swapCurrent(nu)
|
||||
udb := u.serialize()
|
||||
|
||||
if tx == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
flushOK := flushUnitToDB(tx, u.id, udb)
|
||||
delOK := s.deleteUnit(tx, id-atomic.LoadUint32(&s.limitHours))
|
||||
if flushOK || delOK {
|
||||
s.commitTxn(tx)
|
||||
} else {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
}
|
||||
|
||||
log.Tracef("periodicFlush() exited")
|
||||
}
|
||||
|
||||
// deleteUnit removes the unit by it's id from the database the tx belongs to.
|
||||
func (s *StatsCtx) deleteUnit(tx *bbolt.Tx, id uint32) bool {
|
||||
err := tx.DeleteBucket(idToUnitName(id))
|
||||
if err != nil {
|
||||
log.Tracef("stats: bolt DeleteBucket: %s", err)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
log.Debug("stats: deleted unit %d", id)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func convertMapToSlice(m map[string]uint64, max int) []countPair {
|
||||
a := []countPair{}
|
||||
func convertMapToSlice(m map[string]uint64, max int) (s []countPair) {
|
||||
s = make([]countPair, 0, len(m))
|
||||
for k, v := range m {
|
||||
a = append(a, countPair{Name: k, Count: v})
|
||||
s = append(s, countPair{Name: k, Count: v})
|
||||
}
|
||||
less := func(i, j int) bool {
|
||||
return a[j].Count < a[i].Count
|
||||
|
||||
sort.Slice(s, func(i, j int) bool {
|
||||
return s[j].Count < s[i].Count
|
||||
})
|
||||
if max > len(s) {
|
||||
max = len(s)
|
||||
}
|
||||
sort.Slice(a, less)
|
||||
if max > len(a) {
|
||||
max = len(a)
|
||||
}
|
||||
return a[:max]
|
||||
|
||||
return s[:max]
|
||||
}
|
||||
|
||||
func convertSliceToMap(a []countPair) map[string]uint64 {
|
||||
m := map[string]uint64{}
|
||||
func convertSliceToMap(a []countPair) (m map[string]uint64) {
|
||||
m = map[string]uint64{}
|
||||
for _, it := range a {
|
||||
m[it.Name] = it.Count
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// serialize converts u to the *unitDB. It's safe for concurrent use.
|
||||
// serialize converts u to the *unitDB. It's safe for concurrent use. u must
|
||||
// not be nil.
|
||||
func (u *unit) serialize() (udb *unitDB) {
|
||||
u.mu.RLock()
|
||||
defer u.mu.RUnlock()
|
||||
|
||||
var timeAvg uint32 = 0
|
||||
if u.nTotal != 0 {
|
||||
timeAvg = uint32(u.timeSum / u.nTotal)
|
||||
@@ -477,6 +216,28 @@ func (u *unit) serialize() (udb *unitDB) {
|
||||
}
|
||||
}
|
||||
|
||||
func loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) {
|
||||
bkt := tx.Bucket(idToUnitName(id))
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Tracef("Loading unit %d", id)
|
||||
|
||||
var buf bytes.Buffer
|
||||
buf.Write(bkt.Get([]byte{0}))
|
||||
udb = &unitDB{}
|
||||
|
||||
err := gob.NewDecoder(&buf).Decode(udb)
|
||||
if err != nil {
|
||||
log.Error("gob Decode: %s", err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return udb
|
||||
}
|
||||
|
||||
// deserialize assigns the appropriate values from udb to u. u must not be nil.
|
||||
// It's safe for concurrent use.
|
||||
func (u *unit) deserialize(udb *unitDB) {
|
||||
@@ -484,9 +245,6 @@ func (u *unit) deserialize(udb *unitDB) {
|
||||
return
|
||||
}
|
||||
|
||||
u.mu.Lock()
|
||||
defer u.mu.Unlock()
|
||||
|
||||
u.nTotal = udb.NTotal
|
||||
u.nResult = make([]uint64, resultLast)
|
||||
copy(u.nResult, udb.NResult)
|
||||
@@ -496,51 +254,41 @@ func (u *unit) deserialize(udb *unitDB) {
|
||||
u.timeSum = uint64(udb.TimeAvg) * udb.NTotal
|
||||
}
|
||||
|
||||
func flushUnitToDB(tx *bbolt.Tx, id uint32, udb *unitDB) bool {
|
||||
log.Tracef("Flushing unit %d", id)
|
||||
// add adds new data to u. It's safe for concurrent use.
|
||||
func (u *unit) add(res Result, domain, cli string, dur uint64) {
|
||||
u.nResult[res]++
|
||||
if res == RNotFiltered {
|
||||
u.domains[domain]++
|
||||
} else {
|
||||
u.blockedDomains[domain]++
|
||||
}
|
||||
|
||||
u.clients[cli]++
|
||||
u.timeSum += dur
|
||||
u.nTotal++
|
||||
}
|
||||
|
||||
// flushUnitToDB puts udb to the database at id.
|
||||
func (udb *unitDB) flushUnitToDB(tx *bbolt.Tx, id uint32) (err error) {
|
||||
log.Debug("stats: flushing unit with id %d and total of %d", id, udb.NTotal)
|
||||
|
||||
bkt, err := tx.CreateBucketIfNotExists(idToUnitName(id))
|
||||
if err != nil {
|
||||
log.Error("tx.CreateBucketIfNotExists: %s", err)
|
||||
return false
|
||||
return fmt.Errorf("creating bucket: %w", err)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
enc := gob.NewEncoder(&buf)
|
||||
err = enc.Encode(udb)
|
||||
buf := &bytes.Buffer{}
|
||||
err = gob.NewEncoder(buf).Encode(udb)
|
||||
if err != nil {
|
||||
log.Error("gob.Encode: %s", err)
|
||||
return false
|
||||
return fmt.Errorf("encoding unit: %w", err)
|
||||
}
|
||||
|
||||
err = bkt.Put([]byte{0}, buf.Bytes())
|
||||
if err != nil {
|
||||
log.Error("bkt.Put: %s", err)
|
||||
return false
|
||||
return fmt.Errorf("putting unit to database: %w", err)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *StatsCtx) loadUnitFromDB(tx *bbolt.Tx, id uint32) *unitDB {
|
||||
bkt := tx.Bucket(idToUnitName(id))
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// log.Tracef("Loading unit %d", id)
|
||||
|
||||
var buf bytes.Buffer
|
||||
buf.Write(bkt.Get([]byte{0}))
|
||||
dec := gob.NewDecoder(&buf)
|
||||
udb := unitDB{}
|
||||
err := dec.Decode(&udb)
|
||||
if err != nil {
|
||||
log.Error("gob Decode: %s", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return &udb
|
||||
return nil
|
||||
}
|
||||
|
||||
func convertTopSlice(a []countPair) (m []map[string]uint64) {
|
||||
@@ -552,144 +300,6 @@ func convertTopSlice(a []countPair) (m []map[string]uint64) {
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *StatsCtx) setLimit(limitDays int) {
|
||||
atomic.StoreUint32(&s.limitHours, uint32(24*limitDays))
|
||||
if limitDays == 0 {
|
||||
s.clear()
|
||||
}
|
||||
|
||||
log.Debug("stats: set limit: %d days", limitDays)
|
||||
}
|
||||
|
||||
func (s *StatsCtx) WriteDiskConfig(dc *DiskConfig) {
|
||||
dc.Interval = atomic.LoadUint32(&s.limitHours) / 24
|
||||
}
|
||||
|
||||
func (s *StatsCtx) Close() {
|
||||
u := s.swapCurrent(nil)
|
||||
|
||||
db := s.database()
|
||||
if tx := beginTxn(db, true); tx != nil {
|
||||
udb := u.serialize()
|
||||
if flushUnitToDB(tx, u.id, udb) {
|
||||
s.commitTxn(tx)
|
||||
} else {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
}
|
||||
|
||||
if db != nil {
|
||||
log.Tracef("db.Close...")
|
||||
_ = db.Close()
|
||||
log.Tracef("db.Close")
|
||||
}
|
||||
|
||||
log.Debug("stats: closed")
|
||||
}
|
||||
|
||||
// Reset counters and clear database
|
||||
func (s *StatsCtx) clear() {
|
||||
db := s.database()
|
||||
tx := beginTxn(db, true)
|
||||
if tx != nil {
|
||||
_ = s.swapDatabase(nil)
|
||||
_ = tx.Rollback()
|
||||
// the active transactions can continue using database,
|
||||
// but no new transactions will be opened
|
||||
_ = db.Close()
|
||||
log.Tracef("db.Close")
|
||||
// all active transactions are now closed
|
||||
}
|
||||
|
||||
u := newUnit(s.unitIDGen())
|
||||
_ = s.swapCurrent(u)
|
||||
|
||||
err := os.Remove(s.filename)
|
||||
if err != nil {
|
||||
log.Error("os.Remove: %s", err)
|
||||
}
|
||||
|
||||
_ = s.dbOpen()
|
||||
|
||||
log.Debug("stats: cleared")
|
||||
}
|
||||
|
||||
func (s *StatsCtx) Update(e Entry) {
|
||||
if !s.isEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
if e.Result == 0 ||
|
||||
e.Result >= resultLast ||
|
||||
e.Domain == "" ||
|
||||
e.Client == "" {
|
||||
return
|
||||
}
|
||||
|
||||
clientID := e.Client
|
||||
if ip := net.ParseIP(clientID); ip != nil {
|
||||
clientID = ip.String()
|
||||
}
|
||||
|
||||
u := s.ongoing()
|
||||
if u == nil {
|
||||
return
|
||||
}
|
||||
|
||||
u.mu.Lock()
|
||||
defer u.mu.Unlock()
|
||||
|
||||
u.nResult[e.Result]++
|
||||
if e.Result == RNotFiltered {
|
||||
u.domains[e.Domain]++
|
||||
} else {
|
||||
u.blockedDomains[e.Domain]++
|
||||
}
|
||||
|
||||
u.clients[clientID]++
|
||||
u.timeSum += uint64(e.Time)
|
||||
u.nTotal++
|
||||
}
|
||||
|
||||
func (s *StatsCtx) loadUnits(limit uint32) ([]*unitDB, uint32) {
|
||||
tx := beginTxn(s.database(), false)
|
||||
if tx == nil {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
cur := s.ongoing()
|
||||
var curID uint32
|
||||
if cur != nil {
|
||||
curID = atomic.LoadUint32(&cur.id)
|
||||
} else {
|
||||
curID = s.unitIDGen()
|
||||
}
|
||||
|
||||
// Per-hour units.
|
||||
units := []*unitDB{}
|
||||
firstID := curID - limit + 1
|
||||
for i := firstID; i != curID; i++ {
|
||||
u := s.loadUnitFromDB(tx, i)
|
||||
if u == nil {
|
||||
u = &unitDB{}
|
||||
u.NResult = make([]uint64, resultLast)
|
||||
}
|
||||
units = append(units, u)
|
||||
}
|
||||
|
||||
_ = tx.Rollback()
|
||||
|
||||
if cur != nil {
|
||||
units = append(units, cur.serialize())
|
||||
}
|
||||
|
||||
if len(units) != int(limit) {
|
||||
log.Fatalf("len(units) != limit: %d %d", len(units), limit)
|
||||
}
|
||||
|
||||
return units, firstID
|
||||
}
|
||||
|
||||
// numsGetter is a signature for statsCollector argument.
|
||||
type numsGetter func(u *unitDB) (num uint64)
|
||||
|
||||
@@ -697,6 +307,7 @@ type numsGetter func(u *unitDB) (num uint64)
|
||||
// timeUnit using ng to retrieve data.
|
||||
func statsCollector(units []*unitDB, firstID uint32, timeUnit TimeUnit, ng numsGetter) (nums []uint64) {
|
||||
if timeUnit == Hours {
|
||||
nums = make([]uint64, 0, len(units))
|
||||
for _, u := range units {
|
||||
nums = append(nums, ng(u))
|
||||
}
|
||||
@@ -738,6 +349,7 @@ func topsCollector(units []*unitDB, max int, pg pairsGetter) []map[string]uint64
|
||||
}
|
||||
}
|
||||
a2 := convertMapToSlice(m, max)
|
||||
|
||||
return convertTopSlice(a2)
|
||||
}
|
||||
|
||||
@@ -768,10 +380,9 @@ func topsCollector(units []*unitDB, max int, pg pairsGetter) []map[string]uint64
|
||||
* parental-blocked
|
||||
These values are just the sum of data for all units.
|
||||
*/
|
||||
func (s *StatsCtx) getData() (statsResponse, bool) {
|
||||
limit := atomic.LoadUint32(&s.limitHours)
|
||||
func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
|
||||
if limit == 0 {
|
||||
return statsResponse{
|
||||
return StatsResp{
|
||||
TimeUnits: "days",
|
||||
|
||||
TopBlocked: []topAddrs{},
|
||||
@@ -792,7 +403,7 @@ func (s *StatsCtx) getData() (statsResponse, bool) {
|
||||
|
||||
units, firstID := s.loadUnits(limit)
|
||||
if units == nil {
|
||||
return statsResponse{}, false
|
||||
return StatsResp{}, false
|
||||
}
|
||||
|
||||
dnsQueries := statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NTotal })
|
||||
@@ -800,7 +411,7 @@ func (s *StatsCtx) getData() (statsResponse, bool) {
|
||||
log.Fatalf("len(dnsQueries) != limit: %d %d", len(dnsQueries), limit)
|
||||
}
|
||||
|
||||
data := statsResponse{
|
||||
data := StatsResp{
|
||||
DNSQueries: dnsQueries,
|
||||
BlockedFiltering: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RFiltered] }),
|
||||
ReplacedSafebrowsing: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RSafeBrowsing] }),
|
||||
@@ -844,31 +455,3 @@ func (s *StatsCtx) getData() (statsResponse, bool) {
|
||||
|
||||
return data, true
|
||||
}
|
||||
|
||||
func (s *StatsCtx) GetTopClientsIP(maxCount uint) []net.IP {
|
||||
if !s.isEnabled() {
|
||||
return nil
|
||||
}
|
||||
|
||||
units, _ := s.loadUnits(atomic.LoadUint32(&s.limitHours))
|
||||
if units == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// top clients
|
||||
m := map[string]uint64{}
|
||||
for _, u := range units {
|
||||
for _, it := range u.Clients {
|
||||
m[it.Name] += it.Count
|
||||
}
|
||||
}
|
||||
a := convertMapToSlice(m, int(maxCount))
|
||||
d := []net.IP{}
|
||||
for _, it := range a {
|
||||
ip := net.ParseIP(it.Name)
|
||||
if ip != nil {
|
||||
d = append(d, ip)
|
||||
}
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user