Pull request 2275: AGDNS-2374-slog-stats
Squashed commit of the following: commit 45b2fc6a05a4f7775d2b6fa056c81d53d4f402d7 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Thu Sep 5 18:07:06 2024 +0300 all: imp code commit 022c90496a46b0a0423dd2cb1c02a3473ba5d224 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Sep 4 19:32:35 2024 +0300 stats: imp code commit bb3c0c8002c34bec7440cd93b7833f7022eef0d8 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Sep 4 19:10:36 2024 +0300 all: imp code commit 363a16f6bb2faa1d9b890b4967684129208af62e Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Sep 4 17:45:31 2024 +0300 all: imp code commit a3c96e3d211cc5e11ba09e334748f65a44b8960a Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Mon Sep 2 20:44:11 2024 +0300 stats: imp code commit 2c0ffd91fddd286254b53be790146a2931b7b55b Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Mon Sep 2 19:47:11 2024 +0300 all: slog stats
This commit is contained in:
@@ -3,8 +3,10 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/netip"
|
||||
"os"
|
||||
"sync"
|
||||
@@ -14,7 +16,7 @@ import (
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghnet"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/log"
|
||||
"github.com/AdguardTeam/golibs/logutil/slogutil"
|
||||
"github.com/AdguardTeam/golibs/timeutil"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
@@ -43,6 +45,10 @@ func validateIvl(ivl time.Duration) (err error) {
|
||||
//
|
||||
// Do not alter any fields of this structure after using it.
|
||||
type Config struct {
|
||||
// Logger is used for logging the operation of the statistics management.
|
||||
// It must not be nil.
|
||||
Logger *slog.Logger
|
||||
|
||||
// UnitID is the function to generate the identifier for current unit. If
|
||||
// nil, the default function is used, see newUnitID.
|
||||
UnitID UnitIDGenFunc
|
||||
@@ -96,6 +102,10 @@ type Interface interface {
|
||||
// StatsCtx collects the statistics and flushes it to the database. Its default
|
||||
// flushing interval is one hour.
|
||||
type StatsCtx struct {
|
||||
// logger is used for logging the operation of the statistics management.
|
||||
// It must not be nil.
|
||||
logger *slog.Logger
|
||||
|
||||
// currMu protects curr.
|
||||
currMu *sync.RWMutex
|
||||
// curr is the actual statistics collection result.
|
||||
@@ -150,6 +160,7 @@ func New(conf Config) (s *StatsCtx, err error) {
|
||||
}
|
||||
|
||||
s = &StatsCtx{
|
||||
logger: conf.Logger,
|
||||
currMu: &sync.RWMutex{},
|
||||
httpRegister: conf.HTTPRegister,
|
||||
configModified: conf.ConfigModified,
|
||||
@@ -178,21 +189,21 @@ func New(conf Config) (s *StatsCtx, err error) {
|
||||
|
||||
tx, err := s.db.Load().Begin(true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stats: opening a transaction: %w", err)
|
||||
return nil, fmt.Errorf("opening a transaction: %w", err)
|
||||
}
|
||||
|
||||
deleted := deleteOldUnits(tx, id-uint32(s.limit.Hours())-1)
|
||||
udb = loadUnitFromDB(tx, id)
|
||||
deleted := s.deleteOldUnits(tx, id-uint32(s.limit.Hours())-1)
|
||||
udb = s.loadUnitFromDB(tx, id)
|
||||
|
||||
err = finishTxn(tx, deleted > 0)
|
||||
if err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
s.logger.Error("finishing transacation", slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
s.curr = newUnit(id)
|
||||
s.curr.deserialize(udb)
|
||||
|
||||
log.Debug("stats: initialized")
|
||||
s.logger.Debug("initialized")
|
||||
|
||||
return s, nil
|
||||
}
|
||||
@@ -228,8 +239,6 @@ func (s *StatsCtx) Start() {
|
||||
|
||||
// Close implements the [io.Closer] interface for *StatsCtx.
|
||||
func (s *StatsCtx) Close() (err error) {
|
||||
defer func() { err = errors.Annotate(err, "stats: closing: %w") }()
|
||||
|
||||
db := s.db.Swap(nil)
|
||||
if db == nil {
|
||||
return nil
|
||||
@@ -237,7 +246,7 @@ func (s *StatsCtx) Close() (err error) {
|
||||
defer func() {
|
||||
cerr := db.Close()
|
||||
if cerr == nil {
|
||||
log.Debug("stats: database closed")
|
||||
s.logger.Debug("database closed")
|
||||
}
|
||||
|
||||
err = errors.WithDeferred(err, cerr)
|
||||
@@ -254,7 +263,7 @@ func (s *StatsCtx) Close() (err error) {
|
||||
|
||||
udb := s.curr.serialize()
|
||||
|
||||
return udb.flushUnitToDB(tx, s.curr.id)
|
||||
return s.flushUnitToDB(udb, tx, s.curr.id)
|
||||
}
|
||||
|
||||
// Update implements the [Interface] interface for *StatsCtx. e must not be
|
||||
@@ -269,7 +278,7 @@ func (s *StatsCtx) Update(e *Entry) {
|
||||
|
||||
err := e.validate()
|
||||
if err != nil {
|
||||
log.Debug("stats: updating: validating entry: %s", err)
|
||||
s.logger.Debug("validating entry", slogutil.KeyError, err)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -278,7 +287,7 @@ func (s *StatsCtx) Update(e *Entry) {
|
||||
defer s.currMu.Unlock()
|
||||
|
||||
if s.curr == nil {
|
||||
log.Error("stats: current unit is nil")
|
||||
s.logger.Error("current unit is nil")
|
||||
|
||||
return
|
||||
}
|
||||
@@ -333,8 +342,8 @@ func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) {
|
||||
|
||||
// deleteOldUnits walks the buckets available to tx and deletes old units. It
|
||||
// returns the number of deletions performed.
|
||||
func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
|
||||
log.Debug("stats: deleting old units until id %d", firstID)
|
||||
func (s *StatsCtx) deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
|
||||
s.logger.Debug("deleting old units up to", "unit", firstID)
|
||||
|
||||
// TODO(a.garipov): See if this is actually necessary. Looks like a rather
|
||||
// bizarre solution.
|
||||
@@ -348,12 +357,12 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
|
||||
|
||||
err = tx.DeleteBucket(name)
|
||||
if err != nil {
|
||||
log.Debug("stats: deleting bucket: %s", err)
|
||||
s.logger.Debug("deleting bucket", slogutil.KeyError, err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debug("stats: deleted unit %d (name %x)", nameID, name)
|
||||
s.logger.Debug("deleted unit", "name_id", nameID, "name", fmt.Sprintf("%x", name))
|
||||
|
||||
deleted++
|
||||
|
||||
@@ -362,7 +371,7 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
|
||||
|
||||
err := tx.ForEach(walk)
|
||||
if err != nil && !errors.Is(err, errStop) {
|
||||
log.Debug("stats: deleting units: %s", err)
|
||||
s.logger.Debug("deleting units", slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
return deleted
|
||||
@@ -371,20 +380,29 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
|
||||
// openDB returns an error if the database can't be opened from the specified
|
||||
// file. It's safe for concurrent use.
|
||||
func (s *StatsCtx) openDB() (err error) {
|
||||
log.Debug("stats: opening database")
|
||||
s.logger.Debug("opening database")
|
||||
|
||||
var db *bbolt.DB
|
||||
db, err = bbolt.Open(s.filename, 0o644, nil)
|
||||
if err != nil {
|
||||
if err.Error() == "invalid argument" {
|
||||
log.Error("AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations")
|
||||
const lines = `AdGuard Home cannot be initialized due to an incompatible file system.
|
||||
Please read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations`
|
||||
|
||||
// TODO(s.chzhen): Use passed context.
|
||||
slogutil.PrintLines(
|
||||
context.TODO(),
|
||||
s.logger,
|
||||
slog.LevelError,
|
||||
"opening database",
|
||||
lines,
|
||||
)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Use defer to unlock the mutex as soon as possible.
|
||||
defer log.Debug("stats: database opened")
|
||||
defer s.logger.Debug("database opened")
|
||||
|
||||
s.db.Store(db)
|
||||
|
||||
@@ -424,34 +442,37 @@ func (s *StatsCtx) flushDB(id, limit uint32, ptr *unit) (cont bool, sleepFor tim
|
||||
isCommitable := true
|
||||
tx, err := db.Begin(true)
|
||||
if err != nil {
|
||||
log.Error("stats: opening transaction: %s", err)
|
||||
s.logger.Error("opening transaction", slogutil.KeyError, err)
|
||||
|
||||
return true, 0
|
||||
}
|
||||
defer func() {
|
||||
if err = finishTxn(tx, isCommitable); err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
s.logger.Error("finishing transaction", slogutil.KeyError, err)
|
||||
}
|
||||
}()
|
||||
|
||||
s.curr = newUnit(id)
|
||||
|
||||
flushErr := ptr.serialize().flushUnitToDB(tx, ptr.id)
|
||||
udb := ptr.serialize()
|
||||
flushErr := s.flushUnitToDB(udb, tx, ptr.id)
|
||||
if flushErr != nil {
|
||||
log.Error("stats: flushing unit: %s", flushErr)
|
||||
s.logger.Error("flushing unit", slogutil.KeyError, flushErr)
|
||||
isCommitable = false
|
||||
}
|
||||
|
||||
delErr := tx.DeleteBucket(idToUnitName(id - limit))
|
||||
|
||||
if delErr != nil {
|
||||
// TODO(e.burkov): Improve the algorithm of deleting the oldest bucket
|
||||
// to avoid the error.
|
||||
if errors.Is(delErr, bbolt.ErrBucketNotFound) {
|
||||
log.Debug("stats: warning: deleting unit: %s", delErr)
|
||||
} else {
|
||||
lvl := slog.LevelWarn
|
||||
if !errors.Is(delErr, bbolt.ErrBucketNotFound) {
|
||||
isCommitable = false
|
||||
log.Error("stats: deleting unit: %s", delErr)
|
||||
lvl = slog.LevelError
|
||||
}
|
||||
|
||||
s.logger.Log(context.TODO(), lvl, "deleting bucket", slogutil.KeyError, delErr)
|
||||
}
|
||||
|
||||
return true, 0
|
||||
@@ -467,7 +488,7 @@ func (s *StatsCtx) periodicFlush() {
|
||||
cont, sleepFor = s.flush()
|
||||
}
|
||||
|
||||
log.Debug("periodic flushing finished")
|
||||
s.logger.Debug("periodic flushing finished")
|
||||
}
|
||||
|
||||
// setLimit sets the limit. s.lock is expected to be locked.
|
||||
@@ -477,16 +498,16 @@ func (s *StatsCtx) setLimit(limit time.Duration) {
|
||||
if limit != 0 {
|
||||
s.enabled = true
|
||||
s.limit = limit
|
||||
log.Debug("stats: set limit: %d days", limit/timeutil.Day)
|
||||
s.logger.Debug("setting limit in days", "num", limit/timeutil.Day)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
s.enabled = false
|
||||
log.Debug("stats: disabled")
|
||||
s.logger.Debug("disabled")
|
||||
|
||||
if err := s.clear(); err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
s.logger.Error("clearing", slogutil.KeyError, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,7 +520,7 @@ func (s *StatsCtx) clear() (err error) {
|
||||
var tx *bbolt.Tx
|
||||
tx, err = db.Begin(true)
|
||||
if err != nil {
|
||||
log.Error("stats: opening a transaction: %s", err)
|
||||
s.logger.Error("opening transaction", slogutil.KeyError, err)
|
||||
} else if err = finishTxn(tx, false); err != nil {
|
||||
// Don't wrap the error since it's informative enough as is.
|
||||
return err
|
||||
@@ -513,21 +534,21 @@ func (s *StatsCtx) clear() (err error) {
|
||||
}
|
||||
|
||||
// All active transactions are now closed.
|
||||
log.Debug("stats: database closed")
|
||||
s.logger.Debug("database closed")
|
||||
}
|
||||
|
||||
err = os.Remove(s.filename)
|
||||
if err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
s.logger.Error("removing", slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
err = s.openDB()
|
||||
if err != nil {
|
||||
log.Error("stats: opening database: %s", err)
|
||||
s.logger.Error("opening database", slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
// Use defer to unlock the mutex as soon as possible.
|
||||
defer log.Debug("stats: cleared")
|
||||
defer s.logger.Debug("cleared")
|
||||
|
||||
s.currMu.Lock()
|
||||
defer s.currMu.Unlock()
|
||||
@@ -548,7 +569,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
|
||||
// taken into account.
|
||||
tx, err := db.Begin(true)
|
||||
if err != nil {
|
||||
log.Error("stats: opening transaction: %s", err)
|
||||
s.logger.Error("opening transaction", slogutil.KeyError, err)
|
||||
|
||||
return nil, 0
|
||||
}
|
||||
@@ -568,7 +589,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
|
||||
units = make([]*unitDB, 0, limit)
|
||||
firstID := curID - limit + 1
|
||||
for i := firstID; i != curID; i++ {
|
||||
u := loadUnitFromDB(tx, i)
|
||||
u := s.loadUnitFromDB(tx, i)
|
||||
if u == nil {
|
||||
u = &unitDB{NResult: make([]uint64, resultLast)}
|
||||
}
|
||||
@@ -577,7 +598,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
|
||||
|
||||
err = finishTxn(tx, false)
|
||||
if err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
s.logger.Error("finishing transaction", slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
if cur != nil {
|
||||
@@ -585,7 +606,8 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
|
||||
}
|
||||
|
||||
if unitsLen := len(units); unitsLen != int(limit) {
|
||||
log.Fatalf("loaded %d units whilst the desired number is %d", unitsLen, limit)
|
||||
// Should not happen.
|
||||
panic(fmt.Errorf("loaded %d units when the desired number is %d", unitsLen, limit))
|
||||
}
|
||||
|
||||
return units, curID
|
||||
|
||||
Reference in New Issue
Block a user