Pull request 2107: AG-28327-upstream-config-parser

Squashed commit of the following:

commit e496653b10de52676826ed8e0c461e91405603a8
Merge: db2cd04e9 60a978c9a
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Feb 1 18:23:50 2024 +0300

    Merge branch 'master' into AG-28327-upstream-config-parser

commit db2cd04e981dd24998d87f4935ff6590ea7854cd
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Wed Jan 31 16:21:53 2024 +0300

    all: upd proxy

commit e8878179b6d094321d56fb2b75c16c1ba8cf637d
Merge: ccbbae6d6 aa872dfe9
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Wed Jan 31 16:17:34 2024 +0300

    Merge branch 'master' into AG-28327-upstream-config-parser

commit ccbbae6d615e110d7d2d4c2a6b35954311153bcf
Merge: d947d900e 8936c95ec
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Jan 29 18:31:17 2024 +0300

    Merge branch 'master' into AG-28327-upstream-config-parser

commit d947d900e1f759159bc9068589ffe852483cfdd0
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Jan 29 18:26:01 2024 +0300

    dnsforward: imp docs

commit cf9678c098951e2a4bebae7a3a5808d7de4c14c6
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Jan 25 14:18:04 2024 +0300

    dnsforward: imp code

commit 22792a9311cb93b2bb3b804293f87f091b9b81e2
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Wed Jan 24 13:59:28 2024 +0300

    dnsforward: imp code

commit 57ddaaaaaf1009c65f0d9d6b2b1671211f194c85
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Jan 22 20:19:44 2024 +0300

    all: add tests

commit d6732d13adae4ee46410252a33d092e67da3c34a
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Jan 22 18:44:57 2024 +0300

    all: imp errors

commit e14456571ce2ef43fb217f45445729ce6299daf6
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Jan 18 19:05:31 2024 +0300

    dnsforward: imp code

commit a5c106eae902fbc0a169ef9e4d7bf968f1e40bec
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Mon Jan 15 18:36:30 2024 +0300

    dnsforward: imp logs

commit 333b8561aa21d778007f808fb8e931ef3e95d721
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Dec 21 15:06:42 2023 +0300

    all: imp tests

commit 5b19d6b039755577e03ffcc03952724a36f21aa4
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Fri Dec 15 14:21:58 2023 +0300

    all: imp code

commit 15fbd229de336425bde107a4f32175b8af41d876
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Wed Dec 13 14:49:40 2023 +0300

    all: upstream config parser
This commit is contained in:
Stanislav Chzhen
2024-02-05 16:12:27 +03:00
parent 60a978c9a6
commit 34aa81ca99
10 changed files with 231 additions and 424 deletions

View File

