Pull request 2346: AGDNS-2686-client-upstream-manager
Merge in DNS/adguard-home from AGDNS-2686-client-upstream-manager to master Squashed commit of the following: commit 563cb583f01c26434fa04d0e37dcbe2ba15c0912 Merge: f4b0caf5c61fe269cbAuthor: Stanislav Chzhen <s.chzhen@adguard.com> Date: Mon Mar 3 19:07:35 2025 +0300 Merge branch 'master' into AGDNS-2686-client-upstream-manager commit f4b0caf5c8bc48ee8be97f031cd1aa1399eb461c Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Thu Feb 27 21:52:51 2025 +0300 client: imp docs commit e7d74931b1cc9b62eeadbe1168ae5781d57d6c73 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Feb 26 21:44:04 2025 +0300 client: imp code commit 1cba38c1bc3b6b5afb7829c230c4e831f789647e Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Feb 26 18:06:17 2025 +0300 client: fix typo commit 65b6b1e8c0fde47f367c428a78fefc4c63bc45f9 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Feb 26 17:52:02 2025 +0300 all: imp code, docs commit ed158ef09fc26bc9c57c91dbfa04d89fede583d0 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Feb 26 14:34:50 2025 +0300 client: imp code commit ab897f64c8751ea158408521116d5b689e6d39a9 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Feb 25 18:26:16 2025 +0300 all: upd chlog commit a2c30e3ede6fb61f6d23fd392cc3035dc96f77af Merge: bdb08ee0ed8ce5b453Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Feb 25 17:40:32 2025 +0300 Merge branch 'master' into AGDNS-2686-client-upstream-manager commit bdb08ee0e6122de727f2749a44f5df7e29d0eee2 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Feb 25 17:16:31 2025 +0300 all: imp tests commit 00f0eb60474a2297567acf5a3a27e8b5c2d99229 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Thu Feb 20 21:37:58 2025 +0300 all: imp code, docs commit 13934176636dd70a17e53bc1956d6cf51602760a Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Feb 19 15:58:11 2025 +0300 all: client upstream manager
This commit is contained in:
@@ -9,7 +9,6 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghalg"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
)
|
||||
|
||||
// macKey contains MAC as byte array of 6, 8, or 20 bytes.
|
||||
@@ -35,7 +34,7 @@ type index struct {
|
||||
// nameToUID maps client name to UID.
|
||||
nameToUID map[string]UID
|
||||
|
||||
// clientIDToUID maps client ID to UID.
|
||||
// clientIDToUID maps ClientID to UID.
|
||||
clientIDToUID map[string]UID
|
||||
|
||||
// ipToUID maps IP address to UID.
|
||||
@@ -205,19 +204,19 @@ func (ci *index) clashesMAC(c *Persistent) (p *Persistent, mac net.HardwareAddr)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// find finds persistent client by string representation of the client ID, IP
|
||||
// find finds persistent client by string representation of the ClientID, IP
|
||||
// address, or MAC.
|
||||
func (ci *index) find(id string) (c *Persistent, ok bool) {
|
||||
uid, found := ci.clientIDToUID[id]
|
||||
if found {
|
||||
return ci.uidToClient[uid], true
|
||||
c, ok = ci.findByClientID(id)
|
||||
if ok {
|
||||
return c, true
|
||||
}
|
||||
|
||||
ip, err := netip.ParseAddr(id)
|
||||
if err == nil {
|
||||
// MAC addresses can be successfully parsed as IP addresses.
|
||||
c, found = ci.findByIP(ip)
|
||||
if found {
|
||||
c, ok = ci.findByIP(ip)
|
||||
if ok {
|
||||
return c, true
|
||||
}
|
||||
}
|
||||
@@ -230,6 +229,16 @@ func (ci *index) find(id string) (c *Persistent, ok bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// findByClientID finds persistent client by ClientID.
|
||||
func (ci *index) findByClientID(clientID string) (c *Persistent, ok bool) {
|
||||
uid, ok := ci.clientIDToUID[clientID]
|
||||
if ok {
|
||||
return ci.uidToClient[uid], true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// findByName finds persistent client by name.
|
||||
func (ci *index) findByName(name string) (c *Persistent, found bool) {
|
||||
uid, found := ci.nameToUID[name]
|
||||
@@ -343,18 +352,3 @@ func (ci *index) rangeByName(f func(c *Persistent) (cont bool)) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// closeUpstreams closes upstream configurations of persistent clients.
|
||||
func (ci *index) closeUpstreams() (err error) {
|
||||
var errs []error
|
||||
ci.rangeByName(func(c *Persistent) (cont bool) {
|
||||
err = c.CloseUpstreams()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
@@ -58,12 +58,6 @@ func (uid *UID) UnmarshalText(data []byte) error {
|
||||
|
||||
// Persistent contains information about persistent clients.
|
||||
type Persistent struct {
|
||||
// UpstreamConfig is the custom upstream configuration for this client. If
|
||||
// it's nil, it has not been initialized yet. If it's non-nil and empty,
|
||||
// there are no valid upstreams. If it's non-nil and non-empty, these
|
||||
// upstream must be used.
|
||||
UpstreamConfig *proxy.CustomUpstreamConfig
|
||||
|
||||
// SafeSearch handles search engine hosts rewrites.
|
||||
SafeSearch filtering.SafeSearch
|
||||
|
||||
@@ -262,7 +256,7 @@ func ValidateClientID(id string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IDs returns a list of client IDs containing at least one element.
|
||||
// IDs returns a list of ClientIDs containing at least one element.
|
||||
func (c *Persistent) IDs() (ids []string) {
|
||||
ids = make([]string, 0, c.IDsLen())
|
||||
|
||||
@@ -281,7 +275,7 @@ func (c *Persistent) IDs() (ids []string) {
|
||||
return append(ids, c.ClientIDs...)
|
||||
}
|
||||
|
||||
// IDsLen returns a length of client ids.
|
||||
// IDsLen returns a length of ClientIDs.
|
||||
func (c *Persistent) IDsLen() (n int) {
|
||||
return len(c.IPs) + len(c.Subnets) + len(c.MACs) + len(c.ClientIDs)
|
||||
}
|
||||
@@ -312,14 +306,3 @@ func (c *Persistent) ShallowClone() (clone *Persistent) {
|
||||
|
||||
return clone
|
||||
}
|
||||
|
||||
// CloseUpstreams closes the client-specific upstream config of c if any.
|
||||
func (c *Persistent) CloseUpstreams() (err error) {
|
||||
if c.UpstreamConfig != nil {
|
||||
if err = c.UpstreamConfig.Close(); err != nil {
|
||||
return fmt.Errorf("closing upstreams of client %q: %w", c.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/arpdb"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/dhcpsvc"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/whois"
|
||||
"github.com/AdguardTeam/dnsproxy/proxy"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/hostsfile"
|
||||
"github.com/AdguardTeam/golibs/logutil/slogutil"
|
||||
@@ -126,6 +127,9 @@ type Storage struct {
|
||||
// runtimeIndex contains information about runtime clients.
|
||||
runtimeIndex *runtimeIndex
|
||||
|
||||
// upstreamManager stores and updates custom client upstream configurations.
|
||||
upstreamManager *upstreamManager
|
||||
|
||||
// dhcp is used to update [SourceDHCP] runtime client information.
|
||||
dhcp DHCP
|
||||
|
||||
@@ -163,6 +167,7 @@ func NewStorage(ctx context.Context, conf *StorageConfig) (s *Storage, err error
|
||||
mu: &sync.Mutex{},
|
||||
index: newIndex(),
|
||||
runtimeIndex: newRuntimeIndex(),
|
||||
upstreamManager: newUpstreamManager(conf.Logger),
|
||||
dhcp: conf.DHCP,
|
||||
etcHosts: conf.EtcHosts,
|
||||
arpDB: conf.ARPDB,
|
||||
@@ -200,7 +205,7 @@ func (s *Storage) Start(ctx context.Context) (err error) {
|
||||
func (s *Storage) Shutdown(_ context.Context) (err error) {
|
||||
close(s.done)
|
||||
|
||||
return s.closeUpstreams()
|
||||
return s.upstreamManager.close()
|
||||
}
|
||||
|
||||
// periodicARPUpdate periodically reloads runtime clients from ARP. It is
|
||||
@@ -416,6 +421,7 @@ func (s *Storage) Add(ctx context.Context, p *Persistent) (err error) {
|
||||
}
|
||||
|
||||
s.index.add(p)
|
||||
s.upstreamManager.updateCustomUpstreamConfig(p)
|
||||
|
||||
s.logger.DebugContext(
|
||||
ctx,
|
||||
@@ -441,7 +447,7 @@ func (s *Storage) FindByName(name string) (p *Persistent, ok bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Find finds persistent client by string representation of the client ID, IP
|
||||
// Find finds persistent client by string representation of the ClientID, IP
|
||||
// address, or MAC. And returns its shallow copy.
|
||||
//
|
||||
// TODO(s.chzhen): Accept ClientIDData structure instead, which will contain
|
||||
@@ -514,12 +520,13 @@ func (s *Storage) RemoveByName(ctx context.Context, name string) (ok bool) {
|
||||
return false
|
||||
}
|
||||
|
||||
if err := p.CloseUpstreams(); err != nil {
|
||||
s.logger.ErrorContext(ctx, "removing client", "name", p.Name, slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
s.index.remove(p)
|
||||
|
||||
err := s.upstreamManager.remove(p.UID)
|
||||
if err != nil {
|
||||
s.logger.DebugContext(ctx, "closing client upstreams", "name", name, slogutil.KeyError, err)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -556,6 +563,8 @@ func (s *Storage) Update(ctx context.Context, name string, p *Persistent) (err e
|
||||
s.index.remove(stored)
|
||||
s.index.add(p)
|
||||
|
||||
s.upstreamManager.updateCustomUpstreamConfig(p)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -576,14 +585,6 @@ func (s *Storage) Size() (n int) {
|
||||
return s.index.size()
|
||||
}
|
||||
|
||||
// closeUpstreams closes upstream configurations of persistent clients.
|
||||
func (s *Storage) closeUpstreams() (err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.index.closeUpstreams()
|
||||
}
|
||||
|
||||
// ClientRuntime returns a copy of the saved runtime client by ip. If no such
|
||||
// client exists, returns nil.
|
||||
func (s *Storage) ClientRuntime(ip netip.Addr) (rc *Runtime) {
|
||||
@@ -626,3 +627,42 @@ func (s *Storage) RangeRuntime(f func(rc *Runtime) (cont bool)) {
|
||||
func (s *Storage) AllowedTags() (tags []string) {
|
||||
return s.allowedTags
|
||||
}
|
||||
|
||||
// CustomUpstreamConfig implements the [dnsforward.ClientsContainer] interface
|
||||
// for *Storage
|
||||
func (s *Storage) CustomUpstreamConfig(
|
||||
id string,
|
||||
addr netip.Addr,
|
||||
) (prxConf *proxy.CustomUpstreamConfig) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
c, ok := s.index.findByClientID(id)
|
||||
if !ok {
|
||||
c, ok = s.index.findByIP(addr)
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.upstreamManager.customUpstreamConfig(c.UID)
|
||||
}
|
||||
|
||||
// UpdateCommonUpstreamConfig implements the [dnsforward.ClientsContainer]
// interface for *Storage.  It stores conf as the new common upstream
// configuration, which invalidates the cached per-client proxy configurations
// so they are rebuilt on the next request.
func (s *Storage) UpdateCommonUpstreamConfig(conf *CommonUpstreamConfig) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.upstreamManager.updateCommonUpstreamConfig(conf)
}
|
||||
|
||||
// ClearUpstreamCache implements the [dnsforward.ClientsContainer] interface for
// *Storage.  It clears the DNS result cache of every stored custom client
// upstream configuration.
func (s *Storage) ClearUpstreamCache() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.upstreamManager.clearUpstreamCache()
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/client"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/dhcpd"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/dhcpsvc"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/dnsforward"
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/whois"
|
||||
"github.com/AdguardTeam/golibs/hostsfile"
|
||||
"github.com/AdguardTeam/golibs/logutil/slogutil"
|
||||
@@ -34,6 +35,9 @@ func newTestStorage(tb testing.TB) (s *client.Storage) {
|
||||
return s
|
||||
}
|
||||
|
||||
// type check
|
||||
var _ dnsforward.ClientsContainer = (*client.Storage)(nil)
|
||||
|
||||
// testHostsContainer is a mock implementation of the [client.HostsContainer]
|
||||
// interface.
|
||||
type testHostsContainer struct {
|
||||
@@ -1278,3 +1282,90 @@ func TestStorage_RangeByName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestStorage_CustomUpstreamConfig checks that the storage returns a custom
// upstream configuration only for existing clients and that updating the
// common upstream configuration causes the per-client configuration to be
// rebuilt.
func TestStorage_CustomUpstreamConfig(t *testing.T) {
	const (
		existingName     = "existing_name"
		existingClientID = "existing_client_id"

		nonExistingClientID = "non_existing_client_id"
	)

	var (
		existingClientUID = client.MustNewUID()
		existingIP        = netip.MustParseAddr("192.0.2.1")

		nonExistingIP = netip.MustParseAddr("192.0.2.255")

		testUpstreamTimeout = time.Second
	)

	existingClient := &client.Persistent{
		Name:      existingName,
		IPs:       []netip.Addr{existingIP},
		ClientIDs: []string{existingClientID},
		UID:       existingClientUID,
		Upstreams: []string{"192.0.2.0"},
	}

	s := newTestStorage(t)
	s.UpdateCommonUpstreamConfig(&client.CommonUpstreamConfig{
		UpstreamTimeout: testUpstreamTimeout,
	})

	testutil.CleanupAndRequireSuccess(t, func() (err error) {
		return s.Shutdown(testutil.ContextWithTimeout(t, testTimeout))
	})

	ctx := testutil.ContextWithTimeout(t, testTimeout)
	err := s.Add(ctx, existingClient)
	require.NoError(t, err)

	testCases := []struct {
		cliAddr netip.Addr
		// wantNilConf asserts whether the configuration returned for this
		// case must be nil (unknown client) or non-nil (existing client).
		wantNilConf assert.ValueAssertionFunc
		name        string
		cliID       string
	}{{
		name:        "client_id",
		cliID:       existingClientID,
		cliAddr:     netip.Addr{},
		wantNilConf: assert.NotNil,
	}, {
		name:        "client_addr",
		cliID:       "",
		cliAddr:     existingIP,
		wantNilConf: assert.NotNil,
	}, {
		name:        "non_existing_client_id",
		cliID:       nonExistingClientID,
		cliAddr:     netip.Addr{},
		wantNilConf: assert.Nil,
	}, {
		name:        "non_existing_client_addr",
		cliID:       "",
		cliAddr:     nonExistingIP,
		wantNilConf: assert.Nil,
	}}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			conf := s.CustomUpstreamConfig(tc.cliID, tc.cliAddr)
			tc.wantNilConf(t, conf)
		})
	}

	t.Run("update_common_config", func(t *testing.T) {
		conf := s.CustomUpstreamConfig(existingClientID, existingIP)
		require.NotNil(t, conf)

		// Updating the common configuration must cause the client
		// configuration to be rebuilt on the next request.
		s.UpdateCommonUpstreamConfig(&client.CommonUpstreamConfig{
			UpstreamTimeout: testUpstreamTimeout * 2,
		})

		updConf := s.CustomUpstreamConfig(existingClientID, existingIP)
		require.NotNil(t, updConf)

		assert.NotEqual(t, conf, updConf)
	})
}
|
||||
|
||||
219
internal/client/upstreammanager.go
Normal file
219
internal/client/upstreammanager.go
Normal file
@@ -0,0 +1,219 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/AdguardTeam/AdGuardHome/internal/aghnet"
|
||||
"github.com/AdguardTeam/dnsproxy/proxy"
|
||||
"github.com/AdguardTeam/dnsproxy/upstream"
|
||||
"github.com/AdguardTeam/golibs/errors"
|
||||
"github.com/AdguardTeam/golibs/logutil/slogutil"
|
||||
"github.com/AdguardTeam/golibs/stringutil"
|
||||
)
|
||||
|
||||
// CommonUpstreamConfig contains common settings for custom client upstream
// configurations.
type CommonUpstreamConfig struct {
	// Bootstrap is used to resolve the hostnames of upstream DNS servers.
	Bootstrap upstream.Resolver

	// UpstreamTimeout is the timeout for queries to the upstreams.
	UpstreamTimeout time.Duration

	// BootstrapPreferIPv6, if true, makes the bootstrap prefer IPv6 addresses.
	BootstrapPreferIPv6 bool

	// EDNSClientSubnetEnabled indicates whether EDNS Client Subnet is enabled
	// for the constructed upstream configurations.
	EDNSClientSubnetEnabled bool

	// UseHTTP3Upstreams indicates whether to enable HTTP/3 for upstreams; see
	// [aghnet.UpstreamHTTPVersions].
	UseHTTP3Upstreams bool
}
|
||||
|
||||
// customUpstreamConfig contains custom client upstream configuration and the
// timestamp of the latest configuration update.
type customUpstreamConfig struct {
	// proxyConf is the constructed upstream configuration for the [proxy],
	// derived from the fields below.  It is initialized on demand with
	// [newCustomUpstreamConfig], so it may be nil.
	proxyConf *proxy.CustomUpstreamConfig

	// commonConfUpdate is the timestamp of the latest configuration update,
	// used to check against [upstreamManager.confUpdate] to determine if the
	// configuration is up to date.
	commonConfUpdate time.Time

	// upstreams is the cached list of custom upstream DNS servers used for the
	// configuration of proxyConf.
	upstreams []string

	// upstreamsCacheSize is the cached value of the cache size of the
	// upstreams, used for the configuration of proxyConf.
	upstreamsCacheSize uint32

	// upstreamsCacheEnabled is the cached value indicating whether the cache
	// of the upstreams is enabled for the configuration of proxyConf.
	upstreamsCacheEnabled bool

	// isChanged indicates whether the proxyConf needs to be updated.
	isChanged bool
}
|
||||
|
||||
// upstreamManager stores and updates custom client upstream configurations.
type upstreamManager struct {
	// logger is used for logging the operation of the upstream manager.  It
	// must not be nil.
	//
	// TODO(s.chzhen): Consider using a logger with its own prefix.
	logger *slog.Logger

	// uidToCustomConf maps persistent client UID to the custom client upstream
	// configuration.  Stored UIDs must be in sync with the
	// [index.uidToClient].
	uidToCustomConf map[UID]*customUpstreamConfig

	// commonConf is the common upstream configuration.  It is nil until
	// [upstreamManager.updateCommonUpstreamConfig] is called.
	commonConf *CommonUpstreamConfig

	// confUpdate is the timestamp of the latest common upstream configuration
	// update.
	confUpdate time.Time
}
|
||||
|
||||
// newUpstreamManager returns the new properly initialized upstream manager.
|
||||
func newUpstreamManager(logger *slog.Logger) (m *upstreamManager) {
|
||||
return &upstreamManager{
|
||||
logger: logger,
|
||||
uidToCustomConf: make(map[UID]*customUpstreamConfig),
|
||||
}
|
||||
}
|
||||
|
||||
// updateCommonUpstreamConfig updates the common upstream configuration and the
|
||||
// timestamp of the latest configuration update.
|
||||
func (m *upstreamManager) updateCommonUpstreamConfig(conf *CommonUpstreamConfig) {
|
||||
m.commonConf = conf
|
||||
m.confUpdate = time.Now()
|
||||
}
|
||||
|
||||
// updateCustomUpstreamConfig updates the stored custom client upstream
|
||||
// configuration associated with the persistent client. It also sets
|
||||
// [customUpstreamConfig.isChanged] to true so [customUpstreamConfig.proxyConf]
|
||||
// can be updated later in [upstreamManager.customUpstreamConfig].
|
||||
func (m *upstreamManager) updateCustomUpstreamConfig(c *Persistent) {
|
||||
cliConf, ok := m.uidToCustomConf[c.UID]
|
||||
if !ok {
|
||||
cliConf = &customUpstreamConfig{
|
||||
commonConfUpdate: m.confUpdate,
|
||||
}
|
||||
|
||||
m.uidToCustomConf[c.UID] = cliConf
|
||||
}
|
||||
|
||||
// TODO(s.chzhen): Compare before cloning.
|
||||
cliConf.upstreams = slices.Clone(c.Upstreams)
|
||||
cliConf.upstreamsCacheSize = c.UpstreamsCacheSize
|
||||
cliConf.upstreamsCacheEnabled = c.UpstreamsCacheEnabled
|
||||
cliConf.isChanged = true
|
||||
}
|
||||
|
||||
// customUpstreamConfig returns the custom client upstream configuration.
|
||||
func (m *upstreamManager) customUpstreamConfig(uid UID) (proxyConf *proxy.CustomUpstreamConfig) {
|
||||
cliConf, ok := m.uidToCustomConf[uid]
|
||||
if !ok {
|
||||
// TODO(s.chzhen): Consider panic.
|
||||
m.logger.Error("no associated custom client upstream config")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !m.isConfigChanged(cliConf) {
|
||||
return cliConf.proxyConf
|
||||
}
|
||||
|
||||
if cliConf.proxyConf != nil {
|
||||
err := cliConf.proxyConf.Close()
|
||||
if err != nil {
|
||||
// TODO(s.chzhen): Pass context.
|
||||
m.logger.Debug("closing custom upstream config", slogutil.KeyError, err)
|
||||
}
|
||||
}
|
||||
|
||||
proxyConf = newCustomUpstreamConfig(cliConf, m.commonConf)
|
||||
cliConf.proxyConf = proxyConf
|
||||
cliConf.isChanged = false
|
||||
|
||||
return proxyConf
|
||||
}
|
||||
|
||||
// isConfigChanged returns true if the update is necessary for the custom client
|
||||
// upstream configuration.
|
||||
func (m *upstreamManager) isConfigChanged(cliConf *customUpstreamConfig) (ok bool) {
|
||||
return !m.confUpdate.Equal(cliConf.commonConfUpdate) || cliConf.isChanged
|
||||
}
|
||||
|
||||
// clearUpstreamCache clears the upstream cache for each stored custom client
|
||||
// upstream configuration.
|
||||
func (m *upstreamManager) clearUpstreamCache() {
|
||||
for _, c := range m.uidToCustomConf {
|
||||
c.proxyConf.ClearCache()
|
||||
}
|
||||
}
|
||||
|
||||
// remove deletes the custom client upstream configuration and closes
|
||||
// [customUpstreamConfig.proxyConf] if necessary.
|
||||
func (m *upstreamManager) remove(uid UID) (err error) {
|
||||
cliConf, ok := m.uidToCustomConf[uid]
|
||||
if !ok {
|
||||
// TODO(s.chzhen): Consider panic.
|
||||
return errors.Error("no associated custom client upstream config")
|
||||
}
|
||||
|
||||
delete(m.uidToCustomConf, uid)
|
||||
|
||||
if cliConf.proxyConf != nil {
|
||||
return cliConf.proxyConf.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// close shuts down each stored custom client upstream configuration.
|
||||
func (m *upstreamManager) close() (err error) {
|
||||
var errs []error
|
||||
for _, c := range m.uidToCustomConf {
|
||||
if c.proxyConf == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
errs = append(errs, c.proxyConf.Close())
|
||||
}
|
||||
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// newCustomUpstreamConfig returns the new properly initialized custom proxy
|
||||
// upstream configuration for the client.
|
||||
func newCustomUpstreamConfig(
|
||||
cliConf *customUpstreamConfig,
|
||||
conf *CommonUpstreamConfig,
|
||||
) (proxyConf *proxy.CustomUpstreamConfig) {
|
||||
upstreams := stringutil.FilterOut(cliConf.upstreams, aghnet.IsCommentOrEmpty)
|
||||
if len(upstreams) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
upsConf, err := proxy.ParseUpstreamsConfig(
|
||||
upstreams,
|
||||
&upstream.Options{
|
||||
Bootstrap: conf.Bootstrap,
|
||||
Timeout: time.Duration(conf.UpstreamTimeout),
|
||||
HTTPVersions: aghnet.UpstreamHTTPVersions(conf.UseHTTP3Upstreams),
|
||||
PreferIPv6: conf.BootstrapPreferIPv6,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
// Should not happen because upstreams are already validated. See
|
||||
// [Persistent.validate].
|
||||
panic(fmt.Errorf("creating custom upstream config: %w", err))
|
||||
}
|
||||
|
||||
return proxy.NewCustomUpstreamConfig(
|
||||
upsConf,
|
||||
cliConf.upstreamsCacheEnabled,
|
||||
int(cliConf.upstreamsCacheSize),
|
||||
conf.EDNSClientSubnetEnabled,
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user