all: sync with master; upd chlog

This commit is contained in:
Ainar Garipov
2023-02-15 16:53:29 +03:00
parent 80eb339896
commit 66b831072c
55 changed files with 24817 additions and 798 deletions

View File

@@ -5,7 +5,6 @@ package stats
import (
"encoding/json"
"net/http"
"sync/atomic"
"time"
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
@@ -41,10 +40,11 @@ type StatsResp struct {
// handleStats handles requests to the GET /control/stats endpoint.
func (s *StatsCtx) handleStats(w http.ResponseWriter, r *http.Request) {
limit := atomic.LoadUint32(&s.limitHours)
s.lock.Lock()
defer s.lock.Unlock()
start := time.Now()
resp, ok := s.getData(limit)
resp, ok := s.getData(s.limitHours)
log.Debug("stats: prepared data in %v", time.Since(start))
if !ok {
@@ -65,7 +65,13 @@ type configResp struct {
// handleStatsInfo handles requests to the GET /control/stats_info endpoint.
func (s *StatsCtx) handleStatsInfo(w http.ResponseWriter, r *http.Request) {
resp := configResp{IntervalDays: atomic.LoadUint32(&s.limitHours) / 24}
s.lock.Lock()
defer s.lock.Unlock()
resp := configResp{IntervalDays: s.limitHours / 24}
if !s.enabled {
resp.IntervalDays = 0
}
_ = aghhttp.WriteJSONResponse(w, r, resp)
}

View File

@@ -15,16 +15,10 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/stringutil"
"go.etcd.io/bbolt"
)
// DiskConfig is the configuration structure that is stored in file.
type DiskConfig struct {
// Interval is the number of days for which the statistics are collected
// before flushing to the database.
Interval uint32 `yaml:"statistics_interval"`
}
// checkInterval returns true if days is valid to be used as statistics
// retention interval. The valid values are 0, 1, 7, 30 and 90.
func checkInterval(days uint32) (ok bool) {
@@ -51,6 +45,12 @@ type Config struct {
// LimitDays is the maximum number of days to collect statistics into the
// current unit.
LimitDays uint32
// Enabled tells if the statistics are enabled.
Enabled bool
// Ignored is the list of host names, which should not be counted.
Ignored *stringutil.Set
}
// Interface is the statistics interface to be used by other packages.
@@ -68,30 +68,22 @@ type Interface interface {
TopClientsIP(limit uint) []netip.Addr
// WriteDiskConfig puts the Interface's configuration to the dc.
WriteDiskConfig(dc *DiskConfig)
WriteDiskConfig(dc *Config)
// ShouldCount returns true if request for the host should be counted.
ShouldCount(host string, qType, qClass uint16) bool
}
// StatsCtx collects the statistics and flushes it to the database. Its default
// flushing interval is one hour.
//
// TODO(e.burkov): Use atomic.Pointer for accessing db in go1.19.
type StatsCtx struct {
// limitHours is the maximum number of hours to collect statistics into the
// current unit.
//
// It is of type uint32 to be accessed by atomic. It's arranged at the
// beginning of the structure to keep 64-bit alignment.
limitHours uint32
// currMu protects curr.
currMu *sync.RWMutex
// curr is the actual statistics collection result.
curr *unit
// dbMu protects db.
dbMu *sync.Mutex
// db is the opened statistics database, if any.
db *bbolt.DB
db atomic.Pointer[bbolt.DB]
// unitIDGen is the function that generates an identifier for the current
// unit. It's here for only testing purposes.
@@ -106,6 +98,21 @@ type StatsCtx struct {
// filename is the name of database file.
filename string
// lock protects all the fields below.
lock sync.Mutex
// enabled tells if the statistics are enabled.
enabled bool
// limitHours is the maximum number of hours to collect statistics into the
// current unit.
//
// TODO(s.chzhen): Rewrite to use time.Duration.
limitHours uint32
// ignored is the list of host names, which should not be counted.
ignored *stringutil.Set
}
// New creates s from conf and properly initializes it. Don't use s before
@@ -114,11 +121,12 @@ func New(conf Config) (s *StatsCtx, err error) {
defer withRecovered(&err)
s = &StatsCtx{
enabled: conf.Enabled,
currMu: &sync.RWMutex{},
dbMu: &sync.Mutex{},
filename: conf.Filename,
configModified: conf.ConfigModified,
httpRegister: conf.HTTPRegister,
ignored: conf.Ignored,
}
if s.limitHours = conf.LimitDays * 24; !checkInterval(conf.LimitDays) {
s.limitHours = 24
@@ -137,7 +145,7 @@ func New(conf Config) (s *StatsCtx, err error) {
var udb *unitDB
id := s.unitIDGen()
tx, err := s.db.Begin(true)
tx, err := s.db.Load().Begin(true)
if err != nil {
return nil, fmt.Errorf("stats: opening a transaction: %w", err)
}
@@ -191,7 +199,7 @@ func (s *StatsCtx) Start() {
func (s *StatsCtx) Close() (err error) {
defer func() { err = errors.Annotate(err, "stats: closing: %w") }()
db := s.swapDatabase(nil)
db := s.db.Swap(nil)
if db == nil {
return nil
}
@@ -220,7 +228,10 @@ func (s *StatsCtx) Close() (err error) {
// Update implements the Interface interface for *StatsCtx.
func (s *StatsCtx) Update(e Entry) {
if atomic.LoadUint32(&s.limitHours) == 0 {
s.lock.Lock()
defer s.lock.Unlock()
if !s.enabled || s.limitHours == 0 {
return
}
@@ -248,14 +259,22 @@ func (s *StatsCtx) Update(e Entry) {
}
// WriteDiskConfig implements the Interface interface for *StatsCtx.
func (s *StatsCtx) WriteDiskConfig(dc *DiskConfig) {
dc.Interval = atomic.LoadUint32(&s.limitHours) / 24
func (s *StatsCtx) WriteDiskConfig(dc *Config) {
s.lock.Lock()
defer s.lock.Unlock()
dc.LimitDays = s.limitHours / 24
dc.Enabled = s.enabled
dc.Ignored = s.ignored
}
// TopClientsIP implements the [Interface] interface for *StatsCtx.
func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) {
limit := atomic.LoadUint32(&s.limitHours)
if limit == 0 {
s.lock.Lock()
defer s.lock.Unlock()
limit := s.limitHours
if !s.enabled || limit == 0 {
return nil
}
@@ -284,25 +303,6 @@ func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) {
return ips
}
// database returns the database if it's opened. It's safe for concurrent use.
func (s *StatsCtx) database() (db *bbolt.DB) {
s.dbMu.Lock()
defer s.dbMu.Unlock()
return s.db
}
// swapDatabase swaps the database with another one and returns it. It's safe
// for concurrent use.
func (s *StatsCtx) swapDatabase(with *bbolt.DB) (old *bbolt.DB) {
s.dbMu.Lock()
defer s.dbMu.Unlock()
old, s.db = s.db, with
return old
}
// deleteOldUnits walks the buckets available to tx and deletes old units. It
// returns the number of deletions performed.
func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
@@ -358,10 +358,7 @@ func (s *StatsCtx) openDB() (err error) {
// Use defer to unlock the mutex as soon as possible.
defer log.Debug("stats: database opened")
s.dbMu.Lock()
defer s.dbMu.Unlock()
s.db = db
s.db.Store(db)
return nil
}
@@ -369,6 +366,9 @@ func (s *StatsCtx) openDB() (err error) {
func (s *StatsCtx) flush() (cont bool, sleepFor time.Duration) {
id := s.unitIDGen()
s.lock.Lock()
defer s.lock.Unlock()
s.currMu.Lock()
defer s.currMu.Unlock()
@@ -377,12 +377,12 @@ func (s *StatsCtx) flush() (cont bool, sleepFor time.Duration) {
return false, 0
}
limit := atomic.LoadUint32(&s.limitHours)
limit := s.limitHours
if limit == 0 || ptr.id == id {
return true, time.Second
}
db := s.database()
db := s.db.Load()
if db == nil {
return true, 0
}
@@ -437,21 +437,30 @@ func (s *StatsCtx) periodicFlush() {
}
func (s *StatsCtx) setLimit(limitDays int) {
atomic.StoreUint32(&s.limitHours, uint32(24*limitDays))
if limitDays == 0 {
if err := s.clear(); err != nil {
log.Error("stats: %s", err)
}
s.lock.Lock()
defer s.lock.Unlock()
if limitDays != 0 {
s.enabled = true
s.limitHours = uint32(24 * limitDays)
log.Debug("stats: set limit: %d days", limitDays)
return
}
log.Debug("stats: set limit: %d days", limitDays)
s.enabled = false
log.Debug("stats: disabled")
if err := s.clear(); err != nil {
log.Error("stats: %s", err)
}
}
// Reset counters and clear database
func (s *StatsCtx) clear() (err error) {
defer func() { err = errors.Annotate(err, "clearing: %w") }()
db := s.swapDatabase(nil)
db := s.db.Swap(nil)
if db != nil {
var tx *bbolt.Tx
tx, err = db.Begin(true)
@@ -495,7 +504,7 @@ func (s *StatsCtx) clear() (err error) {
}
func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
db := s.database()
db := s.db.Load()
if db == nil {
return nil, 0
}
@@ -547,3 +556,13 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
return units, firstID
}
// ShouldCount returns true if request for the host should be counted.
func (s *StatsCtx) ShouldCount(host string, _, _ uint16) bool {
return !s.isIgnored(host)
}
// isIgnored returns true if the host is in the Ignored list.
func (s *StatsCtx) isIgnored(host string) bool {
return s.ignored.Has(host)
}

View File

@@ -47,8 +47,6 @@ func TestStats_races(t *testing.T) {
startTime := time.Now()
testutil.CleanupAndRequireSuccess(t, s.Close)
type signal = struct{}
writeFunc := func(start, fin *sync.WaitGroup, waitCh <-chan unit, i int) {
e := Entry{
Domain: fmt.Sprintf("example-%d.org", i),

View File

@@ -53,6 +53,7 @@ func TestStats(t *testing.T) {
conf := stats.Config{
Filename: filepath.Join(t.TempDir(), "stats.db"),
LimitDays: 1,
Enabled: true,
UnitID: constUnitID,
HTTPRegister: func(_, url string, handler http.HandlerFunc) {
handlers[url] = handler
@@ -158,6 +159,7 @@ func TestLargeNumbers(t *testing.T) {
conf := stats.Config{
Filename: filepath.Join(t.TempDir(), "stats.db"),
LimitDays: 1,
Enabled: true,
UnitID: func() (id uint32) { return atomic.LoadUint32(&curHour) },
HTTPRegister: func(_, url string, handler http.HandlerFunc) { handlers[url] = handler },
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/stringutil"
"go.etcd.io/bbolt"
)
@@ -341,11 +342,13 @@ type pairsGetter func(u *unitDB) (pairs []countPair)
// topsCollector collects statistics about highest values from the given *unitDB
// slice using pg to retrieve data.
func topsCollector(units []*unitDB, max int, pg pairsGetter) []map[string]uint64 {
func topsCollector(units []*unitDB, max int, ignored *stringutil.Set, pg pairsGetter) []map[string]uint64 {
m := map[string]uint64{}
for _, u := range units {
for _, cp := range pg(u) {
m[cp.Name] += cp.Count
if !ignored.Has(cp.Name) {
m[cp.Name] += cp.Count
}
}
}
a2 := convertMapToSlice(m, max)
@@ -408,9 +411,9 @@ func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
BlockedFiltering: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RFiltered] }),
ReplacedSafebrowsing: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RSafeBrowsing] }),
ReplacedParental: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RParental] }),
TopQueried: topsCollector(units, maxDomains, func(u *unitDB) (pairs []countPair) { return u.Domains }),
TopBlocked: topsCollector(units, maxDomains, func(u *unitDB) (pairs []countPair) { return u.BlockedDomains }),
TopClients: topsCollector(units, maxClients, func(u *unitDB) (pairs []countPair) { return u.Clients }),
TopQueried: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.Domains }),
TopBlocked: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.BlockedDomains }),
TopClients: topsCollector(units, maxClients, nil, func(u *unitDB) (pairs []countPair) { return u.Clients }),
}
// Total counters: