19 Commits

Author SHA1 Message Date
Toby
3ec5456e86 Merge pull request #135 from apernet/revert-pcap
Revert pcap
2024-05-07 23:46:46 -07:00
Toby
b51ea5fa07 Revert "Merge pull request #134 from apernet/ci-cgo"
This reverts commit 5014523ae0, reversing
changes made to c453020349.
2024-05-07 23:44:00 -07:00
Toby
2ac8783eb6 Revert "Merge pull request #132 from eddc005/feat-pcap"
This reverts commit c453020349, reversing
changes made to 5723490a6c.
2024-05-07 23:43:54 -07:00
Toby
5014523ae0 Merge pull request #134 from apernet/ci-cgo
ci: enable cgo
2024-05-07 23:40:17 -07:00
Toby
dabcc9566c ci: enable cgo 2024-05-07 23:38:28 -07:00
Toby
c453020349 Merge pull request #132 from eddc005/feat-pcap
feat: add support for pcap replay
2024-05-07 23:18:03 -07:00
Toby
0daaa32fc6 ci: install pcap for build 2 2024-05-07 23:13:58 -07:00
Toby
5e15fd6dd9 ci: install pcap for build 2024-05-07 23:12:24 -07:00
Toby
76c0f47832 chore: do not default replay.realtime to true 2024-05-07 23:05:06 -07:00
Toby
70fee14103 chore: format 2024-05-07 22:50:58 -07:00
eddc005
abd7725fed close pcap properly and implement ProtectedDialContext 2024-05-07 21:50:06 +01:00
eddc005
f01b79e625 rebase and remove replayDelay 2024-05-06 23:04:54 +01:00
eddc005
94387450cf feat: add support for pcap replay 2024-05-06 22:53:11 +01:00
Toby
5723490a6c Merge pull request #133 from apernet/fix-timestamp
fix: provide correct timestamp for TCP reassembler
2024-05-06 14:38:35 -07:00
Toby
d7506264ad fix: provide correct timestamp for TCP reassembler 2024-05-06 14:35:31 -07:00
Toby
245ac46b65 Merge pull request #130 from apernet/fix-geo-leak
fix: do not reload geoip/geosite when reloading ruleset to prevent leaking references to streams
2024-04-10 22:57:18 -07:00
Toby
107e29ee20 fix: do not reload geoip/geosite when reloading ruleset to prevent leaking references to streams 2024-04-10 21:30:37 -07:00
Toby
5f447d4e31 Merge pull request #124 from apernet/wip-tcp-flush
feat: TCP timeout flush
2024-04-09 11:07:20 -07:00
Toby
347667a2bd feat: TCP timeout flush 2024-04-08 11:54:35 -07:00
8 changed files with 99 additions and 35 deletions

View File

