Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1de95ed53e | ||
|
|
1934c065ec | ||
|
|
301f9af3d4 | ||
|
|
cb0427bfbb | ||
|
|
7456e5907e | ||
|
|
8cab86b924 | ||
|
|
3ec5456e86 | ||
|
|
b51ea5fa07 | ||
|
|
2ac8783eb6 | ||
|
|
5014523ae0 | ||
|
|
dabcc9566c | ||
|
|
c453020349 | ||
|
|
0daaa32fc6 | ||
|
|
5e15fd6dd9 | ||
|
|
76c0f47832 | ||
|
|
70fee14103 | ||
|
|
abd7725fed | ||
|
|
f01b79e625 | ||
|
|
94387450cf | ||
|
|
5723490a6c | ||
|
|
d7506264ad | ||
|
|
245ac46b65 | ||
|
|
107e29ee20 | ||
|
|
5f447d4e31 | ||
|
|
347667a2bd |
98
cmd/root.go
98
cmd/root.go
@@ -7,6 +7,7 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/apernet/OpenGFW/analyzer"
|
"github.com/apernet/OpenGFW/analyzer"
|
||||||
"github.com/apernet/OpenGFW/analyzer/tcp"
|
"github.com/apernet/OpenGFW/analyzer/tcp"
|
||||||
@@ -16,6 +17,7 @@ import (
|
|||||||
"github.com/apernet/OpenGFW/modifier"
|
"github.com/apernet/OpenGFW/modifier"
|
||||||
modUDP "github.com/apernet/OpenGFW/modifier/udp"
|
modUDP "github.com/apernet/OpenGFW/modifier/udp"
|
||||||
"github.com/apernet/OpenGFW/ruleset"
|
"github.com/apernet/OpenGFW/ruleset"
|
||||||
|
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
@@ -41,6 +43,7 @@ var logger *zap.Logger
|
|||||||
// Flags
|
// Flags
|
||||||
var (
|
var (
|
||||||
cfgFile string
|
cfgFile string
|
||||||
|
pcapFile string
|
||||||
logLevel string
|
logLevel string
|
||||||
logFormat string
|
logFormat string
|
||||||
)
|
)
|
||||||
@@ -116,6 +119,7 @@ func init() {
|
|||||||
|
|
||||||
func initFlags() {
|
func initFlags() {
|
||||||
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file")
|
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file")
|
||||||
|
rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)")
|
||||||
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level")
|
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level")
|
||||||
rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format")
|
rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format")
|
||||||
}
|
}
|
||||||
@@ -165,6 +169,7 @@ type cliConfig struct {
|
|||||||
IO cliConfigIO `mapstructure:"io"`
|
IO cliConfigIO `mapstructure:"io"`
|
||||||
Workers cliConfigWorkers `mapstructure:"workers"`
|
Workers cliConfigWorkers `mapstructure:"workers"`
|
||||||
Ruleset cliConfigRuleset `mapstructure:"ruleset"`
|
Ruleset cliConfigRuleset `mapstructure:"ruleset"`
|
||||||
|
Replay cliConfigReplay `mapstructure:"replay"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type cliConfigIO struct {
|
type cliConfigIO struct {
|
||||||
@@ -175,12 +180,17 @@ type cliConfigIO struct {
|
|||||||
RST bool `mapstructure:"rst"`
|
RST bool `mapstructure:"rst"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type cliConfigReplay struct {
|
||||||
|
Realtime bool `mapstructure:"realtime"`
|
||||||
|
}
|
||||||
|
|
||||||
type cliConfigWorkers struct {
|
type cliConfigWorkers struct {
|
||||||
Count int `mapstructure:"count"`
|
Count int `mapstructure:"count"`
|
||||||
QueueSize int `mapstructure:"queueSize"`
|
QueueSize int `mapstructure:"queueSize"`
|
||||||
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
|
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
|
||||||
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
|
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
|
||||||
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
|
TCPTimeout time.Duration `mapstructure:"tcpTimeout"`
|
||||||
|
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type cliConfigRuleset struct {
|
type cliConfigRuleset struct {
|
||||||
@@ -194,17 +204,30 @@ func (c *cliConfig) fillLogger(config *engine.Config) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *cliConfig) fillIO(config *engine.Config) error {
|
func (c *cliConfig) fillIO(config *engine.Config) error {
|
||||||
nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
|
var ioImpl io.PacketIO
|
||||||
QueueSize: c.IO.QueueSize,
|
var err error
|
||||||
ReadBuffer: c.IO.ReadBuffer,
|
if pcapFile != "" {
|
||||||
WriteBuffer: c.IO.WriteBuffer,
|
// Setup IO for pcap file replay
|
||||||
Local: c.IO.Local,
|
logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile))
|
||||||
RST: c.IO.RST,
|
ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{
|
||||||
})
|
PcapFile: pcapFile,
|
||||||
|
Realtime: c.Replay.Realtime,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// Setup IO for nfqueue
|
||||||
|
ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
|
||||||
|
QueueSize: c.IO.QueueSize,
|
||||||
|
ReadBuffer: c.IO.ReadBuffer,
|
||||||
|
WriteBuffer: c.IO.WriteBuffer,
|
||||||
|
Local: c.IO.Local,
|
||||||
|
RST: c.IO.RST,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return configError{Field: "io", Err: err}
|
return configError{Field: "io", Err: err}
|
||||||
}
|
}
|
||||||
config.IO = nfio
|
config.IO = ioImpl
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -213,6 +236,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error {
|
|||||||
config.WorkerQueueSize = c.Workers.QueueSize
|
config.WorkerQueueSize = c.Workers.QueueSize
|
||||||
config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal
|
config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal
|
||||||
config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn
|
config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn
|
||||||
|
config.WorkerTCPTimeout = c.Workers.TCPTimeout
|
||||||
config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams
|
config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -256,8 +280,7 @@ func runMain(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
rsConfig := &ruleset.BuiltinConfig{
|
rsConfig := &ruleset.BuiltinConfig{
|
||||||
Logger: &rulesetLogger{},
|
Logger: &rulesetLogger{},
|
||||||
GeoSiteFilename: config.Ruleset.GeoSite,
|
GeoMatcher: geo.NewGeoMatcher(config.Ruleset.GeoSite, config.Ruleset.GeoIp),
|
||||||
GeoIpFilename: config.Ruleset.GeoIp,
|
|
||||||
ProtectedDialContext: engineConfig.IO.ProtectedDialContext,
|
ProtectedDialContext: engineConfig.IO.ProtectedDialContext,
|
||||||
}
|
}
|
||||||
rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig)
|
rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig)
|
||||||
@@ -340,12 +363,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
||||||
logger.Info("TCP stream action",
|
if noMatch {
|
||||||
zap.Int64("id", info.ID),
|
logger.Debug("TCP stream no match",
|
||||||
zap.String("src", info.SrcString()),
|
zap.Int64("id", info.ID),
|
||||||
zap.String("dst", info.DstString()),
|
zap.String("src", info.SrcString()),
|
||||||
zap.String("action", action.String()),
|
zap.String("dst", info.DstString()),
|
||||||
zap.Bool("noMatch", noMatch))
|
zap.String("action", action.String()))
|
||||||
|
} else {
|
||||||
|
logger.Info("TCP stream action",
|
||||||
|
zap.Int64("id", info.ID),
|
||||||
|
zap.String("src", info.SrcString()),
|
||||||
|
zap.String("dst", info.DstString()),
|
||||||
|
zap.String("action", action.String()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *engineLogger) TCPFlush(workerID, flushed, closed int) {
|
||||||
|
logger.Debug("TCP flush",
|
||||||
|
zap.Int("workerID", workerID),
|
||||||
|
zap.Int("flushed", flushed),
|
||||||
|
zap.Int("closed", closed))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {
|
func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {
|
||||||
@@ -366,12 +403,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
||||||
logger.Info("UDP stream action",
|
if noMatch {
|
||||||
zap.Int64("id", info.ID),
|
logger.Debug("UDP stream no match",
|
||||||
zap.String("src", info.SrcString()),
|
zap.Int64("id", info.ID),
|
||||||
zap.String("dst", info.DstString()),
|
zap.String("src", info.SrcString()),
|
||||||
zap.String("action", action.String()),
|
zap.String("dst", info.DstString()),
|
||||||
zap.Bool("noMatch", noMatch))
|
zap.String("action", action.String()))
|
||||||
|
} else {
|
||||||
|
logger.Info("UDP stream action",
|
||||||
|
zap.Int64("id", info.ID),
|
||||||
|
zap.String("src", info.SrcString()),
|
||||||
|
zap.String("dst", info.DstString()),
|
||||||
|
zap.String("action", action.String()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) {
|
func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) {
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) {
|
|||||||
Ruleset: config.Ruleset,
|
Ruleset: config.Ruleset,
|
||||||
TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal,
|
TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal,
|
||||||
TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn,
|
TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn,
|
||||||
|
TCPTimeout: config.WorkerTCPTimeout,
|
||||||
UDPMaxStreams: config.WorkerUDPMaxStreams,
|
UDPMaxStreams: config.WorkerUDPMaxStreams,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -57,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *engine) Run(ctx context.Context) error {
|
func (e *engine) Run(ctx context.Context) error {
|
||||||
|
workerCtx, workerCancel := context.WithCancel(ctx)
|
||||||
|
defer workerCancel() // Stop workers
|
||||||
|
|
||||||
|
// Register IO shutdown
|
||||||
ioCtx, ioCancel := context.WithCancel(ctx)
|
ioCtx, ioCancel := context.WithCancel(ctx)
|
||||||
defer ioCancel() // Stop workers & IO
|
e.io.SetCancelFunc(ioCancel)
|
||||||
|
defer ioCancel() // Stop IO
|
||||||
|
|
||||||
// Start workers
|
// Start workers
|
||||||
for _, w := range e.workers {
|
for _, w := range e.workers {
|
||||||
go w.Run(ioCtx)
|
go w.Run(workerCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register IO callback
|
// Register IO callback
|
||||||
@@ -84,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
|
case <-ioCtx.Done():
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,9 +109,11 @@ func (e *engine) dispatch(p io.Packet) bool {
|
|||||||
_ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil)
|
_ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
// Convert to gopacket.Packet
|
||||||
|
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
|
||||||
|
packet.Metadata().Timestamp = p.Timestamp()
|
||||||
// Load balance by stream ID
|
// Load balance by stream ID
|
||||||
index := p.StreamID() % uint32(len(e.workers))
|
index := p.StreamID() % uint32(len(e.workers))
|
||||||
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
|
|
||||||
e.workers[index].Feed(&workerPacket{
|
e.workers[index].Feed(&workerPacket{
|
||||||
StreamID: p.StreamID(),
|
StreamID: p.StreamID(),
|
||||||
Packet: packet,
|
Packet: packet,
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package engine
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/apernet/OpenGFW/io"
|
"github.com/apernet/OpenGFW/io"
|
||||||
"github.com/apernet/OpenGFW/ruleset"
|
"github.com/apernet/OpenGFW/ruleset"
|
||||||
@@ -25,6 +26,7 @@ type Config struct {
|
|||||||
WorkerQueueSize int
|
WorkerQueueSize int
|
||||||
WorkerTCPMaxBufferedPagesTotal int
|
WorkerTCPMaxBufferedPagesTotal int
|
||||||
WorkerTCPMaxBufferedPagesPerConn int
|
WorkerTCPMaxBufferedPagesPerConn int
|
||||||
|
WorkerTCPTimeout time.Duration
|
||||||
WorkerUDPMaxStreams int
|
WorkerUDPMaxStreams int
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,6 +38,7 @@ type Logger interface {
|
|||||||
TCPStreamNew(workerID int, info ruleset.StreamInfo)
|
TCPStreamNew(workerID int, info ruleset.StreamInfo)
|
||||||
TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
||||||
TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool)
|
TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool)
|
||||||
|
TCPFlush(workerID, flushed, closed int)
|
||||||
|
|
||||||
UDPStreamNew(workerID int, info ruleset.StreamInfo)
|
UDPStreamNew(workerID int, info ruleset.StreamInfo)
|
||||||
UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package engine
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/apernet/OpenGFW/io"
|
"github.com/apernet/OpenGFW/io"
|
||||||
"github.com/apernet/OpenGFW/ruleset"
|
"github.com/apernet/OpenGFW/ruleset"
|
||||||
@@ -14,9 +15,12 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
defaultChanSize = 64
|
defaultChanSize = 64
|
||||||
defaultTCPMaxBufferedPagesTotal = 4096
|
defaultTCPMaxBufferedPagesTotal = 65536
|
||||||
defaultTCPMaxBufferedPagesPerConnection = 64
|
defaultTCPMaxBufferedPagesPerConnection = 16
|
||||||
|
defaultTCPTimeout = 10 * time.Minute
|
||||||
defaultUDPMaxStreams = 4096
|
defaultUDPMaxStreams = 4096
|
||||||
|
|
||||||
|
tcpFlushInterval = 1 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
type workerPacket struct {
|
type workerPacket struct {
|
||||||
@@ -33,6 +37,7 @@ type worker struct {
|
|||||||
tcpStreamFactory *tcpStreamFactory
|
tcpStreamFactory *tcpStreamFactory
|
||||||
tcpStreamPool *reassembly.StreamPool
|
tcpStreamPool *reassembly.StreamPool
|
||||||
tcpAssembler *reassembly.Assembler
|
tcpAssembler *reassembly.Assembler
|
||||||
|
tcpTimeout time.Duration
|
||||||
|
|
||||||
udpStreamFactory *udpStreamFactory
|
udpStreamFactory *udpStreamFactory
|
||||||
udpStreamManager *udpStreamManager
|
udpStreamManager *udpStreamManager
|
||||||
@@ -47,6 +52,7 @@ type workerConfig struct {
|
|||||||
Ruleset ruleset.Ruleset
|
Ruleset ruleset.Ruleset
|
||||||
TCPMaxBufferedPagesTotal int
|
TCPMaxBufferedPagesTotal int
|
||||||
TCPMaxBufferedPagesPerConn int
|
TCPMaxBufferedPagesPerConn int
|
||||||
|
TCPTimeout time.Duration
|
||||||
UDPMaxStreams int
|
UDPMaxStreams int
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() {
|
|||||||
if c.TCPMaxBufferedPagesPerConn <= 0 {
|
if c.TCPMaxBufferedPagesPerConn <= 0 {
|
||||||
c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection
|
c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection
|
||||||
}
|
}
|
||||||
|
if c.TCPTimeout <= 0 {
|
||||||
|
c.TCPTimeout = defaultTCPTimeout
|
||||||
|
}
|
||||||
if c.UDPMaxStreams <= 0 {
|
if c.UDPMaxStreams <= 0 {
|
||||||
c.UDPMaxStreams = defaultUDPMaxStreams
|
c.UDPMaxStreams = defaultUDPMaxStreams
|
||||||
}
|
}
|
||||||
@@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) {
|
|||||||
tcpStreamFactory: tcpSF,
|
tcpStreamFactory: tcpSF,
|
||||||
tcpStreamPool: tcpStreamPool,
|
tcpStreamPool: tcpStreamPool,
|
||||||
tcpAssembler: tcpAssembler,
|
tcpAssembler: tcpAssembler,
|
||||||
|
tcpTimeout: config.TCPTimeout,
|
||||||
udpStreamFactory: udpSF,
|
udpStreamFactory: udpSF,
|
||||||
udpStreamManager: udpSM,
|
udpStreamManager: udpSM,
|
||||||
modSerializeBuffer: gopacket.NewSerializeBuffer(),
|
modSerializeBuffer: gopacket.NewSerializeBuffer(),
|
||||||
@@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) {
|
|||||||
func (w *worker) Run(ctx context.Context) {
|
func (w *worker) Run(ctx context.Context) {
|
||||||
w.logger.WorkerStart(w.id)
|
w.logger.WorkerStart(w.id)
|
||||||
defer w.logger.WorkerStop(w.id)
|
defer w.logger.WorkerStop(w.id)
|
||||||
|
|
||||||
|
tcpFlushTicker := time.NewTicker(tcpFlushInterval)
|
||||||
|
defer tcpFlushTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
v, b := w.handle(wPkt.StreamID, wPkt.Packet)
|
v, b := w.handle(wPkt.StreamID, wPkt.Packet)
|
||||||
_ = wPkt.SetVerdict(v, b)
|
_ = wPkt.SetVerdict(v, b)
|
||||||
|
case <-tcpFlushTicker.C:
|
||||||
|
w.flushTCP(w.tcpTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata,
|
|||||||
return io.Verdict(ctx.Verdict)
|
return io.Verdict(ctx.Verdict)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *worker) flushTCP(timeout time.Duration) {
|
||||||
|
flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout))
|
||||||
|
w.logger.TCPFlush(w.id, flushed, closed)
|
||||||
|
}
|
||||||
|
|
||||||
func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) {
|
func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) {
|
||||||
ctx := &udpContext{
|
ctx := &udpContext{
|
||||||
Verdict: udpVerdictAccept,
|
Verdict: udpVerdictAccept,
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package io
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Verdict int
|
type Verdict int
|
||||||
@@ -24,6 +25,8 @@ const (
|
|||||||
type Packet interface {
|
type Packet interface {
|
||||||
// StreamID is the ID of the stream the packet belongs to.
|
// StreamID is the ID of the stream the packet belongs to.
|
||||||
StreamID() uint32
|
StreamID() uint32
|
||||||
|
// Timestamp is the time the packet was received.
|
||||||
|
Timestamp() time.Time
|
||||||
// Data is the raw packet data, starting with the IP header.
|
// Data is the raw packet data, starting with the IP header.
|
||||||
Data() []byte
|
Data() []byte
|
||||||
}
|
}
|
||||||
@@ -45,6 +48,9 @@ type PacketIO interface {
|
|||||||
ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error)
|
ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
// Close closes the packet IO.
|
// Close closes the packet IO.
|
||||||
Close() error
|
Close() error
|
||||||
|
// SetCancelFunc gives packet IO access to context cancel function, enabling it to
|
||||||
|
// trigger a shutdown
|
||||||
|
SetCancelFunc(cancelFunc context.CancelFunc) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type ErrInvalidPacket struct {
|
type ErrInvalidPacket struct {
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/go-iptables/iptables"
|
"github.com/coreos/go-iptables/iptables"
|
||||||
"github.com/florianl/go-nfqueue"
|
"github.com/florianl/go-nfqueue"
|
||||||
@@ -189,6 +190,12 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error
|
|||||||
streamID: ctIDFromCtBytes(*a.Ct),
|
streamID: ctIDFromCtBytes(*a.Ct),
|
||||||
data: *a.Payload,
|
data: *a.Payload,
|
||||||
}
|
}
|
||||||
|
// Use timestamp from attribute if available, otherwise use current time as fallback
|
||||||
|
if a.Timestamp != nil {
|
||||||
|
p.timestamp = *a.Timestamp
|
||||||
|
} else {
|
||||||
|
p.timestamp = time.Now()
|
||||||
|
}
|
||||||
return okBoolToInt(cb(p, nil))
|
return okBoolToInt(cb(p, nil))
|
||||||
},
|
},
|
||||||
func(e error) int {
|
func(e error) int {
|
||||||
@@ -274,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error {
|
|||||||
return n.n.Close()
|
return n.n.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nfqueue IO does not issue shutdown
|
||||||
|
func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error {
|
func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error {
|
||||||
rules, err := generateNftRules(local, rst)
|
rules, err := generateNftRules(local, rst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -312,15 +324,20 @@ func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error {
|
|||||||
var _ Packet = (*nfqueuePacket)(nil)
|
var _ Packet = (*nfqueuePacket)(nil)
|
||||||
|
|
||||||
type nfqueuePacket struct {
|
type nfqueuePacket struct {
|
||||||
id uint32
|
id uint32
|
||||||
streamID uint32
|
streamID uint32
|
||||||
data []byte
|
timestamp time.Time
|
||||||
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *nfqueuePacket) StreamID() uint32 {
|
func (p *nfqueuePacket) StreamID() uint32 {
|
||||||
return p.streamID
|
return p.streamID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *nfqueuePacket) Timestamp() time.Time {
|
||||||
|
return p.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
func (p *nfqueuePacket) Data() []byte {
|
func (p *nfqueuePacket) Data() []byte {
|
||||||
return p.data
|
return p.data
|
||||||
}
|
}
|
||||||
|
|||||||
136
io/pcap.go
Normal file
136
io/pcap.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package io
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"hash/crc32"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/gopacket"
|
||||||
|
"github.com/google/gopacket/pcapgo"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ PacketIO = (*pcapPacketIO)(nil)
|
||||||
|
|
||||||
|
type pcapPacketIO struct {
|
||||||
|
pcapFile io.ReadCloser
|
||||||
|
pcap *pcapgo.Reader
|
||||||
|
timeOffset *time.Duration
|
||||||
|
ioCancel context.CancelFunc
|
||||||
|
config PcapPacketIOConfig
|
||||||
|
|
||||||
|
dialer *net.Dialer
|
||||||
|
}
|
||||||
|
|
||||||
|
type PcapPacketIOConfig struct {
|
||||||
|
PcapFile string
|
||||||
|
Realtime bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) {
|
||||||
|
pcapFile, err := os.Open(config.PcapFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
handle, err := pcapgo.NewReader(pcapFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pcapPacketIO{
|
||||||
|
pcapFile: pcapFile,
|
||||||
|
pcap: handle,
|
||||||
|
timeOffset: nil,
|
||||||
|
ioCancel: nil,
|
||||||
|
config: config,
|
||||||
|
dialer: &net.Dialer{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error {
|
||||||
|
go func() {
|
||||||
|
packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType())
|
||||||
|
for packet := range packetSource.Packets() {
|
||||||
|
p.wait(packet)
|
||||||
|
|
||||||
|
networkLayer := packet.NetworkLayer()
|
||||||
|
if networkLayer != nil {
|
||||||
|
src, dst := networkLayer.NetworkFlow().Endpoints()
|
||||||
|
endpoints := []string{src.String(), dst.String()}
|
||||||
|
sort.Strings(endpoints)
|
||||||
|
id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable)
|
||||||
|
|
||||||
|
cb(&pcapPacket{
|
||||||
|
streamID: id,
|
||||||
|
timestamp: packet.Metadata().Timestamp,
|
||||||
|
data: packet.LinkLayer().LayerPayload(),
|
||||||
|
}, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Give the workers a chance to finish everything
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
// Stop the engine when all packets are finished
|
||||||
|
p.ioCancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// A normal dialer is sufficient as pcap IO does not mess up with the networking
|
||||||
|
func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
|
return p.dialer.DialContext(ctx, network, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
|
||||||
|
p.ioCancel = cancelFunc
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacketIO) Close() error {
|
||||||
|
return p.pcapFile.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Intentionally slow down the replay
|
||||||
|
// In realtime mode, this is to match the timestamps in the capture
|
||||||
|
func (p *pcapPacketIO) wait(packet gopacket.Packet) {
|
||||||
|
if !p.config.Realtime {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.timeOffset == nil {
|
||||||
|
offset := time.Since(packet.Metadata().Timestamp)
|
||||||
|
p.timeOffset = &offset
|
||||||
|
} else {
|
||||||
|
t := time.Until(packet.Metadata().Timestamp.Add(*p.timeOffset))
|
||||||
|
time.Sleep(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Packet = (*pcapPacket)(nil)
|
||||||
|
|
||||||
|
type pcapPacket struct {
|
||||||
|
streamID uint32
|
||||||
|
timestamp time.Time
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacket) StreamID() uint32 {
|
||||||
|
return p.streamID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacket) Timestamp() time.Time {
|
||||||
|
return p.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pcapPacket) Data() []byte {
|
||||||
|
return p.data
|
||||||
|
}
|
||||||
@@ -20,7 +20,6 @@ import (
|
|||||||
"github.com/apernet/OpenGFW/analyzer"
|
"github.com/apernet/OpenGFW/analyzer"
|
||||||
"github.com/apernet/OpenGFW/modifier"
|
"github.com/apernet/OpenGFW/modifier"
|
||||||
"github.com/apernet/OpenGFW/ruleset/builtins"
|
"github.com/apernet/OpenGFW/ruleset/builtins"
|
||||||
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExprRule is the external representation of an expression rule.
|
// ExprRule is the external representation of an expression rule.
|
||||||
@@ -302,23 +301,22 @@ type Function struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func buildFunctionMap(config *BuiltinConfig) map[string]*Function {
|
func buildFunctionMap(config *BuiltinConfig) map[string]*Function {
|
||||||
geoMatcher := geo.NewGeoMatcher(config.GeoSiteFilename, config.GeoIpFilename)
|
|
||||||
return map[string]*Function{
|
return map[string]*Function{
|
||||||
"geoip": {
|
"geoip": {
|
||||||
InitFunc: geoMatcher.LoadGeoIP,
|
InitFunc: config.GeoMatcher.LoadGeoIP,
|
||||||
PatchFunc: nil,
|
PatchFunc: nil,
|
||||||
Func: func(params ...any) (any, error) {
|
Func: func(params ...any) (any, error) {
|
||||||
return geoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil
|
return config.GeoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil
|
||||||
},
|
},
|
||||||
Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoIp)},
|
Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoIp)},
|
||||||
},
|
},
|
||||||
"geosite": {
|
"geosite": {
|
||||||
InitFunc: geoMatcher.LoadGeoSite,
|
InitFunc: config.GeoMatcher.LoadGeoSite,
|
||||||
PatchFunc: nil,
|
PatchFunc: nil,
|
||||||
Func: func(params ...any) (any, error) {
|
Func: func(params ...any) (any, error) {
|
||||||
return geoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil
|
return config.GeoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil
|
||||||
},
|
},
|
||||||
Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoSite)},
|
Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoSite)},
|
||||||
},
|
},
|
||||||
"cidr": {
|
"cidr": {
|
||||||
InitFunc: nil,
|
InitFunc: nil,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/apernet/OpenGFW/analyzer"
|
"github.com/apernet/OpenGFW/analyzer"
|
||||||
"github.com/apernet/OpenGFW/modifier"
|
"github.com/apernet/OpenGFW/modifier"
|
||||||
|
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Action int
|
type Action int
|
||||||
@@ -102,7 +103,6 @@ type Logger interface {
|
|||||||
|
|
||||||
type BuiltinConfig struct {
|
type BuiltinConfig struct {
|
||||||
Logger Logger
|
Logger Logger
|
||||||
GeoSiteFilename string
|
GeoMatcher *geo.GeoMatcher
|
||||||
GeoIpFilename string
|
|
||||||
ProtectedDialContext func(ctx context.Context, network, address string) (net.Conn, error)
|
ProtectedDialContext func(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user