7 Commits

Author SHA1 Message Date
Toby
7a52228ec6 docs: add a section about openwrt (#53) 2024-02-11 13:12:49 -08:00
Toby
6d33a0d51c fix: incorrect verdict handling that caused packets to pass through even after they had been blocked (#52) 2024-02-11 13:05:05 -08:00
Toby
27c9b91a61 feat: nftables support (#50)
* feat: nftables support

* fix: format
2024-02-11 13:04:49 -08:00
TAKAHASHI Shuuji
36bb4b796d docs: update README.ja.md (#49) 2024-02-05 20:44:56 -08:00
Toby
843f17896c fix: netlink race condition (#48) 2024-02-05 19:32:52 -08:00
Toby
6871244809 chore: improve built-in funcs handling (#43) 2024-02-04 11:17:19 -08:00
Toby
f8f0153664 feat: rules hot reload via SIGHUP (#44) 2024-02-03 10:55:20 -08:00
9 changed files with 276 additions and 93 deletions

View File

@@ -17,16 +17,16 @@ OpenGFW は、Linux 上の [GFW](https://en.wikipedia.org/wiki/Great_Firewall)
- フル IP/TCP 再アセンブル、各種プロトコルアナライザー
- HTTP、TLS、DNS、SSH、SOCKS4/5、WireGuard、その他多数
- Shadowsocks の"完全に暗号化されたトラフィック"の検出
など。 (https://gfw.report/publications/usenixsecurity23/data/paper/paper.pdf)
- Shadowsocks の完全に暗号化されたトラフィックの検出など (https://gfw.report/publications/usenixsecurity23/data/paper/paper.pdf)
- トロイの木馬キラー (https://github.com/XTLS/Trojan-killer) に基づくトロイの木馬 (プロキシプロトコル) 検出
- [WIP] 機械学習に基づくトラフィック分類
- IPv4 と IPv6 をフルサポート
- フローベースのマルチコア負荷分散
- 接続オフロード
- [expr](https://github.com/expr-lang/expr) に基づく強力なルールエンジン
- ルールのホットリロード (`SIGHUP` を送信してリロード)
- 柔軟なアナライザ&モディファイアフレームワーク
- 拡張可能な IO 実装(今のところ NFQueue のみ)
- 拡張可能な IO 実装 (今のところ NFQueue のみ)
- [WIP] ウェブ UI
## ユースケース
@@ -52,6 +52,16 @@ export OPENGFW_LOG_LEVEL=debug
./OpenGFW -c config.yaml rules.yaml
```
#### OpenWrt
OpenGFW は OpenWrt 23.05 で動作することがテストされています(他のバージョンも動作するはずですが、検証されていません)。
依存関係をインストールしてください:
```shell
opkg install kmod-nft-queue kmod-nf-conntrack-netlink
```
### 設定例
```yaml
@@ -69,8 +79,7 @@ workers:
### ルール例
サポートされているすべてのプロトコルと、それぞれのプロトコルがどのようなフィールドを持っているかについてのドキュメントはまだ準備できておりません。
一旦は、"analyzer" ディレクトリの下にあるコードを直接チェックする必要があります。
[アナライザーのプロパティ](docs/Analyzers.md)
式言語の構文については、[Expr 言語定義](https://expr-lang.org/docs/language-definition)を参照してください。
@@ -120,6 +129,6 @@ workers:
#### サポートされるアクション
- `allow`: 接続を許可し、それ以上の処理は行わない。
- `block`: 接続をブロックし。
- `block`: 接続をブロックし、それ以上の処理は行わない
- `drop`: UDP の場合、ルールのトリガーとなったパケットをドロップし、同じフローに含まれる以降のパケットの処理を継続する。TCP の場合は、`block` と同じ。
- `modify`: UDP の場合、与えられた修飾子を使って、ルールをトリガしたパケットを修正し、同じフロー内の今後のパケットを処理し続ける。TCP の場合は、`allow` と同じ。

View File

@@ -29,6 +29,7 @@ Linux that's in many ways more powerful than the real thing. It's cyber sovereig
- Flow-based multicore load balancing
- Connection offloading
- Powerful rule engine based on [expr](https://github.com/expr-lang/expr)
- Hot-reloadable rules (send `SIGHUP` to reload)
- Flexible analyzer & modifier framework
- Extensible IO implementation (only NFQueue for now)
- [WIP] Web UI
@@ -56,6 +57,16 @@ export OPENGFW_LOG_LEVEL=debug
./OpenGFW -c config.yaml rules.yaml
```
#### OpenWrt
OpenGFW has been tested to work on OpenWrt 23.05 (other versions should also work, just not verified).
Install the dependencies:
```shell
opkg install kmod-nft-queue kmod-nf-conntrack-netlink
```
### Example config
```yaml

View File

@@ -25,6 +25,7 @@ OpenGFW 是一个 Linux 上灵活、易用、开源的 [GFW](https://zh.wikipedi
- 基于流的多核负载均衡
- 连接 offloading
- 基于 [expr](https://github.com/expr-lang/expr) 的强大规则引擎
- 规则可以热重载 (发送 `SIGHUP` 信号)
- 灵活的协议解析和修改框架
- 可扩展的 IO 实现 (目前只有 NFQueue)
- [开发中] Web UI
@@ -52,6 +53,16 @@ export OPENGFW_LOG_LEVEL=debug
./OpenGFW -c config.yaml rules.yaml
```
#### OpenWrt
OpenGFW 在 OpenWrt 23.05 上测试可用(其他版本应该也可以,暂时未经验证)。
安装依赖:
```shell
opkg install kmod-nft-queue kmod-nf-conntrack-netlink
```
### 样例配置
```yaml

View File

@@ -7,6 +7,7 @@ import (
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/apernet/OpenGFW/analyzer"
"github.com/apernet/OpenGFW/analyzer/tcp"
@@ -267,14 +268,42 @@ func runMain(cmd *cobra.Command, args []string) {
logger.Fatal("failed to initialize engine", zap.Error(err))
}
// Signal handling
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
sigChan := make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, os.Kill)
<-sigChan
// Graceful shutdown
shutdownChan := make(chan os.Signal)
signal.Notify(shutdownChan, os.Interrupt, os.Kill)
<-shutdownChan
logger.Info("shutting down gracefully...")
cancelFunc()
}()
go func() {
// Rule reload
reloadChan := make(chan os.Signal)
signal.Notify(reloadChan, syscall.SIGHUP)
for {
<-reloadChan
logger.Info("reloading rules")
rawRs, err := ruleset.ExprRulesFromYAML(args[0])
if err != nil {
logger.Error("failed to load rules, using old rules", zap.Error(err))
continue
}
rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig)
if err != nil {
logger.Error("failed to compile rules, using old rules", zap.Error(err))
continue
}
err = en.UpdateRuleset(rs)
if err != nil {
logger.Error("failed to update ruleset", zap.Error(err))
} else {
logger.Info("rules reloaded")
}
}
}()
logger.Info("engine started")
logger.Info("engine exited", zap.Error(en.Run(ctx)))
}

View File

@@ -65,7 +65,7 @@ func (f *tcpStreamFactory) New(ipFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, a
ctx.Verdict = tcpVerdictAcceptStream
f.Logger.TCPStreamAction(info, ruleset.ActionAllow, true)
// a tcpStream with no activeEntries is a no-op
return &tcpStream{}
return &tcpStream{finalVerdict: tcpVerdictAcceptStream}
}
// Create entries for each analyzer
entries := make([]*tcpStreamEntry, 0, len(ans))
@@ -109,6 +109,7 @@ type tcpStream struct {
ruleset ruleset.Ruleset
activeEntries []*tcpStreamEntry
doneEntries []*tcpStreamEntry
finalVerdict tcpVerdict
}
type tcpStreamEntry struct {
@@ -119,8 +120,13 @@ type tcpStreamEntry struct {
}
func (s *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// Only accept packets if we still have active entries
return len(s.activeEntries) > 0
if len(s.activeEntries) > 0 {
return true
} else {
ctx := ac.(*tcpContext)
ctx.Verdict = s.finalVerdict
return false
}
}
func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
@@ -152,7 +158,9 @@ func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
action := result.Action
if action != ruleset.ActionMaybe && action != ruleset.ActionModify {
ctx.Verdict = actionToTCPVerdict(action)
verdict := actionToTCPVerdict(action)
s.finalVerdict = verdict
ctx.Verdict = verdict
s.logger.TCPStreamAction(s.info, action, false)
// Verdict issued, no need to process any more packets
s.closeActiveEntries()
@@ -160,6 +168,7 @@ func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
if len(s.activeEntries) == 0 && ctx.Verdict == tcpVerdictAccept {
// All entries are done but no verdict issued, accept stream
s.finalVerdict = tcpVerdictAcceptStream
ctx.Verdict = tcpVerdictAcceptStream
s.logger.TCPStreamAction(s.info, ruleset.ActionAllow, true)
}

View File

@@ -65,7 +65,7 @@ func (f *udpStreamFactory) New(ipFlow, udpFlow gopacket.Flow, udp *layers.UDP, u
uc.Verdict = udpVerdictAcceptStream
f.Logger.UDPStreamAction(info, ruleset.ActionAllow, true)
// a udpStream with no activeEntries is a no-op
return &udpStream{}
return &udpStream{finalVerdict: udpVerdictAcceptStream}
}
// Create entries for each analyzer
entries := make([]*udpStreamEntry, 0, len(ans))
@@ -167,6 +167,7 @@ type udpStream struct {
ruleset ruleset.Ruleset
activeEntries []*udpStreamEntry
doneEntries []*udpStreamEntry
finalVerdict udpVerdict
}
type udpStreamEntry struct {
@@ -177,8 +178,12 @@ type udpStreamEntry struct {
}
func (s *udpStream) Accept(udp *layers.UDP, rev bool, uc *udpContext) bool {
// Only accept packets if we still have active entries
return len(s.activeEntries) > 0
if len(s.activeEntries) > 0 {
return true
} else {
uc.Verdict = s.finalVerdict
return false
}
}
func (s *udpStream) Feed(udp *layers.UDP, rev bool, uc *udpContext) {
@@ -221,16 +226,18 @@ func (s *udpStream) Feed(udp *layers.UDP, rev bool, uc *udpContext) {
}
}
if action != ruleset.ActionMaybe {
var final bool
uc.Verdict, final = actionToUDPVerdict(action)
verdict, final := actionToUDPVerdict(action)
uc.Verdict = verdict
s.logger.UDPStreamAction(s.info, action, false)
if final {
s.finalVerdict = verdict
s.closeActiveEntries()
}
}
}
if len(s.activeEntries) == 0 && uc.Verdict == udpVerdictAccept {
// All entries are done but no verdict issued, accept stream
s.finalVerdict = udpVerdictAcceptStream
uc.Verdict = udpVerdictAcceptStream
s.logger.UDPStreamAction(s.info, ruleset.ActionAllow, true)
}

View File

@@ -127,7 +127,10 @@ func (w *worker) Run(ctx context.Context) {
}
func (w *worker) UpdateRuleset(r ruleset.Ruleset) error {
return w.tcpStreamFactory.UpdateRuleset(r)
if err := w.tcpStreamFactory.UpdateRuleset(r); err != nil {
return err
}
return w.udpStreamFactory.UpdateRuleset(r)
}
func (w *worker) handle(streamID uint32, p gopacket.Packet) (io.Verdict, []byte) {

View File

@@ -4,7 +4,10 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"os/exec"
"strconv"
"strings"
"github.com/coreos/go-iptables/iptables"
"github.com/florianl/go-nfqueue"
@@ -18,8 +21,50 @@ const (
nfqueueConnMarkAccept = 1001
nfqueueConnMarkDrop = 1002
nftFamily = "inet"
nftTable = "opengfw"
)
var nftRulesForward = fmt.Sprintf(`
define ACCEPT_CTMARK=%d
define DROP_CTMARK=%d
define QUEUE_NUM=%d
table %s %s {
chain FORWARD {
type filter hook forward priority filter; policy accept;
ct mark $ACCEPT_CTMARK counter accept
ct mark $DROP_CTMARK counter drop
counter queue num $QUEUE_NUM bypass
}
}
`, nfqueueConnMarkAccept, nfqueueConnMarkDrop, nfqueueNum, nftFamily, nftTable)
var nftRulesLocal = fmt.Sprintf(`
define ACCEPT_CTMARK=%d
define DROP_CTMARK=%d
define QUEUE_NUM=%d
table %s %s {
chain INPUT {
type filter hook input priority filter; policy accept;
ct mark $ACCEPT_CTMARK counter accept
ct mark $DROP_CTMARK counter drop
counter queue num $QUEUE_NUM bypass
}
chain OUTPUT {
type filter hook output priority filter; policy accept;
ct mark $ACCEPT_CTMARK counter accept
ct mark $DROP_CTMARK counter drop
counter queue num $QUEUE_NUM bypass
}
}
`, nfqueueConnMarkAccept, nfqueueConnMarkDrop, nfqueueNum, nftFamily, nftTable)
var iptRulesForward = []iptRule{
{"filter", "FORWARD", []string{"-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkAccept), "-j", "ACCEPT"}},
{"filter", "FORWARD", []string{"-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkDrop), "-j", "DROP"}},
@@ -43,8 +88,11 @@ var errNotNFQueuePacket = errors.New("not an NFQueue packet")
type nfqueuePacketIO struct {
n *nfqueue.Nfqueue
local bool
ipt4 *iptables.IPTables
ipt6 *iptables.IPTables
rSet bool // whether the nftables/iptables rules have been set
// iptables not nil = use iptables instead of nftables
ipt4 *iptables.IPTables
ipt6 *iptables.IPTables
}
type NFQueuePacketIOConfig struct {
@@ -56,13 +104,18 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) {
if config.QueueSize == 0 {
config.QueueSize = nfqueueDefaultQueueSize
}
ipt4, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
if err != nil {
return nil, err
}
ipt6, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
return nil, err
var ipt4, ipt6 *iptables.IPTables
var err error
if nftCheck() != nil {
// We prefer nftables, but if it's not available, fall back to iptables
ipt4, err = iptables.NewWithProtocol(iptables.ProtocolIPv4)
if err != nil {
return nil, err
}
ipt6, err = iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
return nil, err
}
}
n, err := nfqueue.Open(&nfqueue.Config{
NfQueue: nfqueueNum,
@@ -74,22 +127,16 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) {
if err != nil {
return nil, err
}
io := &nfqueuePacketIO{
return &nfqueuePacketIO{
n: n,
local: config.Local,
ipt4: ipt4,
ipt6: ipt6,
}
err = io.setupIpt(config.Local, false)
if err != nil {
_ = n.Close()
return nil, err
}
return io, nil
}, nil
}
func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error {
return n.n.RegisterWithErrorFunc(ctx,
err := n.n.RegisterWithErrorFunc(ctx,
func(a nfqueue.Attribute) int {
if a.PacketID == nil || a.Ct == nil || a.Payload == nil || len(*a.Payload) < 20 {
// Invalid packet, ignore
@@ -106,6 +153,21 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error
func(e error) int {
return okBoolToInt(cb(nil, e))
})
if err != nil {
return err
}
if !n.rSet {
if n.ipt4 != nil {
err = n.setupIpt(n.local, false)
} else {
err = n.setupNft(n.local, false)
}
if err != nil {
return err
}
n.rSet = true
}
return nil
}
func (n *nfqueuePacketIO) SetVerdict(p Packet, v Verdict, newPacket []byte) error {
@@ -130,6 +192,39 @@ func (n *nfqueuePacketIO) SetVerdict(p Packet, v Verdict, newPacket []byte) erro
}
}
// Close tears down the firewall rules installed by this instance (if any
// were set) and then shuts down the underlying NFQueue handle.
func (n *nfqueuePacketIO) Close() error {
	if n.rSet {
		// A non-nil ipt4 means we fell back to iptables; otherwise nftables.
		teardown := n.setupNft
		if n.ipt4 != nil {
			teardown = n.setupIpt
		}
		// Best-effort cleanup; the queue must be closed regardless.
		_ = teardown(n.local, true)
		n.rSet = false
	}
	return n.n.Close()
}
// setupNft installs (remove=false) or removes (remove=true) the nftables
// rules that divert traffic to the NFQueue. The rule set used depends on
// whether the engine runs in local (input/output) or forward mode.
func (n *nfqueuePacketIO) setupNft(local, remove bool) error {
	var rules string
	if local {
		rules = nftRulesLocal
	} else {
		rules = nftRulesForward
	}
	if remove {
		return nftDelete(nftFamily, nftTable)
	}
	// Delete first to make sure there are no leftover rules from a
	// previous run; ignore the error since the table may not exist.
	_ = nftDelete(nftFamily, nftTable)
	return nftAdd(rules)
}
func (n *nfqueuePacketIO) setupIpt(local, remove bool) error {
var rules []iptRule
if local {
@@ -149,12 +244,6 @@ func (n *nfqueuePacketIO) setupIpt(local, remove bool) error {
return nil
}
func (n *nfqueuePacketIO) Close() error {
err := n.setupIpt(n.local, true)
_ = n.n.Close()
return err
}
var _ Packet = (*nfqueuePacket)(nil)
type nfqueuePacket struct {
@@ -179,6 +268,25 @@ func okBoolToInt(ok bool) int {
}
}
// nftCheck reports whether the "nft" binary is available in PATH,
// returning nil if it is found.
func nftCheck() error {
	_, err := exec.LookPath("nft")
	return err
}
func nftAdd(input string) error {
cmd := exec.Command("nft", "-f", "-")
cmd.Stdin = strings.NewReader(input)
return cmd.Run()
}
// nftDelete removes the given nftables table via "nft delete table".
func nftDelete(family, table string) error {
	nft := exec.Command("nft", "delete", "table", family, table)
	return nft.Run()
}
type iptRule struct {
Table, Chain string
RuleSpec []string

View File

@@ -46,7 +46,6 @@ type compiledExprRule struct {
Action Action
ModInstance modifier.Instance
Program *vm.Program
Analyzers map[string]struct{}
}
var _ Ruleset = (*exprRuleset)(nil)
@@ -100,55 +99,45 @@ func CompileExprRules(rules []ExprRule, ans []analyzer.Analyzer, mods []modifier
if !ok {
return nil, fmt.Errorf("rule %q has invalid action %q", rule.Name, rule.Action)
}
visitor := &depVisitor{Analyzers: make(map[string]struct{})}
geoip := expr.Function(
"geoip",
func(params ...any) (any, error) {
return geoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil
},
new(func(string, string) bool),
)
geosite := expr.Function(
"geosite",
func(params ...any) (any, error) {
return geoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil
},
new(func(string, string) bool),
)
visitor := &idVisitor{Identifiers: make(map[string]bool)}
program, err := expr.Compile(rule.Expr,
func(c *conf.Config) {
c.Strict = false
c.Expect = reflect.Bool
c.Visitors = append(c.Visitors, visitor)
registerBuiltinFunctions(c.Functions, geoMatcher)
},
geoip,
geosite,
)
if err != nil {
return nil, fmt.Errorf("rule %q has invalid expression: %w", rule.Name, err)
}
for name := range visitor.Analyzers {
a, ok := fullAnMap[name]
if !ok && !isBuiltInAnalyzer(name) {
return nil, fmt.Errorf("rule %q uses unknown analyzer %q", rule.Name, name)
for name := range visitor.Identifiers {
if isBuiltInAnalyzer(name) {
continue
}
depAnMap[name] = a
}
if visitor.UseGeoSite {
if err := geoMatcher.LoadGeoSite(); err != nil {
return nil, fmt.Errorf("rule %q failed to load geosite: %w", rule.Name, err)
}
}
if visitor.UseGeoIp {
if err := geoMatcher.LoadGeoIP(); err != nil {
return nil, fmt.Errorf("rule %q failed to load geoip: %w", rule.Name, err)
// Check if it's one of the built-in functions, and if so,
// skip it as an analyzer & do initialization if necessary.
switch name {
case "geoip":
if err := geoMatcher.LoadGeoIP(); err != nil {
return nil, fmt.Errorf("rule %q failed to load geoip: %w", rule.Name, err)
}
case "geosite":
if err := geoMatcher.LoadGeoSite(); err != nil {
return nil, fmt.Errorf("rule %q failed to load geosite: %w", rule.Name, err)
}
default:
a, ok := fullAnMap[name]
if !ok {
return nil, fmt.Errorf("rule %q uses unknown analyzer %q", rule.Name, name)
}
depAnMap[name] = a
}
}
cr := compiledExprRule{
Name: rule.Name,
Action: action,
Program: program,
Analyzers: visitor.Analyzers,
Name: rule.Name,
Action: action,
Program: program,
}
if action == ActionModify {
mod, ok := fullModMap[rule.Modifier.Name]
@@ -175,6 +164,23 @@ func CompileExprRules(rules []ExprRule, ans []analyzer.Analyzer, mods []modifier
}, nil
}
// registerBuiltinFunctions adds the built-in "geoip" and "geosite"
// matcher functions to the expr compiler's function map.
func registerBuiltinFunctions(funcMap map[string]*ast.Function, geoMatcher *geo.GeoMatcher) {
	// wrap adapts a (string, string) -> bool matcher to the expr calling
	// convention (variadic any params, any result).
	wrap := func(name string, match func(string, string) bool) *ast.Function {
		return &ast.Function{
			Name: name,
			Func: func(params ...any) (any, error) {
				return match(params[0].(string), params[1].(string)), nil
			},
			Types: []reflect.Type{reflect.TypeOf(match)},
		}
	}
	funcMap["geoip"] = wrap("geoip", geoMatcher.MatchGeoIp)
	funcMap["geosite"] = wrap("geosite", geoMatcher.MatchGeoSite)
}
func streamInfoToExprEnv(info StreamInfo) map[string]interface{} {
m := map[string]interface{}{
"id": info.ID,
@@ -241,22 +247,12 @@ func modifiersToMap(mods []modifier.Modifier) map[string]modifier.Modifier {
return modMap
}
type depVisitor struct {
Analyzers map[string]struct{}
UseGeoSite bool
UseGeoIp bool
type idVisitor struct {
Identifiers map[string]bool
}
func (v *depVisitor) Visit(node *ast.Node) {
func (v *idVisitor) Visit(node *ast.Node) {
if idNode, ok := (*node).(*ast.IdentifierNode); ok {
switch idNode.Value {
case "geosite":
v.UseGeoSite = true
case "geoip":
v.UseGeoIp = true
default:
v.Analyzers[idNode.Value] = struct{}{}
}
v.Identifiers[idNode.Value] = true
}
}