@@ -7,6 +7,7 @@ import (
"os/signal" "os/signal"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/analyzer"
"github.com/apernet/OpenGFW/analyzer/tcp" "github.com/apernet/OpenGFW/analyzer/tcp"
@@ -16,6 +17,7 @@ import (
"github.com/apernet/OpenGFW/modifier" "github.com/apernet/OpenGFW/modifier"
modUDP "github.com/apernet/OpenGFW/modifier/udp" modUDP "github.com/apernet/OpenGFW/modifier/udp"
"github.com/apernet/OpenGFW/ruleset" "github.com/apernet/OpenGFW/ruleset"
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
@@ -176,11 +178,12 @@ type cliConfigIO struct {
} }
type cliConfigWorkers struct { type cliConfigWorkers struct {
Count int `mapstructure:"count"` Count int `mapstructure:"count"`
QueueSize int `mapstructure:"queueSize"` QueueSize int `mapstructure:"queueSize"`
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"` TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"` TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
UDPMaxStreams int `mapstructure:"udpMaxStreams"` TCPTimeout time.Duration `mapstructure:"tcpTimeout"`
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
} }
type cliConfigRuleset struct { type cliConfigRuleset struct {
@@ -213,6 +216,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error {
config.WorkerQueueSize = c.Workers.QueueSize config.WorkerQueueSize = c.Workers.QueueSize
config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal
config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn
config.WorkerTCPTimeout = c.Workers.TCPTimeout
config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams
return nil return nil
} }
@@ -256,8 +260,7 @@ func runMain(cmd *cobra.Command, args []string) {
} }
rsConfig := &ruleset.BuiltinConfig{ rsConfig := &ruleset.BuiltinConfig{
Logger: &rulesetLogger{}, Logger: &rulesetLogger{},
GeoSiteFilename: config.Ruleset.GeoSite, GeoMatcher: geo.NewGeoMatcher(config.Ruleset.GeoSite, config.Ruleset.GeoIp),
GeoIpFilename: config.Ruleset.GeoIp,
ProtectedDialContext: engineConfig.IO.ProtectedDialContext, ProtectedDialContext: engineConfig.IO.ProtectedDialContext,
} }
rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig) rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig)
@@ -340,12 +343,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
} }
func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
logger.Info("TCP stream action", if noMatch {
zap.Int64("id", info.ID), logger.Debug("TCP stream no match",
zap.String("src", info.SrcString()), zap.Int64("id", info.ID),
zap.String("dst", info.DstString()), zap.String("src", info.SrcString()),
zap.String("action", action.String()), zap.String("dst", info.DstString()),
zap.Bool("noMatch", noMatch)) zap.String("action", action.String()))
} else {
logger.Info("TCP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
}
}
func (l *engineLogger) TCPFlush(workerID, flushed, closed int) {
logger.Debug("TCP flush",
zap.Int("workerID", workerID),
zap.Int("flushed", flushed),
zap.Int("closed", closed))
} }
func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) { func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {
@@ -366,12 +383,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
} }
func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
logger.Info("UDP stream action", if noMatch {
zap.Int64("id", info.ID), logger.Debug("UDP stream no match",
zap.String("src", info.SrcString()), zap.Int64("id", info.ID),
zap.String("dst", info.DstString()), zap.String("src", info.SrcString()),
zap.String("action", action.String()), zap.String("dst", info.DstString()),
zap.Bool("noMatch", noMatch)) zap.String("action", action.String()))
} else {
logger.Info("UDP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
}
} }
func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) { func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) {

View File

@@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) {
Ruleset: config.Ruleset, Ruleset: config.Ruleset,
TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal, TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal,
TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn, TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn,
TCPTimeout: config.WorkerTCPTimeout,
UDPMaxStreams: config.WorkerUDPMaxStreams, UDPMaxStreams: config.WorkerUDPMaxStreams,
}) })
if err != nil { if err != nil {
@@ -101,9 +102,11 @@ func (e *engine) dispatch(p io.Packet) bool {
_ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil) _ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil)
return true return true
} }
// Convert to gopacket.Packet
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
packet.Metadata().Timestamp = p.Timestamp()
// Load balance by stream ID // Load balance by stream ID
index := p.StreamID() % uint32(len(e.workers)) index := p.StreamID() % uint32(len(e.workers))
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
e.workers[index].Feed(&workerPacket{ e.workers[index].Feed(&workerPacket{
StreamID: p.StreamID(), StreamID: p.StreamID(),
Packet: packet, Packet: packet,

View File

@@ -2,6 +2,7 @@ package engine
import ( import (
"context" "context"
"time"
"github.com/apernet/OpenGFW/io" "github.com/apernet/OpenGFW/io"
"github.com/apernet/OpenGFW/ruleset" "github.com/apernet/OpenGFW/ruleset"
@@ -25,6 +26,7 @@ type Config struct {
WorkerQueueSize int WorkerQueueSize int
WorkerTCPMaxBufferedPagesTotal int WorkerTCPMaxBufferedPagesTotal int
WorkerTCPMaxBufferedPagesPerConn int WorkerTCPMaxBufferedPagesPerConn int
WorkerTCPTimeout time.Duration
WorkerUDPMaxStreams int WorkerUDPMaxStreams int
} }
@@ -36,6 +38,7 @@ type Logger interface {
TCPStreamNew(workerID int, info ruleset.StreamInfo) TCPStreamNew(workerID int, info ruleset.StreamInfo)
TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool)
TCPFlush(workerID, flushed, closed int)
UDPStreamNew(workerID int, info ruleset.StreamInfo) UDPStreamNew(workerID int, info ruleset.StreamInfo)
UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)

View File

@@ -2,6 +2,7 @@ package engine
import ( import (
"context" "context"
"time"
"github.com/apernet/OpenGFW/io" "github.com/apernet/OpenGFW/io"
"github.com/apernet/OpenGFW/ruleset" "github.com/apernet/OpenGFW/ruleset"
@@ -14,9 +15,12 @@ import (
const ( const (
defaultChanSize = 64 defaultChanSize = 64
defaultTCPMaxBufferedPagesTotal = 4096 defaultTCPMaxBufferedPagesTotal = 65536
defaultTCPMaxBufferedPagesPerConnection = 64 defaultTCPMaxBufferedPagesPerConnection = 16
defaultTCPTimeout = 10 * time.Minute
defaultUDPMaxStreams = 4096 defaultUDPMaxStreams = 4096
tcpFlushInterval = 1 * time.Minute
) )
type workerPacket struct { type workerPacket struct {
@@ -33,6 +37,7 @@ type worker struct {
tcpStreamFactory *tcpStreamFactory tcpStreamFactory *tcpStreamFactory
tcpStreamPool *reassembly.StreamPool tcpStreamPool *reassembly.StreamPool
tcpAssembler *reassembly.Assembler tcpAssembler *reassembly.Assembler
tcpTimeout time.Duration
udpStreamFactory *udpStreamFactory udpStreamFactory *udpStreamFactory
udpStreamManager *udpStreamManager udpStreamManager *udpStreamManager
@@ -47,6 +52,7 @@ type workerConfig struct {
Ruleset ruleset.Ruleset Ruleset ruleset.Ruleset
TCPMaxBufferedPagesTotal int TCPMaxBufferedPagesTotal int
TCPMaxBufferedPagesPerConn int TCPMaxBufferedPagesPerConn int
TCPTimeout time.Duration
UDPMaxStreams int UDPMaxStreams int
} }
@@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() {
if c.TCPMaxBufferedPagesPerConn <= 0 { if c.TCPMaxBufferedPagesPerConn <= 0 {
c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection
} }
if c.TCPTimeout <= 0 {
c.TCPTimeout = defaultTCPTimeout
}
if c.UDPMaxStreams <= 0 { if c.UDPMaxStreams <= 0 {
c.UDPMaxStreams = defaultUDPMaxStreams c.UDPMaxStreams = defaultUDPMaxStreams
} }
@@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) {
tcpStreamFactory: tcpSF, tcpStreamFactory: tcpSF,
tcpStreamPool: tcpStreamPool, tcpStreamPool: tcpStreamPool,
tcpAssembler: tcpAssembler, tcpAssembler: tcpAssembler,
tcpTimeout: config.TCPTimeout,
udpStreamFactory: udpSF, udpStreamFactory: udpSF,
udpStreamManager: udpSM, udpStreamManager: udpSM,
modSerializeBuffer: gopacket.NewSerializeBuffer(), modSerializeBuffer: gopacket.NewSerializeBuffer(),
@@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) {
func (w *worker) Run(ctx context.Context) { func (w *worker) Run(ctx context.Context) {
w.logger.WorkerStart(w.id) w.logger.WorkerStart(w.id)
defer w.logger.WorkerStop(w.id) defer w.logger.WorkerStop(w.id)
tcpFlushTicker := time.NewTicker(tcpFlushInterval)
defer tcpFlushTicker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) {
} }
v, b := w.handle(wPkt.StreamID, wPkt.Packet) v, b := w.handle(wPkt.StreamID, wPkt.Packet)
_ = wPkt.SetVerdict(v, b) _ = wPkt.SetVerdict(v, b)
case <-tcpFlushTicker.C:
w.flushTCP(w.tcpTimeout)
} }
} }
} }
@@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata,
return io.Verdict(ctx.Verdict) return io.Verdict(ctx.Verdict)
} }
func (w *worker) flushTCP(timeout time.Duration) {
flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout))
w.logger.TCPFlush(w.id, flushed, closed)
}
func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) { func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) {
ctx := &udpContext{ ctx := &udpContext{
Verdict: udpVerdictAccept, Verdict: udpVerdictAccept,

View File

@@ -3,6 +3,7 @@ package io
import ( import (
"context" "context"
"net" "net"
"time"
) )
type Verdict int type Verdict int
@@ -24,6 +25,8 @@ const (
type Packet interface { type Packet interface {
// StreamID is the ID of the stream the packet belongs to. // StreamID is the ID of the stream the packet belongs to.
StreamID() uint32 StreamID() uint32
// Timestamp is the time the packet was received.
Timestamp() time.Time
// Data is the raw packet data, starting with the IP header. // Data is the raw packet data, starting with the IP header.
Data() []byte Data() []byte
} }

View File

@@ -10,6 +10,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/coreos/go-iptables/iptables" "github.com/coreos/go-iptables/iptables"
"github.com/florianl/go-nfqueue" "github.com/florianl/go-nfqueue"
@@ -189,6 +190,12 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error
streamID: ctIDFromCtBytes(*a.Ct), streamID: ctIDFromCtBytes(*a.Ct),
data: *a.Payload, data: *a.Payload,
} }
// Use timestamp from attribute if available, otherwise use current time as fallback
if a.Timestamp != nil {
p.timestamp = *a.Timestamp
} else {
p.timestamp = time.Now()
}
return okBoolToInt(cb(p, nil)) return okBoolToInt(cb(p, nil))
}, },
func(e error) int { func(e error) int {
@@ -312,15 +319,20 @@ func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error {
var _ Packet = (*nfqueuePacket)(nil) var _ Packet = (*nfqueuePacket)(nil)
type nfqueuePacket struct { type nfqueuePacket struct {
id uint32 id uint32
streamID uint32 streamID uint32
data []byte timestamp time.Time
data []byte
} }
func (p *nfqueuePacket) StreamID() uint32 { func (p *nfqueuePacket) StreamID() uint32 {
return p.streamID return p.streamID
} }
func (p *nfqueuePacket) Timestamp() time.Time {
return p.timestamp
}
func (p *nfqueuePacket) Data() []byte { func (p *nfqueuePacket) Data() []byte {
return p.data return p.data
} }

View File

@@ -20,7 +20,6 @@ import (
"github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/analyzer"
"github.com/apernet/OpenGFW/modifier" "github.com/apernet/OpenGFW/modifier"
"github.com/apernet/OpenGFW/ruleset/builtins" "github.com/apernet/OpenGFW/ruleset/builtins"
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
) )
// ExprRule is the external representation of an expression rule. // ExprRule is the external representation of an expression rule.
@@ -302,23 +301,22 @@ type Function struct {
} }
func buildFunctionMap(config *BuiltinConfig) map[string]*Function { func buildFunctionMap(config *BuiltinConfig) map[string]*Function {
geoMatcher := geo.NewGeoMatcher(config.GeoSiteFilename, config.GeoIpFilename)
return map[string]*Function{ return map[string]*Function{
"geoip": { "geoip": {
InitFunc: geoMatcher.LoadGeoIP, InitFunc: config.GeoMatcher.LoadGeoIP,
PatchFunc: nil, PatchFunc: nil,
Func: func(params ...any) (any, error) { Func: func(params ...any) (any, error) {
return geoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil return config.GeoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil
}, },
Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoIp)}, Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoIp)},
}, },
"geosite": { "geosite": {
InitFunc: geoMatcher.LoadGeoSite, InitFunc: config.GeoMatcher.LoadGeoSite,
PatchFunc: nil, PatchFunc: nil,
Func: func(params ...any) (any, error) { Func: func(params ...any) (any, error) {
return geoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil return config.GeoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil
}, },
Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoSite)}, Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoSite)},
}, },
"cidr": { "cidr": {
InitFunc: nil, InitFunc: nil,

View File

@@ -7,6 +7,7 @@ import (
"github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/analyzer"
"github.com/apernet/OpenGFW/modifier" "github.com/apernet/OpenGFW/modifier"
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
) )
type Action int type Action int
@@ -102,7 +103,6 @@ type Logger interface {
type BuiltinConfig struct { type BuiltinConfig struct {
Logger Logger Logger Logger
GeoSiteFilename string GeoMatcher *geo.GeoMatcher
GeoIpFilename string
ProtectedDialContext func(ctx context.Context, network, address string) (net.Conn, error) ProtectedDialContext func(ctx context.Context, network, address string) (net.Conn, error)
} }