@@ -2,54 +2,56 @@ package dnsforward
import (
"fmt"
"strings"
"sync"
"github.com/AdguardTeam/dnsproxy/proxy"
"github.com/AdguardTeam/dnsproxy/upstream"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/miekg/dns"
"golang.org/x/exp/slices"
)
// upstreamConfigValidator parses the [*proxy.UpstreamConfig] and checks the
// actual DNS availability of each upstream.
// upstreamConfigValidator parses each section of an upstream configuration into
// a corresponding [*proxy.UpstreamConfig] and checks the actual DNS
// availability of each upstream.
type upstreamConfigValidator struct {
// general is the general upstream configuration.
general []*upstreamResult
// generalUpstreamResults contains upstream results of a general section.
generalUpstreamResults map[string]*upstreamResult
// fallback is the fallback upstream configuration.
fallback []*upstreamResult
// fallbackUpstreamResults contains upstream results of a fallback section.
fallbackUpstreamResults map[string]*upstreamResult
// private is the private upstream configuration.
private []*upstreamResult
// privateUpstreamResults contains upstream results of a private section.
privateUpstreamResults map[string]*upstreamResult
// generalParseResults contains parsing results of a general section.
generalParseResults []*parseResult
// fallbackParseResults contains parsing results of a fallback section.
fallbackParseResults []*parseResult
// privateParseResults contains parsing results of a private section.
privateParseResults []*parseResult
}
// upstreamResult is a result of validation of an [upstream.Upstream] within an
// upstreamResult is a result of parsing of an [upstream.Upstream] within an
// [proxy.UpstreamConfig].
type upstreamResult struct {
// server is the parsed upstream. It is nil when there was an error during
// parsing.
// server is the parsed upstream.
server upstream.Upstream
// err is the error either from parsing or from checking the upstream.
// err is the upstream check error.
err error
// original is the piece of configuration that have either been turned to an
// upstream or caused an error.
original string
// isSpecific is true if the upstream is domain-specific.
isSpecific bool
}
// compare compares two [upstreamResult]s. It returns 0 if they are equal, -1
// if ur should be sorted before other, and 1 otherwise.
//
// TODO(e.burkov): Perhaps it makes sense to sort the results with errors near
// the end.
func (ur *upstreamResult) compare(other *upstreamResult) (res int) {
return strings.Compare(ur.original, other.original)
// parseResult contains a original piece of upstream configuration and a
// corresponding error.
type parseResult struct {
err *proxy.ParseError
original string
}
// newUpstreamConfigValidator parses the upstream configuration and returns a
@@ -61,97 +63,99 @@ func newUpstreamConfigValidator(
private []string,
opts *upstream.Options,
) (cv *upstreamConfigValidator) {
cv = &upstreamConfigValidator{}
cv = &upstreamConfigValidator{
generalUpstreamResults: map[string]*upstreamResult{},
fallbackUpstreamResults: map[string]*upstreamResult{},
privateUpstreamResults: map[string]*upstreamResult{},
}
for _, line := range general {
cv.general = cv.insertLineResults(cv.general, line, opts)
}
for _, line := range fallback {
cv.fallback = cv.insertLineResults(cv.fallback, line, opts)
}
for _, line := range private {
cv.private = cv.insertLineResults(cv.private, line, opts)
}
conf, err := proxy.ParseUpstreamsConfig(general, opts)
cv.generalParseResults = collectErrResults(general, err)
insertConfResults(conf, cv.generalUpstreamResults)
conf, err = proxy.ParseUpstreamsConfig(fallback, opts)
cv.fallbackParseResults = collectErrResults(fallback, err)
insertConfResults(conf, cv.fallbackUpstreamResults)
conf, err = proxy.ParseUpstreamsConfig(private, opts)
cv.privateParseResults = collectErrResults(private, err)
insertConfResults(conf, cv.privateUpstreamResults)
return cv
}
// insertLineResults parses line and inserts the result into s. It can insert
// multiple results as well as none.
func (cv *upstreamConfigValidator) insertLineResults(
s []*upstreamResult,
line string,
opts *upstream.Options,
) (result []*upstreamResult) {
upstreams, isSpecific, err := splitUpstreamLine(line)
if err != nil {
return cv.insert(s, &upstreamResult{
err: err,
original: line,
})
// collectErrResults parses err and returns parsing results containing the
// original upstream configuration line and the corresponding error. err can be
// nil.
func collectErrResults(lines []string, err error) (results []*parseResult) {
if err == nil {
return nil
}
for _, upstreamAddr := range upstreams {
var res *upstreamResult
if upstreamAddr != "#" {
res = cv.parseUpstream(upstreamAddr, opts)
} else if !isSpecific {
res = &upstreamResult{
err: errNotDomainSpecific,
original: upstreamAddr,
}
} else {
// limit is a maximum length for upstream configuration lines.
const limit = 80
wrapper, ok := err.(errors.WrapperSlice)
if !ok {
log.Debug("dnsforward: configvalidator: unwrapping: %s", err)
return nil
}
errs := wrapper.Unwrap()
results = make([]*parseResult, 0, len(errs))
for i, e := range errs {
var parseErr *proxy.ParseError
if !errors.As(e, &parseErr) {
log.Debug("dnsforward: configvalidator: inserting unexpected error %d: %s", i, err)
continue
}
res.isSpecific = isSpecific
s = cv.insert(s, res)
}
return s
}
// insert inserts r into slice in a sorted order, except duplicates. slice must
// not be nil.
func (cv *upstreamConfigValidator) insert(
s []*upstreamResult,
r *upstreamResult,
) (result []*upstreamResult) {
i, has := slices.BinarySearchFunc(s, r, (*upstreamResult).compare)
if has {
log.Debug("dnsforward: duplicate configuration %q", r.original)
return s
}
return slices.Insert(s, i, r)
}
// parseUpstream parses addr and returns the result of parsing. It returns nil
// if the specified server points at the default upstream server which is
// validated separately.
func (cv *upstreamConfigValidator) parseUpstream(
addr string,
opts *upstream.Options,
) (r *upstreamResult) {
// Check if the upstream has a valid protocol prefix.
//
// TODO(e.burkov): Validate the domain name.
if proto, _, ok := strings.Cut(addr, "://"); ok {
if !slices.Contains(protocols, proto) {
return &upstreamResult{
err: fmt.Errorf("bad protocol %q", proto),
original: addr,
}
idx := parseErr.Idx
line := []rune(lines[idx])
if len(line) > limit {
line = line[:limit]
line[limit-1] = '…'
}
results = append(results, &parseResult{
original: string(line),
err: parseErr,
})
}
ups, err := upstream.AddressToUpstream(addr, opts)
return results
}
return &upstreamResult{
server: ups,
err: err,
original: addr,
// insertConfResults parses conf and inserts the upstream result into results.
// It can insert multiple results as well as none.
func insertConfResults(conf *proxy.UpstreamConfig, results map[string]*upstreamResult) {
insertListResults(conf.Upstreams, results, false)
for _, ups := range conf.DomainReservedUpstreams {
insertListResults(ups, results, true)
}
for _, ups := range conf.SpecifiedDomainUpstreams {
insertListResults(ups, results, true)
}
}
// insertListResults constructs upstream results from the upstream list and
// inserts them into results. It can insert multiple results as well as none.
func insertListResults(ups []upstream.Upstream, results map[string]*upstreamResult, specific bool) {
for _, u := range ups {
addr := u.Address()
_, ok := results[addr]
if ok {
continue
}
results[addr] = &upstreamResult{
server: u,
isSpecific: specific,
}
}
}
@@ -187,35 +191,30 @@ func (cv *upstreamConfigValidator) check() {
}
wg := &sync.WaitGroup{}
wg.Add(len(cv.general) + len(cv.fallback) + len(cv.private))
wg.Add(len(cv.generalUpstreamResults) +
len(cv.fallbackUpstreamResults) +
len(cv.privateUpstreamResults))
for _, res := range cv.general {
go cv.checkSrv(res, wg, commonChecker)
for _, res := range cv.generalUpstreamResults {
go checkSrv(res, wg, commonChecker)
}
for _, res := range cv.fallback {
go cv.checkSrv(res, wg, commonChecker)
for _, res := range cv.fallbackUpstreamResults {
go checkSrv(res, wg, commonChecker)
}
for _, res := range cv.private {
go cv.checkSrv(res, wg, arpaChecker)
for _, res := range cv.privateUpstreamResults {
go checkSrv(res, wg, arpaChecker)
}
wg.Wait()
}
// checkSrv runs hc on the server from res, if any, and stores any occurred
// error in res. wg is always marked done in the end. It used to be called in
// a separate goroutine.
func (cv *upstreamConfigValidator) checkSrv(
res *upstreamResult,
wg *sync.WaitGroup,
hc *healthchecker,
) {
// error in res. wg is always marked done in the end. It is intended to be
// used as a goroutine.
func checkSrv(res *upstreamResult, wg *sync.WaitGroup, hc *healthchecker) {
defer log.OnPanic(fmt.Sprintf("dnsforward: checking upstream %s", res.server.Address()))
defer wg.Done()
if res.server == nil {
return
}
res.err = hc.check(res.server)
if res.err != nil && res.isSpecific {
res.err = domainSpecificTestError{Err: res.err}
@@ -225,65 +224,126 @@ func (cv *upstreamConfigValidator) checkSrv(
// close closes all the upstreams that were successfully parsed. It enriches
// the results with deferred closing errors.
func (cv *upstreamConfigValidator) close() {
for _, slice := range [][]*upstreamResult{cv.general, cv.fallback, cv.private} {
for _, r := range slice {
if r.server != nil {
r.err = errors.WithDeferred(r.err, r.server.Close())
}
all := []map[string]*upstreamResult{
cv.generalUpstreamResults,
cv.fallbackUpstreamResults,
cv.privateUpstreamResults,
}
for _, m := range all {
for _, r := range m {
r.err = errors.WithDeferred(r.err, r.server.Close())
}
}
}
// sections of the upstream configuration according to the text label of the
// localization.
//
// Keep in sync with client/src/__locales/en.json.
//
// TODO(s.chzhen): Refactor.
const (
generalTextLabel = "upstream_dns"
fallbackTextLabel = "fallback_dns_title"
privateTextLabel = "local_ptr_title"
)
// status returns all the data collected during parsing, healthcheck, and
// closing of the upstreams. The returned map is keyed by the original upstream
// configuration piece and contains the corresponding error or "OK" if there was
// no error.
func (cv *upstreamConfigValidator) status() (results map[string]string) {
result := map[string]string{}
// Names of the upstream configuration sections for logging.
const (
generalSection = "general"
fallbackSection = "fallback"
privateSection = "private"
)
for _, res := range cv.general {
resultToStatus("general", res, result)
results = map[string]string{}
for original, res := range cv.generalUpstreamResults {
upstreamResultToStatus(generalSection, string(original), res, results)
}
for _, res := range cv.fallback {
resultToStatus("fallback", res, result)
for original, res := range cv.fallbackUpstreamResults {
upstreamResultToStatus(fallbackSection, string(original), res, results)
}
for _, res := range cv.private {
resultToStatus("private", res, result)
for original, res := range cv.privateUpstreamResults {
upstreamResultToStatus(privateSection, string(original), res, results)
}
return result
parseResultToStatus(generalTextLabel, generalSection, cv.generalParseResults, results)
parseResultToStatus(fallbackTextLabel, fallbackSection, cv.fallbackParseResults, results)
parseResultToStatus(privateTextLabel, privateSection, cv.privateParseResults, results)
return results
}
// resultToStatus puts "OK" or an error message from res into resMap. section
// is the name of the upstream configuration section, i.e. "general",
// upstreamResultToStatus puts "OK" or an error message from res into resMap.
// section is the name of the upstream configuration section, i.e. "general",
// "fallback", or "private", and only used for logging.
//
// TODO(e.burkov): Currently, the HTTP handler expects that all the results are
// put together in a single map, which may lead to collisions, see AG-27539.
// Improve the results compilation.
func resultToStatus(section string, res *upstreamResult, resMap map[string]string) {
func upstreamResultToStatus(
section string,
original string,
res *upstreamResult,
resMap map[string]string,
) {
val := "OK"
if res.err != nil {
val = res.err.Error()
}
prevVal := resMap[res.original]
prevVal := resMap[original]
switch prevVal {
case "":
resMap[res.original] = val
resMap[original] = val
case val:
log.Debug("dnsforward: duplicating %s config line %q", section, res.original)
log.Debug("dnsforward: duplicating %s config line %q", section, original)
default:
log.Debug(
"dnsforward: warning: %s config line %q (%v) had different result %v",
section,
val,
res.original,
original,
prevVal,
)
}
}
// parseResultToStatus puts parsing error messages from results into resMap.
// section is the name of the upstream configuration section, i.e. "general",
// "fallback", or "private", and only used for logging.
//
// Parsing error message has the following format:
//
// sectionTextLabel line: parsing error
//
// Where sectionTextLabel is a section text label of a localization and line is
// a line number.
func parseResultToStatus(
textLabel string,
section string,
results []*parseResult,
resMap map[string]string,
) {
for _, res := range results {
original := res.original
_, ok := resMap[original]
if ok {
log.Debug("dnsforward: duplicating %s parsing error %q", section, original)
continue
}
resMap[original] = fmt.Sprintf("%s %d: parsing error", textLabel, res.err.Idx+1)
}
}
// domainSpecificTestError is a wrapper for errors returned by checkDNS to mark
// the tested upstream domain-specific and therefore consider its errors
// non-critical.
@@ -342,7 +402,7 @@ func (h *healthchecker) check(u upstream.Upstream) (err error) {
if err != nil {
return fmt.Errorf("couldn't communicate with upstream: %w", err)
} else if h.ansEmpty && len(reply.Answer) > 0 {
return errWrongResponse
return errors.Error("wrong response")
}
return nil