all: sync with master
This commit is contained in:
@@ -385,43 +385,47 @@ func (s *StatsCtx) flush() (cont bool, sleepFor time.Duration) {
|
||||
return true, 0
|
||||
}
|
||||
|
||||
isCommitable := true
|
||||
tx, err := db.Begin(true)
|
||||
if err != nil {
|
||||
log.Error("stats: opening transaction: %s", err)
|
||||
|
||||
return true, 0
|
||||
}
|
||||
defer func() {
|
||||
if err = finishTxn(tx, isCommitable); err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
s.curr = newUnit(id)
|
||||
isCommitable := true
|
||||
|
||||
ferr := ptr.serialize().flushUnitToDB(tx, ptr.id)
|
||||
if ferr != nil {
|
||||
log.Error("stats: flushing unit: %s", ferr)
|
||||
flushErr := ptr.serialize().flushUnitToDB(tx, ptr.id)
|
||||
if flushErr != nil {
|
||||
log.Error("stats: flushing unit: %s", flushErr)
|
||||
isCommitable = false
|
||||
}
|
||||
|
||||
derr := tx.DeleteBucket(idToUnitName(id - limit))
|
||||
if derr != nil {
|
||||
log.Error("stats: deleting unit: %s", derr)
|
||||
if !errors.Is(derr, bbolt.ErrBucketNotFound) {
|
||||
delErr := tx.DeleteBucket(idToUnitName(id - limit))
|
||||
if delErr != nil {
|
||||
// TODO(e.burkov): Improve the algorithm of deleting the oldest bucket
|
||||
// to avoid the error.
|
||||
if errors.Is(delErr, bbolt.ErrBucketNotFound) {
|
||||
log.Debug("stats: warning: deleting unit: %s", delErr)
|
||||
} else {
|
||||
isCommitable = false
|
||||
log.Error("stats: deleting unit: %s", delErr)
|
||||
}
|
||||
}
|
||||
|
||||
err = finishTxn(tx, isCommitable)
|
||||
if err != nil {
|
||||
log.Error("stats: %s", err)
|
||||
}
|
||||
|
||||
return true, 0
|
||||
}
|
||||
|
||||
// periodicFlush checks and flushes the unit to the database if the freshly
|
||||
// generated unit ID differs from the current's ID. Flushing process includes:
|
||||
// - swapping the current unit with the new empty one;
|
||||
// - writing the current unit to the database;
|
||||
// - removing the stale unit from the database.
|
||||
// - swapping the current unit with the new empty one;
|
||||
// - writing the current unit to the database;
|
||||
// - removing the stale unit from the database.
|
||||
func (s *StatsCtx) periodicFlush() {
|
||||
for cont, sleepFor := true, time.Duration(0); cont; time.Sleep(sleepFor) {
|
||||
cont, sleepFor = s.flush()
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/golibs/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -24,3 +30,76 @@ func TestStatsCollector(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStats_races(t *testing.T) {
|
||||
var r uint32
|
||||
idGen := func() (id uint32) { return atomic.LoadUint32(&r) }
|
||||
conf := Config{
|
||||
UnitID: idGen,
|
||||
Filename: filepath.Join(t.TempDir(), "./stats.db"),
|
||||
LimitDays: 1,
|
||||
}
|
||||
|
||||
s, err := New(conf)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.Start()
|
||||
startTime := time.Now()
|
||||
testutil.CleanupAndRequireSuccess(t, s.Close)
|
||||
|
||||
type signal = struct{}
|
||||
|
||||
writeFunc := func(start, fin *sync.WaitGroup, waitCh <-chan unit, i int) {
|
||||
e := Entry{
|
||||
Domain: fmt.Sprintf("example-%d.org", i),
|
||||
Client: fmt.Sprintf("client_%d", i),
|
||||
Result: Result(i)%(resultLast-1) + 1,
|
||||
Time: uint32(time.Since(startTime).Milliseconds()),
|
||||
}
|
||||
|
||||
start.Done()
|
||||
defer fin.Done()
|
||||
|
||||
<-waitCh
|
||||
|
||||
s.Update(e)
|
||||
}
|
||||
readFunc := func(start, fin *sync.WaitGroup, waitCh <-chan unit) {
|
||||
start.Done()
|
||||
defer fin.Done()
|
||||
|
||||
<-waitCh
|
||||
|
||||
_, _ = s.getData(24)
|
||||
}
|
||||
|
||||
const (
|
||||
roundsNum = 3
|
||||
|
||||
writersNum = 10
|
||||
readersNum = 5
|
||||
)
|
||||
|
||||
for round := 0; round < roundsNum; round++ {
|
||||
atomic.StoreUint32(&r, uint32(round))
|
||||
|
||||
startWG, finWG := &sync.WaitGroup{}, &sync.WaitGroup{}
|
||||
waitCh := make(chan unit)
|
||||
|
||||
for i := 0; i < writersNum; i++ {
|
||||
startWG.Add(1)
|
||||
finWG.Add(1)
|
||||
go writeFunc(startWG, finWG, waitCh, i)
|
||||
}
|
||||
|
||||
for i := 0; i < readersNum; i++ {
|
||||
startWG.Add(1)
|
||||
finWG.Add(1)
|
||||
go readFunc(startWG, finWG, waitCh)
|
||||
}
|
||||
|
||||
startWG.Wait()
|
||||
close(waitCh)
|
||||
finWG.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,33 +353,25 @@ func topsCollector(units []*unitDB, max int, pg pairsGetter) []map[string]uint64
|
||||
return convertTopSlice(a2)
|
||||
}
|
||||
|
||||
/* Algorithm:
|
||||
. Prepare array of N units, where N is the value of "limit" configuration setting
|
||||
. Load data for the most recent units from file
|
||||
If a unit with required ID doesn't exist, just add an empty unit
|
||||
. Get data for the current unit
|
||||
. Process data from the units and prepare an output map object:
|
||||
* per time unit counters:
|
||||
* DNS-queries/time-unit
|
||||
* blocked/time-unit
|
||||
* safebrowsing-blocked/time-unit
|
||||
* parental-blocked/time-unit
|
||||
If time-unit is an hour, just add values from each unit to an array.
|
||||
If time-unit is a day, aggregate per-hour data into days.
|
||||
* top counters:
|
||||
* queries/domain
|
||||
* queries/blocked-domain
|
||||
* queries/client
|
||||
To get these values we first sum up data for all units into a single map.
|
||||
Then we get the pairs with the highest numbers (the values are sorted in descending order)
|
||||
* total counters:
|
||||
* DNS-queries
|
||||
* blocked
|
||||
* safebrowsing-blocked
|
||||
* safesearch-blocked
|
||||
* parental-blocked
|
||||
These values are just the sum of data for all units.
|
||||
*/
|
||||
// getData returns the statistics data using the following algorithm:
|
||||
//
|
||||
// 1. Prepare a slice of N units, where N is the value of "limit" configuration
|
||||
// setting. Load data for the most recent units from the file. If a unit
|
||||
// with required ID doesn't exist, just add an empty unit. Get data for the
|
||||
// current unit.
|
||||
//
|
||||
// 2. Process data from the units and prepare an output map object, including
|
||||
// per time unit counters (DNS queries per time-unit, blocked queries per
|
||||
// time unit, etc.). If the time unit is hour, just add values from each
|
||||
// unit to the slice; otherwise, the time unit is day, so aggregate per-hour
|
||||
// data into days.
|
||||
//
|
||||
// To get the top counters (queries per domain, queries per blocked domain,
|
||||
// etc.), first sum up data for all units into a single map. Then, get the
|
||||
// pairs with the highest numbers.
|
||||
//
|
||||
// The total counters (DNS queries, blocked, etc.) are just the sum of data
|
||||
// for all units.
|
||||
func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
|
||||
if limit == 0 {
|
||||
return StatsResp{
|
||||
|
||||
Reference in New Issue
Block a user