Compare commits
35 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
278d731b6f | ||
|
|
0e97c9f086 | ||
|
|
5f4df7e806 | ||
|
|
d8d7c5b477 | ||
|
|
d3f1785ac9 | ||
|
|
1de95ed53e | ||
|
|
1934c065ec | ||
|
|
301f9af3d4 | ||
|
|
cb0427bfbb | ||
|
|
7456e5907e | ||
|
|
8cab86b924 | ||
|
|
3ec5456e86 | ||
|
|
b51ea5fa07 | ||
|
|
2ac8783eb6 | ||
|
|
5014523ae0 | ||
|
|
dabcc9566c | ||
|
|
c453020349 | ||
|
|
0daaa32fc6 | ||
|
|
5e15fd6dd9 | ||
|
|
76c0f47832 | ||
|
|
70fee14103 | ||
|
|
abd7725fed | ||
|
|
f01b79e625 | ||
|
|
94387450cf | ||
|
|
5723490a6c | ||
|
|
d7506264ad | ||
|
|
245ac46b65 | ||
|
|
107e29ee20 | ||
|
|
5f447d4e31 | ||
|
|
347667a2bd | ||
|
|
393c29bd2d | ||
|
|
9c0893c512 | ||
|
|
ae34b4856a | ||
|
|
d7737e9211 | ||
|
|
dd9ecc3dd7 |
@@ -18,8 +18,8 @@ Telegram グループ: https://t.me/OpGFW
|
||||
## 特徴
|
||||
|
||||
- フル IP/TCP 再アセンブル、各種プロトコルアナライザー
|
||||
- HTTP、TLS、QUIC、DNS、SSH、SOCKS4/5、WireGuard、その他多数
|
||||
- Shadowsocks の「完全に暗号化されたトラフィック」の検出など (https://gfw.report/publications/usenixsecurity23/en/)
|
||||
- HTTP、TLS、QUIC、DNS、SSH、SOCKS4/5、WireGuard、OpenVPN、その他多数
|
||||
- Shadowsocks、VMess の「完全に暗号化されたトラフィック」の検出など (https://gfw.report/publications/usenixsecurity23/en/)
|
||||
- Trojan プロキシプロトコルの検出
|
||||
- [WIP] 機械学習に基づくトラフィック分類
|
||||
- IPv4 と IPv6 をフルサポート
|
||||
|
||||
@@ -21,8 +21,8 @@ Telegram group: https://t.me/OpGFW
|
||||
## Features
|
||||
|
||||
- Full IP/TCP reassembly, various protocol analyzers
|
||||
- HTTP, TLS, QUIC, DNS, SSH, SOCKS4/5, WireGuard, and many more to come
|
||||
- "Fully encrypted traffic" detection for Shadowsocks,
|
||||
- HTTP, TLS, QUIC, DNS, SSH, SOCKS4/5, WireGuard, OpenVPN, and many more to come
|
||||
- "Fully encrypted traffic" detection for Shadowsocks, VMess,
|
||||
etc. (https://gfw.report/publications/usenixsecurity23/en/)
|
||||
- Trojan (proxy protocol) detection
|
||||
- [WIP] Machine learning based traffic classification
|
||||
|
||||
@@ -18,8 +18,8 @@ Telegram 群组: https://t.me/OpGFW
|
||||
## 功能
|
||||
|
||||
- 完整的 IP/TCP 重组,各种协议解析器
|
||||
- HTTP, TLS, QUIC, DNS, SSH, SOCKS4/5, WireGuard, 更多协议正在开发中
|
||||
- Shadowsocks 等 "全加密流量" 检测 (https://gfw.report/publications/usenixsecurity23/zh/)
|
||||
- HTTP, TLS, QUIC, DNS, SSH, SOCKS4/5, WireGuard, OpenVPN, 更多协议正在开发中
|
||||
- Shadowsocks, VMess 等 "全加密流量" 检测 (https://gfw.report/publications/usenixsecurity23/zh/)
|
||||
- Trojan 协议检测
|
||||
- [开发中] 基于机器学习的流量分类
|
||||
- 同等支持 IPv4 和 IPv6
|
||||
|
||||
128
cmd/root.go
128
cmd/root.go
@@ -7,6 +7,7 @@ import (
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/apernet/OpenGFW/analyzer"
|
||||
"github.com/apernet/OpenGFW/analyzer/tcp"
|
||||
@@ -16,6 +17,7 @@ import (
|
||||
"github.com/apernet/OpenGFW/modifier"
|
||||
modUDP "github.com/apernet/OpenGFW/modifier/udp"
|
||||
"github.com/apernet/OpenGFW/ruleset"
|
||||
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
@@ -41,6 +43,7 @@ var logger *zap.Logger
|
||||
// Flags
|
||||
var (
|
||||
cfgFile string
|
||||
pcapFile string
|
||||
logLevel string
|
||||
logFormat string
|
||||
)
|
||||
@@ -116,6 +119,7 @@ func init() {
|
||||
|
||||
func initFlags() {
|
||||
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file")
|
||||
rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)")
|
||||
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level")
|
||||
rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format")
|
||||
}
|
||||
@@ -165,22 +169,33 @@ type cliConfig struct {
|
||||
IO cliConfigIO `mapstructure:"io"`
|
||||
Workers cliConfigWorkers `mapstructure:"workers"`
|
||||
Ruleset cliConfigRuleset `mapstructure:"ruleset"`
|
||||
Replay cliConfigReplay `mapstructure:"replay"`
|
||||
}
|
||||
|
||||
type cliConfigIO struct {
|
||||
QueueSize uint32 `mapstructure:"queueSize"`
|
||||
ReadBuffer int `mapstructure:"rcvBuf"`
|
||||
WriteBuffer int `mapstructure:"sndBuf"`
|
||||
Local bool `mapstructure:"local"`
|
||||
RST bool `mapstructure:"rst"`
|
||||
QueueSize uint32 `mapstructure:"queueSize"`
|
||||
QueueNum *uint16 `mapstructure:"queueNum"`
|
||||
Table string `mapstructure:"table"`
|
||||
ConnMarkAccept uint32 `mapstructure:"connMarkAccept"`
|
||||
ConnMarkDrop uint32 `mapstructure:"connMarkDrop"`
|
||||
|
||||
ReadBuffer int `mapstructure:"rcvBuf"`
|
||||
WriteBuffer int `mapstructure:"sndBuf"`
|
||||
Local bool `mapstructure:"local"`
|
||||
RST bool `mapstructure:"rst"`
|
||||
}
|
||||
|
||||
type cliConfigReplay struct {
|
||||
Realtime bool `mapstructure:"realtime"`
|
||||
}
|
||||
|
||||
type cliConfigWorkers struct {
|
||||
Count int `mapstructure:"count"`
|
||||
QueueSize int `mapstructure:"queueSize"`
|
||||
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
|
||||
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
|
||||
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
|
||||
Count int `mapstructure:"count"`
|
||||
QueueSize int `mapstructure:"queueSize"`
|
||||
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
|
||||
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
|
||||
TCPTimeout time.Duration `mapstructure:"tcpTimeout"`
|
||||
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
|
||||
}
|
||||
|
||||
type cliConfigRuleset struct {
|
||||
@@ -194,17 +209,35 @@ func (c *cliConfig) fillLogger(config *engine.Config) error {
|
||||
}
|
||||
|
||||
func (c *cliConfig) fillIO(config *engine.Config) error {
|
||||
nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
|
||||
QueueSize: c.IO.QueueSize,
|
||||
ReadBuffer: c.IO.ReadBuffer,
|
||||
WriteBuffer: c.IO.WriteBuffer,
|
||||
Local: c.IO.Local,
|
||||
RST: c.IO.RST,
|
||||
})
|
||||
var ioImpl io.PacketIO
|
||||
var err error
|
||||
if pcapFile != "" {
|
||||
// Setup IO for pcap file replay
|
||||
logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile))
|
||||
ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{
|
||||
PcapFile: pcapFile,
|
||||
Realtime: c.Replay.Realtime,
|
||||
})
|
||||
} else {
|
||||
// Setup IO for nfqueue
|
||||
ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
|
||||
QueueSize: c.IO.QueueSize,
|
||||
QueueNum: c.IO.QueueNum,
|
||||
Table: c.IO.Table,
|
||||
ConnMarkAccept: c.IO.ConnMarkAccept,
|
||||
ConnMarkDrop: c.IO.ConnMarkDrop,
|
||||
|
||||
ReadBuffer: c.IO.ReadBuffer,
|
||||
WriteBuffer: c.IO.WriteBuffer,
|
||||
Local: c.IO.Local,
|
||||
RST: c.IO.RST,
|
||||
})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return configError{Field: "io", Err: err}
|
||||
}
|
||||
config.IOs = []io.PacketIO{nfio}
|
||||
config.IO = ioImpl
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -213,6 +246,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error {
|
||||
config.WorkerQueueSize = c.Workers.QueueSize
|
||||
config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal
|
||||
config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn
|
||||
config.WorkerTCPTimeout = c.Workers.TCPTimeout
|
||||
config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams
|
||||
return nil
|
||||
}
|
||||
@@ -247,12 +281,7 @@ func runMain(cmd *cobra.Command, args []string) {
|
||||
if err != nil {
|
||||
logger.Fatal("failed to parse config", zap.Error(err))
|
||||
}
|
||||
defer func() {
|
||||
// Make sure to close all IOs on exit
|
||||
for _, i := range engineConfig.IOs {
|
||||
_ = i.Close()
|
||||
}
|
||||
}()
|
||||
defer engineConfig.IO.Close() // Make sure to close IO on exit
|
||||
|
||||
// Ruleset
|
||||
rawRs, err := ruleset.ExprRulesFromYAML(args[0])
|
||||
@@ -260,9 +289,9 @@ func runMain(cmd *cobra.Command, args []string) {
|
||||
logger.Fatal("failed to load rules", zap.Error(err))
|
||||
}
|
||||
rsConfig := &ruleset.BuiltinConfig{
|
||||
Logger: &rulesetLogger{},
|
||||
GeoSiteFilename: config.Ruleset.GeoSite,
|
||||
GeoIpFilename: config.Ruleset.GeoIp,
|
||||
Logger: &rulesetLogger{},
|
||||
GeoMatcher: geo.NewGeoMatcher(config.Ruleset.GeoSite, config.Ruleset.GeoIp),
|
||||
ProtectedDialContext: engineConfig.IO.ProtectedDialContext,
|
||||
}
|
||||
rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig)
|
||||
if err != nil {
|
||||
@@ -344,12 +373,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
||||
}
|
||||
|
||||
func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
||||
logger.Info("TCP stream action",
|
||||
zap.Int64("id", info.ID),
|
||||
zap.String("src", info.SrcString()),
|
||||
zap.String("dst", info.DstString()),
|
||||
zap.String("action", action.String()),
|
||||
zap.Bool("noMatch", noMatch))
|
||||
if noMatch {
|
||||
logger.Debug("TCP stream no match",
|
||||
zap.Int64("id", info.ID),
|
||||
zap.String("src", info.SrcString()),
|
||||
zap.String("dst", info.DstString()),
|
||||
zap.String("action", action.String()))
|
||||
} else {
|
||||
logger.Info("TCP stream action",
|
||||
zap.Int64("id", info.ID),
|
||||
zap.String("src", info.SrcString()),
|
||||
zap.String("dst", info.DstString()),
|
||||
zap.String("action", action.String()))
|
||||
}
|
||||
}
|
||||
|
||||
func (l *engineLogger) TCPFlush(workerID, flushed, closed int) {
|
||||
logger.Debug("TCP flush",
|
||||
zap.Int("workerID", workerID),
|
||||
zap.Int("flushed", flushed),
|
||||
zap.Int("closed", closed))
|
||||
}
|
||||
|
||||
func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {
|
||||
@@ -370,12 +413,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
||||
}
|
||||
|
||||
func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
||||
logger.Info("UDP stream action",
|
||||
zap.Int64("id", info.ID),
|
||||
zap.String("src", info.SrcString()),
|
||||
zap.String("dst", info.DstString()),
|
||||
zap.String("action", action.String()),
|
||||
zap.Bool("noMatch", noMatch))
|
||||
if noMatch {
|
||||
logger.Debug("UDP stream no match",
|
||||
zap.Int64("id", info.ID),
|
||||
zap.String("src", info.SrcString()),
|
||||
zap.String("dst", info.DstString()),
|
||||
zap.String("action", action.String()))
|
||||
} else {
|
||||
logger.Info("UDP stream action",
|
||||
zap.Int64("id", info.ID),
|
||||
zap.String("src", info.SrcString()),
|
||||
zap.String("dst", info.DstString()),
|
||||
zap.String("action", action.String()))
|
||||
}
|
||||
}
|
||||
|
||||
func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) {
|
||||
|
||||
@@ -15,7 +15,7 @@ var _ Engine = (*engine)(nil)
|
||||
|
||||
type engine struct {
|
||||
logger Logger
|
||||
ioList []io.PacketIO
|
||||
io io.PacketIO
|
||||
workers []*worker
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) {
|
||||
Ruleset: config.Ruleset,
|
||||
TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal,
|
||||
TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn,
|
||||
TCPTimeout: config.WorkerTCPTimeout,
|
||||
UDPMaxStreams: config.WorkerUDPMaxStreams,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -42,7 +43,7 @@ func NewEngine(config Config) (Engine, error) {
|
||||
}
|
||||
return &engine{
|
||||
logger: config.Logger,
|
||||
ioList: config.IOs,
|
||||
io: config.IO,
|
||||
workers: workers,
|
||||
}, nil
|
||||
}
|
||||
@@ -57,28 +58,30 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error {
|
||||
}
|
||||
|
||||
func (e *engine) Run(ctx context.Context) error {
|
||||
workerCtx, workerCancel := context.WithCancel(ctx)
|
||||
defer workerCancel() // Stop workers
|
||||
|
||||
// Register IO shutdown
|
||||
ioCtx, ioCancel := context.WithCancel(ctx)
|
||||
defer ioCancel() // Stop workers & IOs
|
||||
e.io.SetCancelFunc(ioCancel)
|
||||
defer ioCancel() // Stop IO
|
||||
|
||||
// Start workers
|
||||
for _, w := range e.workers {
|
||||
go w.Run(ioCtx)
|
||||
go w.Run(workerCtx)
|
||||
}
|
||||
|
||||
// Register callbacks
|
||||
errChan := make(chan error, len(e.ioList))
|
||||
for _, i := range e.ioList {
|
||||
ioEntry := i // Make sure dispatch() uses the correct ioEntry
|
||||
err := ioEntry.Register(ioCtx, func(p io.Packet, err error) bool {
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return false
|
||||
}
|
||||
return e.dispatch(ioEntry, p)
|
||||
})
|
||||
// Register IO callback
|
||||
errChan := make(chan error, 1)
|
||||
err := e.io.Register(ioCtx, func(p io.Packet, err error) bool {
|
||||
if err != nil {
|
||||
return err
|
||||
errChan <- err
|
||||
return false
|
||||
}
|
||||
return e.dispatch(p)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Block until IO errors or context is cancelled
|
||||
@@ -87,12 +90,13 @@ func (e *engine) Run(ctx context.Context) error {
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ioCtx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch dispatches a packet to a worker.
|
||||
// This must be safe for concurrent use, as it may be called from multiple IOs.
|
||||
func (e *engine) dispatch(ioEntry io.PacketIO, p io.Packet) bool {
|
||||
func (e *engine) dispatch(p io.Packet) bool {
|
||||
data := p.Data()
|
||||
ipVersion := data[0] >> 4
|
||||
var layerType gopacket.LayerType
|
||||
@@ -102,17 +106,19 @@ func (e *engine) dispatch(ioEntry io.PacketIO, p io.Packet) bool {
|
||||
layerType = layers.LayerTypeIPv6
|
||||
} else {
|
||||
// Unsupported network layer
|
||||
_ = ioEntry.SetVerdict(p, io.VerdictAcceptStream, nil)
|
||||
_ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil)
|
||||
return true
|
||||
}
|
||||
// Convert to gopacket.Packet
|
||||
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
|
||||
packet.Metadata().Timestamp = p.Timestamp()
|
||||
// Load balance by stream ID
|
||||
index := p.StreamID() % uint32(len(e.workers))
|
||||
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
|
||||
e.workers[index].Feed(&workerPacket{
|
||||
StreamID: p.StreamID(),
|
||||
Packet: packet,
|
||||
SetVerdict: func(v io.Verdict, b []byte) error {
|
||||
return ioEntry.SetVerdict(p, v, b)
|
||||
return e.io.SetVerdict(p, v, b)
|
||||
},
|
||||
})
|
||||
return true
|
||||
|
||||
@@ -2,6 +2,7 @@ package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/apernet/OpenGFW/io"
|
||||
"github.com/apernet/OpenGFW/ruleset"
|
||||
@@ -18,13 +19,14 @@ type Engine interface {
|
||||
// Config is the configuration for the engine.
|
||||
type Config struct {
|
||||
Logger Logger
|
||||
IOs []io.PacketIO
|
||||
IO io.PacketIO
|
||||
Ruleset ruleset.Ruleset
|
||||
|
||||
Workers int // Number of workers. Zero or negative means auto (number of CPU cores).
|
||||
WorkerQueueSize int
|
||||
WorkerTCPMaxBufferedPagesTotal int
|
||||
WorkerTCPMaxBufferedPagesPerConn int
|
||||
WorkerTCPTimeout time.Duration
|
||||
WorkerUDPMaxStreams int
|
||||
}
|
||||
|
||||
@@ -36,6 +38,7 @@ type Logger interface {
|
||||
TCPStreamNew(workerID int, info ruleset.StreamInfo)
|
||||
TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
||||
TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool)
|
||||
TCPFlush(workerID, flushed, closed int)
|
||||
|
||||
UDPStreamNew(workerID int, info ruleset.StreamInfo)
|
||||
UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
|
||||
|
||||
@@ -2,6 +2,7 @@ package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/apernet/OpenGFW/io"
|
||||
"github.com/apernet/OpenGFW/ruleset"
|
||||
@@ -14,9 +15,12 @@ import (
|
||||
|
||||
const (
|
||||
defaultChanSize = 64
|
||||
defaultTCPMaxBufferedPagesTotal = 4096
|
||||
defaultTCPMaxBufferedPagesPerConnection = 64
|
||||
defaultTCPMaxBufferedPagesTotal = 65536
|
||||
defaultTCPMaxBufferedPagesPerConnection = 16
|
||||
defaultTCPTimeout = 10 * time.Minute
|
||||
defaultUDPMaxStreams = 4096
|
||||
|
||||
tcpFlushInterval = 1 * time.Minute
|
||||
)
|
||||
|
||||
type workerPacket struct {
|
||||
@@ -33,6 +37,7 @@ type worker struct {
|
||||
tcpStreamFactory *tcpStreamFactory
|
||||
tcpStreamPool *reassembly.StreamPool
|
||||
tcpAssembler *reassembly.Assembler
|
||||
tcpTimeout time.Duration
|
||||
|
||||
udpStreamFactory *udpStreamFactory
|
||||
udpStreamManager *udpStreamManager
|
||||
@@ -47,6 +52,7 @@ type workerConfig struct {
|
||||
Ruleset ruleset.Ruleset
|
||||
TCPMaxBufferedPagesTotal int
|
||||
TCPMaxBufferedPagesPerConn int
|
||||
TCPTimeout time.Duration
|
||||
UDPMaxStreams int
|
||||
}
|
||||
|
||||
@@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() {
|
||||
if c.TCPMaxBufferedPagesPerConn <= 0 {
|
||||
c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection
|
||||
}
|
||||
if c.TCPTimeout <= 0 {
|
||||
c.TCPTimeout = defaultTCPTimeout
|
||||
}
|
||||
if c.UDPMaxStreams <= 0 {
|
||||
c.UDPMaxStreams = defaultUDPMaxStreams
|
||||
}
|
||||
@@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) {
|
||||
tcpStreamFactory: tcpSF,
|
||||
tcpStreamPool: tcpStreamPool,
|
||||
tcpAssembler: tcpAssembler,
|
||||
tcpTimeout: config.TCPTimeout,
|
||||
udpStreamFactory: udpSF,
|
||||
udpStreamManager: udpSM,
|
||||
modSerializeBuffer: gopacket.NewSerializeBuffer(),
|
||||
@@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) {
|
||||
func (w *worker) Run(ctx context.Context) {
|
||||
w.logger.WorkerStart(w.id)
|
||||
defer w.logger.WorkerStop(w.id)
|
||||
|
||||
tcpFlushTicker := time.NewTicker(tcpFlushInterval)
|
||||
defer tcpFlushTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) {
|
||||
}
|
||||
v, b := w.handle(wPkt.StreamID, wPkt.Packet)
|
||||
_ = wPkt.SetVerdict(v, b)
|
||||
case <-tcpFlushTicker.C:
|
||||
w.flushTCP(w.tcpTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata,
|
||||
return io.Verdict(ctx.Verdict)
|
||||
}
|
||||
|
||||
func (w *worker) flushTCP(timeout time.Duration) {
|
||||
flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout))
|
||||
w.logger.TCPFlush(w.id, flushed, closed)
|
||||
}
|
||||
|
||||
func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) {
|
||||
ctx := &udpContext{
|
||||
Verdict: udpVerdictAccept,
|
||||
|
||||
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.21
|
||||
require (
|
||||
github.com/bwmarrin/snowflake v0.3.0
|
||||
github.com/coreos/go-iptables v0.7.0
|
||||
github.com/expr-lang/expr v1.15.7
|
||||
github.com/expr-lang/expr v1.16.3
|
||||
github.com/florianl/go-nfqueue v1.3.2-0.20231218173729-f2bdeb033acf
|
||||
github.com/google/gopacket v1.1.20-0.20220810144506-32ee38206866
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||
|
||||
4
go.sum
4
go.sum
@@ -7,8 +7,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/expr-lang/expr v1.15.7 h1:BK0JcWUkoW6nrbLBo6xCKhz4BvH5DSOOu1Gx5lucyZo=
|
||||
github.com/expr-lang/expr v1.15.7/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
|
||||
github.com/expr-lang/expr v1.16.3 h1:NLldf786GffptcXNxxJx5dQ+FzeWDKChBDqOOwyK8to=
|
||||
github.com/expr-lang/expr v1.16.3/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
|
||||
github.com/florianl/go-nfqueue v1.3.2-0.20231218173729-f2bdeb033acf h1:NqGS3vTHzVENbIfd87cXZwdpO6MB2R1PjHMJLi4Z3ow=
|
||||
github.com/florianl/go-nfqueue v1.3.2-0.20231218173729-f2bdeb033acf/go.mod h1:eSnAor2YCfMCVYrVNEhkLGN/r1L+J4uDjc0EUy0tfq4=
|
||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||
|
||||
@@ -2,6 +2,8 @@ package io
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Verdict int
|
||||
@@ -23,13 +25,14 @@ const (
|
||||
type Packet interface {
|
||||
// StreamID is the ID of the stream the packet belongs to.
|
||||
StreamID() uint32
|
||||
// Timestamp is the time the packet was received.
|
||||
Timestamp() time.Time
|
||||
// Data is the raw packet data, starting with the IP header.
|
||||
Data() []byte
|
||||
}
|
||||
|
||||
// PacketCallback is called for each packet received.
|
||||
// Return false to "unregister" and stop receiving packets.
|
||||
// It must be safe for concurrent use.
|
||||
type PacketCallback func(Packet, error) bool
|
||||
|
||||
type PacketIO interface {
|
||||
@@ -39,8 +42,15 @@ type PacketIO interface {
|
||||
Register(context.Context, PacketCallback) error
|
||||
// SetVerdict sets the verdict for a packet.
|
||||
SetVerdict(Packet, Verdict, []byte) error
|
||||
// ProtectedDialContext is like net.DialContext, but the connection is "protected"
|
||||
// in the sense that the packets sent/received through the connection must bypass
|
||||
// the packet IO and not be processed by the callback.
|
||||
ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||
// Close closes the packet IO.
|
||||
Close() error
|
||||
// SetCancelFunc gives packet IO access to context cancel function, enabling it to
|
||||
// trigger a shutdown
|
||||
SetCancelFunc(cancelFunc context.CancelFunc) error
|
||||
}
|
||||
|
||||
type ErrInvalidPacket struct {
|
||||
|
||||
166
io/nfqueue.go
166
io/nfqueue.go
@@ -5,9 +5,12 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-iptables/iptables"
|
||||
"github.com/florianl/go-nfqueue"
|
||||
@@ -16,29 +19,28 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
nfqueueNum = 100
|
||||
nfqueueDefaultQueueNum = 100
|
||||
nfqueueMaxPacketLen = 0xFFFF
|
||||
nfqueueDefaultQueueSize = 128
|
||||
|
||||
nfqueueConnMarkAccept = 1001
|
||||
nfqueueConnMarkDrop = 1002
|
||||
nfqueueDefaultConnMarkAccept = 1001
|
||||
|
||||
nftFamily = "inet"
|
||||
nftTable = "opengfw"
|
||||
nftFamily = "inet"
|
||||
nftDefaultTable = "opengfw"
|
||||
)
|
||||
|
||||
func generateNftRules(local, rst bool) (*nftTableSpec, error) {
|
||||
if local && rst {
|
||||
func (n *nfqueuePacketIO) generateNftRules() (*nftTableSpec, error) {
|
||||
if n.local && n.rst {
|
||||
return nil, errors.New("tcp rst is not supported in local mode")
|
||||
}
|
||||
table := &nftTableSpec{
|
||||
Family: nftFamily,
|
||||
Table: nftTable,
|
||||
Table: n.table,
|
||||
}
|
||||
table.Defines = append(table.Defines, fmt.Sprintf("define ACCEPT_CTMARK=%d", nfqueueConnMarkAccept))
|
||||
table.Defines = append(table.Defines, fmt.Sprintf("define DROP_CTMARK=%d", nfqueueConnMarkDrop))
|
||||
table.Defines = append(table.Defines, fmt.Sprintf("define QUEUE_NUM=%d", nfqueueNum))
|
||||
if local {
|
||||
table.Defines = append(table.Defines, fmt.Sprintf("define ACCEPT_CTMARK=%d", n.connMarkAccept))
|
||||
table.Defines = append(table.Defines, fmt.Sprintf("define DROP_CTMARK=%d", n.connMarkDrop))
|
||||
table.Defines = append(table.Defines, fmt.Sprintf("define QUEUE_NUM=%d", n.queueNum))
|
||||
if n.local {
|
||||
table.Chains = []nftChainSpec{
|
||||
{Chain: "INPUT", Header: "type filter hook input priority filter; policy accept;"},
|
||||
{Chain: "OUTPUT", Header: "type filter hook output priority filter; policy accept;"},
|
||||
@@ -50,8 +52,9 @@ func generateNftRules(local, rst bool) (*nftTableSpec, error) {
|
||||
}
|
||||
for i := range table.Chains {
|
||||
c := &table.Chains[i]
|
||||
c.Rules = append(c.Rules, "meta mark $ACCEPT_CTMARK ct mark set $ACCEPT_CTMARK") // Bypass protected connections
|
||||
c.Rules = append(c.Rules, "ct mark $ACCEPT_CTMARK counter accept")
|
||||
if rst {
|
||||
if n.rst {
|
||||
c.Rules = append(c.Rules, "ip protocol tcp ct mark $DROP_CTMARK counter reject with tcp reset")
|
||||
}
|
||||
c.Rules = append(c.Rules, "ct mark $DROP_CTMARK counter drop")
|
||||
@@ -60,24 +63,26 @@ func generateNftRules(local, rst bool) (*nftTableSpec, error) {
|
||||
return table, nil
|
||||
}
|
||||
|
||||
func generateIptRules(local, rst bool) ([]iptRule, error) {
|
||||
if local && rst {
|
||||
func (n *nfqueuePacketIO) generateIptRules() ([]iptRule, error) {
|
||||
if n.local && n.rst {
|
||||
return nil, errors.New("tcp rst is not supported in local mode")
|
||||
}
|
||||
var chains []string
|
||||
if local {
|
||||
if n.local {
|
||||
chains = []string{"INPUT", "OUTPUT"}
|
||||
} else {
|
||||
chains = []string{"FORWARD"}
|
||||
}
|
||||
rules := make([]iptRule, 0, 4*len(chains))
|
||||
for _, chain := range chains {
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkAccept), "-j", "ACCEPT"}})
|
||||
if rst {
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-p", "tcp", "-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkDrop), "-j", "REJECT", "--reject-with", "tcp-reset"}})
|
||||
// Bypass protected connections
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-m", "mark", "--mark", strconv.Itoa(n.connMarkAccept), "-j", "CONNMARK", "--set-mark", strconv.Itoa(n.connMarkAccept)}})
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(n.connMarkAccept), "-j", "ACCEPT"}})
|
||||
if n.rst {
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-p", "tcp", "-m", "connmark", "--mark", strconv.Itoa(n.connMarkDrop), "-j", "REJECT", "--reject-with", "tcp-reset"}})
|
||||
}
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkDrop), "-j", "DROP"}})
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-j", "NFQUEUE", "--queue-num", strconv.Itoa(nfqueueNum), "--queue-bypass"}})
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(n.connMarkDrop), "-j", "DROP"}})
|
||||
rules = append(rules, iptRule{"filter", chain, []string{"-j", "NFQUEUE", "--queue-num", strconv.Itoa(n.queueNum), "--queue-bypass"}})
|
||||
}
|
||||
|
||||
return rules, nil
|
||||
@@ -88,18 +93,29 @@ var _ PacketIO = (*nfqueuePacketIO)(nil)
|
||||
var errNotNFQueuePacket = errors.New("not an NFQueue packet")
|
||||
|
||||
type nfqueuePacketIO struct {
|
||||
n *nfqueue.Nfqueue
|
||||
local bool
|
||||
rst bool
|
||||
rSet bool // whether the nftables/iptables rules have been set
|
||||
n *nfqueue.Nfqueue
|
||||
local bool
|
||||
rst bool
|
||||
rSet bool // whether the nftables/iptables rules have been set
|
||||
queueNum int
|
||||
table string // nftable name
|
||||
connMarkAccept int
|
||||
connMarkDrop int
|
||||
|
||||
// iptables not nil = use iptables instead of nftables
|
||||
ipt4 *iptables.IPTables
|
||||
ipt6 *iptables.IPTables
|
||||
|
||||
protectedDialer *net.Dialer
|
||||
}
|
||||
|
||||
type NFQueuePacketIOConfig struct {
|
||||
QueueSize uint32
|
||||
QueueSize uint32
|
||||
QueueNum *uint16
|
||||
Table string
|
||||
ConnMarkAccept uint32
|
||||
ConnMarkDrop uint32
|
||||
|
||||
ReadBuffer int
|
||||
WriteBuffer int
|
||||
Local bool
|
||||
@@ -110,6 +126,26 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) {
|
||||
if config.QueueSize == 0 {
|
||||
config.QueueSize = nfqueueDefaultQueueSize
|
||||
}
|
||||
if config.QueueNum == nil {
|
||||
queueNum := uint16(nfqueueDefaultQueueNum)
|
||||
config.QueueNum = &queueNum
|
||||
}
|
||||
if config.Table == "" {
|
||||
config.Table = nftDefaultTable
|
||||
}
|
||||
if config.ConnMarkAccept == 0 {
|
||||
config.ConnMarkAccept = nfqueueDefaultConnMarkAccept
|
||||
}
|
||||
if config.ConnMarkDrop == 0 {
|
||||
config.ConnMarkDrop = config.ConnMarkAccept + 1
|
||||
if config.ConnMarkDrop == 0 {
|
||||
// Overflow
|
||||
config.ConnMarkDrop = 1
|
||||
}
|
||||
}
|
||||
if config.ConnMarkAccept == config.ConnMarkDrop {
|
||||
return nil, errors.New("connMarkAccept and connMarkDrop cannot be the same")
|
||||
}
|
||||
var ipt4, ipt6 *iptables.IPTables
|
||||
var err error
|
||||
if nftCheck() != nil {
|
||||
@@ -124,7 +160,7 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) {
|
||||
}
|
||||
}
|
||||
n, err := nfqueue.Open(&nfqueue.Config{
|
||||
NfQueue: nfqueueNum,
|
||||
NfQueue: *config.QueueNum,
|
||||
MaxPacketLen: nfqueueMaxPacketLen,
|
||||
MaxQueueLen: config.QueueSize,
|
||||
Copymode: nfqueue.NfQnlCopyPacket,
|
||||
@@ -148,11 +184,27 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) {
|
||||
}
|
||||
}
|
||||
return &nfqueuePacketIO{
|
||||
n: n,
|
||||
local: config.Local,
|
||||
rst: config.RST,
|
||||
ipt4: ipt4,
|
||||
ipt6: ipt6,
|
||||
n: n,
|
||||
local: config.Local,
|
||||
rst: config.RST,
|
||||
queueNum: int(*config.QueueNum),
|
||||
table: config.Table,
|
||||
connMarkAccept: int(config.ConnMarkAccept),
|
||||
connMarkDrop: int(config.ConnMarkDrop),
|
||||
ipt4: ipt4,
|
||||
ipt6: ipt6,
|
||||
protectedDialer: &net.Dialer{
|
||||
Control: func(network, address string, c syscall.RawConn) error {
|
||||
var err error
|
||||
cErr := c.Control(func(fd uintptr) {
|
||||
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, int(config.ConnMarkAccept))
|
||||
})
|
||||
if cErr != nil {
|
||||
return cErr
|
||||
}
|
||||
return err
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -170,6 +222,12 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error
|
||||
streamID: ctIDFromCtBytes(*a.Ct),
|
||||
data: *a.Payload,
|
||||
}
|
||||
// Use timestamp from attribute if available, otherwise use current time as fallback
|
||||
if a.Timestamp != nil {
|
||||
p.timestamp = *a.Timestamp
|
||||
} else {
|
||||
p.timestamp = time.Now()
|
||||
}
|
||||
return okBoolToInt(cb(p, nil))
|
||||
},
|
||||
func(e error) int {
|
||||
@@ -186,9 +244,9 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error
|
||||
}
|
||||
if !n.rSet {
|
||||
if n.ipt4 != nil {
|
||||
err = n.setupIpt(n.local, n.rst, false)
|
||||
err = n.setupIpt(false)
|
||||
} else {
|
||||
err = n.setupNft(n.local, n.rst, false)
|
||||
err = n.setupNft(false)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -228,40 +286,49 @@ func (n *nfqueuePacketIO) SetVerdict(p Packet, v Verdict, newPacket []byte) erro
|
||||
case VerdictAcceptModify:
|
||||
return n.n.SetVerdictModPacket(nP.id, nfqueue.NfAccept, newPacket)
|
||||
case VerdictAcceptStream:
|
||||
return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfAccept, nfqueueConnMarkAccept)
|
||||
return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfAccept, n.connMarkAccept)
|
||||
case VerdictDrop:
|
||||
return n.n.SetVerdict(nP.id, nfqueue.NfDrop)
|
||||
case VerdictDropStream:
|
||||
return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfDrop, nfqueueConnMarkDrop)
|
||||
return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfDrop, n.connMarkDrop)
|
||||
default:
|
||||
// Invalid verdict, ignore for now
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (n *nfqueuePacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return n.protectedDialer.DialContext(ctx, network, address)
|
||||
}
|
||||
|
||||
func (n *nfqueuePacketIO) Close() error {
|
||||
if n.rSet {
|
||||
if n.ipt4 != nil {
|
||||
_ = n.setupIpt(n.local, n.rst, true)
|
||||
_ = n.setupIpt(true)
|
||||
} else {
|
||||
_ = n.setupNft(n.local, n.rst, true)
|
||||
_ = n.setupNft(true)
|
||||
}
|
||||
n.rSet = false
|
||||
}
|
||||
return n.n.Close()
|
||||
}
|
||||
|
||||
func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error {
|
||||
rules, err := generateNftRules(local, rst)
|
||||
// nfqueue IO does not issue shutdown
|
||||
func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nfqueuePacketIO) setupNft(remove bool) error {
|
||||
rules, err := n.generateNftRules()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rulesText := rules.String()
|
||||
if remove {
|
||||
err = nftDelete(nftFamily, nftTable)
|
||||
err = nftDelete(nftFamily, n.table)
|
||||
} else {
|
||||
// Delete first to make sure no leftover rules
|
||||
_ = nftDelete(nftFamily, nftTable)
|
||||
_ = nftDelete(nftFamily, n.table)
|
||||
err = nftAdd(rulesText)
|
||||
}
|
||||
if err != nil {
|
||||
@@ -270,8 +337,8 @@ func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error {
|
||||
rules, err := generateIptRules(local, rst)
|
||||
func (n *nfqueuePacketIO) setupIpt(remove bool) error {
|
||||
rules, err := n.generateIptRules()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -289,15 +356,20 @@ func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error {
|
||||
var _ Packet = (*nfqueuePacket)(nil)
|
||||
|
||||
type nfqueuePacket struct {
|
||||
id uint32
|
||||
streamID uint32
|
||||
data []byte
|
||||
id uint32
|
||||
streamID uint32
|
||||
timestamp time.Time
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (p *nfqueuePacket) StreamID() uint32 {
|
||||
return p.streamID
|
||||
}
|
||||
|
||||
func (p *nfqueuePacket) Timestamp() time.Time {
|
||||
return p.timestamp
|
||||
}
|
||||
|
||||
func (p *nfqueuePacket) Data() []byte {
|
||||
return p.data
|
||||
}
|
||||
|
||||
136
io/pcap.go
Normal file
136
io/pcap.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package io
|
||||
|
||||
import (
|
||||
"context"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/pcapgo"
|
||||
)
|
||||
|
||||
var _ PacketIO = (*pcapPacketIO)(nil)
|
||||
|
||||
type pcapPacketIO struct {
|
||||
pcapFile io.ReadCloser
|
||||
pcap *pcapgo.Reader
|
||||
timeOffset *time.Duration
|
||||
ioCancel context.CancelFunc
|
||||
config PcapPacketIOConfig
|
||||
|
||||
dialer *net.Dialer
|
||||
}
|
||||
|
||||
type PcapPacketIOConfig struct {
|
||||
PcapFile string
|
||||
Realtime bool
|
||||
}
|
||||
|
||||
func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) {
|
||||
pcapFile, err := os.Open(config.PcapFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
handle, err := pcapgo.NewReader(pcapFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pcapPacketIO{
|
||||
pcapFile: pcapFile,
|
||||
pcap: handle,
|
||||
timeOffset: nil,
|
||||
ioCancel: nil,
|
||||
config: config,
|
||||
dialer: &net.Dialer{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error {
|
||||
go func() {
|
||||
packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType())
|
||||
for packet := range packetSource.Packets() {
|
||||
p.wait(packet)
|
||||
|
||||
networkLayer := packet.NetworkLayer()
|
||||
if networkLayer != nil {
|
||||
src, dst := networkLayer.NetworkFlow().Endpoints()
|
||||
endpoints := []string{src.String(), dst.String()}
|
||||
sort.Strings(endpoints)
|
||||
id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable)
|
||||
|
||||
cb(&pcapPacket{
|
||||
streamID: id,
|
||||
timestamp: packet.Metadata().Timestamp,
|
||||
data: packet.LinkLayer().LayerPayload(),
|
||||
}, nil)
|
||||
}
|
||||
}
|
||||
// Give the workers a chance to finish everything
|
||||
time.Sleep(time.Second)
|
||||
// Stop the engine when all packets are finished
|
||||
p.ioCancel()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// A normal dialer is sufficient as pcap IO does not mess up with the networking
|
||||
func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return p.dialer.DialContext(ctx, network, address)
|
||||
}
|
||||
|
||||
func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
|
||||
p.ioCancel = cancelFunc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pcapPacketIO) Close() error {
|
||||
return p.pcapFile.Close()
|
||||
}
|
||||
|
||||
// Intentionally slow down the replay
|
||||
// In realtime mode, this is to match the timestamps in the capture
|
||||
func (p *pcapPacketIO) wait(packet gopacket.Packet) {
|
||||
if !p.config.Realtime {
|
||||
return
|
||||
}
|
||||
|
||||
if p.timeOffset == nil {
|
||||
offset := time.Since(packet.Metadata().Timestamp)
|
||||
p.timeOffset = &offset
|
||||
} else {
|
||||
t := time.Until(packet.Metadata().Timestamp.Add(*p.timeOffset))
|
||||
time.Sleep(t)
|
||||
}
|
||||
}
|
||||
|
||||
var _ Packet = (*pcapPacket)(nil)
|
||||
|
||||
type pcapPacket struct {
|
||||
streamID uint32
|
||||
timestamp time.Time
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (p *pcapPacket) StreamID() uint32 {
|
||||
return p.streamID
|
||||
}
|
||||
|
||||
func (p *pcapPacket) Timestamp() time.Time {
|
||||
return p.timestamp
|
||||
}
|
||||
|
||||
func (p *pcapPacket) Data() []byte {
|
||||
return p.data
|
||||
}
|
||||
@@ -14,14 +14,12 @@ type GeoMatcher struct {
|
||||
ipMatcherLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewGeoMatcher(geoSiteFilename, geoIpFilename string) (*GeoMatcher, error) {
|
||||
geoLoader := NewDefaultGeoLoader(geoSiteFilename, geoIpFilename)
|
||||
|
||||
func NewGeoMatcher(geoSiteFilename, geoIpFilename string) *GeoMatcher {
|
||||
return &GeoMatcher{
|
||||
geoLoader: geoLoader,
|
||||
geoLoader: NewDefaultGeoLoader(geoSiteFilename, geoIpFilename),
|
||||
geoSiteMatcher: make(map[string]hostMatcher),
|
||||
geoIpMatcher: make(map[string]hostMatcher),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GeoMatcher) MatchGeoIp(ip, condition string) bool {
|
||||
|
||||
188
ruleset/expr.go
188
ruleset/expr.go
@@ -1,11 +1,15 @@
|
||||
package ruleset
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/expr-lang/expr/builtin"
|
||||
|
||||
"github.com/expr-lang/expr"
|
||||
"github.com/expr-lang/expr/ast"
|
||||
@@ -16,7 +20,6 @@ import (
|
||||
"github.com/apernet/OpenGFW/analyzer"
|
||||
"github.com/apernet/OpenGFW/modifier"
|
||||
"github.com/apernet/OpenGFW/ruleset/builtins"
|
||||
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
|
||||
)
|
||||
|
||||
// ExprRule is the external representation of an expression rule.
|
||||
@@ -55,10 +58,9 @@ type compiledExprRule struct {
|
||||
var _ Ruleset = (*exprRuleset)(nil)
|
||||
|
||||
type exprRuleset struct {
|
||||
Rules []compiledExprRule
|
||||
Ans []analyzer.Analyzer
|
||||
Logger Logger
|
||||
GeoMatcher *geo.GeoMatcher
|
||||
Rules []compiledExprRule
|
||||
Ans []analyzer.Analyzer
|
||||
Logger Logger
|
||||
}
|
||||
|
||||
func (r *exprRuleset) Analyzers(info StreamInfo) []analyzer.Analyzer {
|
||||
@@ -100,10 +102,7 @@ func CompileExprRules(rules []ExprRule, ans []analyzer.Analyzer, mods []modifier
|
||||
fullAnMap := analyzersToMap(ans)
|
||||
fullModMap := modifiersToMap(mods)
|
||||
depAnMap := make(map[string]analyzer.Analyzer)
|
||||
geoMatcher, err := geo.NewGeoMatcher(config.GeoSiteFilename, config.GeoIpFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
funcMap := buildFunctionMap(config)
|
||||
// Compile all rules and build a map of analyzers that are used by the rules.
|
||||
for _, rule := range rules {
|
||||
if rule.Action == "" && !rule.Log {
|
||||
@@ -118,13 +117,19 @@ func CompileExprRules(rules []ExprRule, ans []analyzer.Analyzer, mods []modifier
|
||||
action = &a
|
||||
}
|
||||
visitor := &idVisitor{Variables: make(map[string]bool), Identifiers: make(map[string]bool)}
|
||||
patcher := &idPatcher{}
|
||||
patcher := &idPatcher{FuncMap: funcMap}
|
||||
program, err := expr.Compile(rule.Expr,
|
||||
func(c *conf.Config) {
|
||||
c.Strict = false
|
||||
c.Expect = reflect.Bool
|
||||
c.Visitors = append(c.Visitors, visitor, patcher)
|
||||
registerBuiltinFunctions(c.Functions, geoMatcher)
|
||||
for name, f := range funcMap {
|
||||
c.Functions[name] = &builtin.Function{
|
||||
Name: name,
|
||||
Func: f.Func,
|
||||
Types: f.Types,
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
@@ -138,24 +143,15 @@ func CompileExprRules(rules []ExprRule, ans []analyzer.Analyzer, mods []modifier
|
||||
if isBuiltInAnalyzer(name) || visitor.Variables[name] {
|
||||
continue
|
||||
}
|
||||
// Check if it's one of the built-in functions, and if so,
|
||||
// skip it as an analyzer & do initialization if necessary.
|
||||
switch name {
|
||||
case "geoip":
|
||||
if err := geoMatcher.LoadGeoIP(); err != nil {
|
||||
return nil, fmt.Errorf("rule %q failed to load geoip: %w", rule.Name, err)
|
||||
}
|
||||
case "geosite":
|
||||
if err := geoMatcher.LoadGeoSite(); err != nil {
|
||||
return nil, fmt.Errorf("rule %q failed to load geosite: %w", rule.Name, err)
|
||||
}
|
||||
case "cidr":
|
||||
// No initialization needed for CIDR.
|
||||
default:
|
||||
a, ok := fullAnMap[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("rule %q uses unknown analyzer %q", rule.Name, name)
|
||||
if f, ok := funcMap[name]; ok {
|
||||
// Built-in function, initialize if necessary
|
||||
if f.InitFunc != nil {
|
||||
if err := f.InitFunc(); err != nil {
|
||||
return nil, fmt.Errorf("rule %q failed to initialize function %q: %w", rule.Name, name, err)
|
||||
}
|
||||
}
|
||||
} else if a, ok := fullAnMap[name]; ok {
|
||||
// Analyzer, add to dependency map
|
||||
depAnMap[name] = a
|
||||
}
|
||||
}
|
||||
@@ -184,37 +180,12 @@ func CompileExprRules(rules []ExprRule, ans []analyzer.Analyzer, mods []modifier
|
||||
depAns = append(depAns, a)
|
||||
}
|
||||
return &exprRuleset{
|
||||
Rules: compiledRules,
|
||||
Ans: depAns,
|
||||
Logger: config.Logger,
|
||||
GeoMatcher: geoMatcher,
|
||||
Rules: compiledRules,
|
||||
Ans: depAns,
|
||||
Logger: config.Logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func registerBuiltinFunctions(funcMap map[string]*ast.Function, geoMatcher *geo.GeoMatcher) {
|
||||
funcMap["geoip"] = &ast.Function{
|
||||
Name: "geoip",
|
||||
Func: func(params ...any) (any, error) {
|
||||
return geoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil
|
||||
},
|
||||
Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoIp)},
|
||||
}
|
||||
funcMap["geosite"] = &ast.Function{
|
||||
Name: "geosite",
|
||||
Func: func(params ...any) (any, error) {
|
||||
return geoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil
|
||||
},
|
||||
Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoSite)},
|
||||
}
|
||||
funcMap["cidr"] = &ast.Function{
|
||||
Name: "cidr",
|
||||
Func: func(params ...any) (any, error) {
|
||||
return builtins.MatchCIDR(params[0].(string), params[1].(*net.IPNet)), nil
|
||||
},
|
||||
Types: []reflect.Type{reflect.TypeOf((func(string, string) bool)(nil)), reflect.TypeOf(builtins.MatchCIDR)},
|
||||
}
|
||||
}
|
||||
|
||||
func streamInfoToExprEnv(info StreamInfo) map[string]interface{} {
|
||||
m := map[string]interface{}{
|
||||
"id": info.ID,
|
||||
@@ -299,29 +270,108 @@ func (v *idVisitor) Visit(node *ast.Node) {
|
||||
// idPatcher patches the AST during expr compilation, replacing certain values with
|
||||
// their internal representations for better runtime performance.
|
||||
type idPatcher struct {
|
||||
Err error
|
||||
FuncMap map[string]*Function
|
||||
Err error
|
||||
}
|
||||
|
||||
func (p *idPatcher) Visit(node *ast.Node) {
|
||||
switch (*node).(type) {
|
||||
case *ast.CallNode:
|
||||
callNode := (*node).(*ast.CallNode)
|
||||
if callNode.Func == nil {
|
||||
if callNode.Callee == nil {
|
||||
// Ignore invalid call nodes
|
||||
return
|
||||
}
|
||||
switch callNode.Func.Name {
|
||||
case "cidr":
|
||||
cidrStringNode, ok := callNode.Arguments[1].(*ast.StringNode)
|
||||
if !ok {
|
||||
return
|
||||
if f, ok := p.FuncMap[callNode.Callee.String()]; ok {
|
||||
if f.PatchFunc != nil {
|
||||
if err := f.PatchFunc(&callNode.Arguments); err != nil {
|
||||
p.Err = err
|
||||
return
|
||||
}
|
||||
}
|
||||
cidr, err := builtins.CompileCIDR(cidrStringNode.Value)
|
||||
if err != nil {
|
||||
p.Err = err
|
||||
return
|
||||
}
|
||||
callNode.Arguments[1] = &ast.ConstantNode{Value: cidr}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Function struct {
|
||||
InitFunc func() error
|
||||
PatchFunc func(args *[]ast.Node) error
|
||||
Func func(params ...any) (any, error)
|
||||
Types []reflect.Type
|
||||
}
|
||||
|
||||
func buildFunctionMap(config *BuiltinConfig) map[string]*Function {
|
||||
return map[string]*Function{
|
||||
"geoip": {
|
||||
InitFunc: config.GeoMatcher.LoadGeoIP,
|
||||
PatchFunc: nil,
|
||||
Func: func(params ...any) (any, error) {
|
||||
return config.GeoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil
|
||||
},
|
||||
Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoIp)},
|
||||
},
|
||||
"geosite": {
|
||||
InitFunc: config.GeoMatcher.LoadGeoSite,
|
||||
PatchFunc: nil,
|
||||
Func: func(params ...any) (any, error) {
|
||||
return config.GeoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil
|
||||
},
|
||||
Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoSite)},
|
||||
},
|
||||
"cidr": {
|
||||
InitFunc: nil,
|
||||
PatchFunc: func(args *[]ast.Node) error {
|
||||
cidrStringNode, ok := (*args)[1].(*ast.StringNode)
|
||||
if !ok {
|
||||
return fmt.Errorf("cidr: invalid argument type")
|
||||
}
|
||||
cidr, err := builtins.CompileCIDR(cidrStringNode.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
(*args)[1] = &ast.ConstantNode{Value: cidr}
|
||||
return nil
|
||||
},
|
||||
Func: func(params ...any) (any, error) {
|
||||
return builtins.MatchCIDR(params[0].(string), params[1].(*net.IPNet)), nil
|
||||
},
|
||||
Types: []reflect.Type{reflect.TypeOf(builtins.MatchCIDR)},
|
||||
},
|
||||
"lookup": {
|
||||
InitFunc: nil,
|
||||
PatchFunc: func(args *[]ast.Node) error {
|
||||
var serverStr *ast.StringNode
|
||||
if len(*args) > 1 {
|
||||
// Has the optional server argument
|
||||
var ok bool
|
||||
serverStr, ok = (*args)[1].(*ast.StringNode)
|
||||
if !ok {
|
||||
return fmt.Errorf("lookup: invalid argument type")
|
||||
}
|
||||
}
|
||||
r := &net.Resolver{
|
||||
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
if serverStr != nil {
|
||||
address = serverStr.Value
|
||||
}
|
||||
return config.ProtectedDialContext(ctx, network, address)
|
||||
},
|
||||
}
|
||||
if len(*args) > 1 {
|
||||
(*args)[1] = &ast.ConstantNode{Value: r}
|
||||
} else {
|
||||
*args = append(*args, &ast.ConstantNode{Value: r})
|
||||
}
|
||||
return nil
|
||||
},
|
||||
Func: func(params ...any) (any, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
|
||||
defer cancel()
|
||||
return params[1].(*net.Resolver).LookupHost(ctx, params[0].(string))
|
||||
},
|
||||
Types: []reflect.Type{
|
||||
reflect.TypeOf((func(string, *net.Resolver) []string)(nil)),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package ruleset
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/apernet/OpenGFW/analyzer"
|
||||
"github.com/apernet/OpenGFW/modifier"
|
||||
"github.com/apernet/OpenGFW/ruleset/builtins/geo"
|
||||
)
|
||||
|
||||
type Action int
|
||||
@@ -100,7 +102,7 @@ type Logger interface {
|
||||
}
|
||||
|
||||
type BuiltinConfig struct {
|
||||
Logger Logger
|
||||
GeoSiteFilename string
|
||||
GeoIpFilename string
|
||||
Logger Logger
|
||||
GeoMatcher *geo.GeoMatcher
|
||||
ProtectedDialContext func(ctx context.Context, network, address string) (net.Conn, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user