commit 86dee2780c1be4c3950473f0ace46cc8e84341ef Author: Louis Date: Fri Mar 23 18:59:10 2018 +0100 Initial diff --git a/README.md b/README.md new file mode 100644 index 0000000..0d87a85 --- /dev/null +++ b/README.md @@ -0,0 +1,81 @@ +# FGBGP + +This is a BGP library in Go. + +## Features + +* Decode/encode BGP messages +* Maintain a connexion with peers +* RIB data structure (add/remove IPv4/IPv6) +* Event-based API +* Encode/decode MRT format + +## Why use this library? + +It was developped to have a flexible BGP server running on a cluster with +load-balanced IPs and ready for scale. +A full-table will use around 300MB of RAM. + +This is not a fully integrated BGP daemon: it does not filter routes or +automatically send BGP updates on the routes learned. + +The behavior has to be implemented using the event-based API: +* Peer status change +* Update received + +This library can also be used for standalone BGP Messages decoding, +reading MRT files or storing updates into a RIB and perform lookups. + +### Supported BGP features + +* Add-path (decode/encode only, not storing into the RIB) +* Route-refresh +* Basic BGP attributes (Origin, MED...) +* Aggregator + +### Supported MRT features + +* TABLEDUMPV2 +* MRT update +* Status + +## Example + +``` +import ( + "fmt" + server "fgbgp/server" +) + +type Collector struct { + +} + +func (col *Collector) Notification(msg *messages.BGPMessageNotification, n *server.Neighbor) (bool) { + return true +} + +func (col *Collector) ProcessReceived(v interface{}, n *server.Neighbor) (bool, error) { + return true, nil +} + +func (col *Collector) ProcessSend(v interface{}, n *server.Neighbor) (bool, error) { + return true, nil +} + +func (col *Collector) ProcessUpdateEvent(e *messages.BGPMessageUpdate, n *server.Neighbor) (add bool) { + +} + +func main() { + m := server.NewManager(65001, net.ParseIP("10.0.0.1"), false, false) + m.UseDefaultUpdateHandler(10) + col := &Collector{} + m.SetEventHandler(&col) + m.SetUpdateEventHandler(&col) + err := m.NewServer(*BgpAddr) + if err != nil { + log.Fatal(err) + } +} +``` diff --git a/fgbgp.go b/fgbgp.go new file mode 100644 index 0000000..b7a4527 --- /dev/null +++ b/fgbgp.go @@ -0,0 +1,39 @@ +package main + +import ( + "flag" + "fmt" + server "github.com/cloudflare/fgbgp/server" + log "github.com/sirupsen/logrus" + "net" + "os" +) + +const AppVersion = "fgbgp 2017.8.0" + +var ( + LogLevel = flag.String("loglevel", "info", "Log level") + BgpAddr = flag.String("bgp.addr", ":1179", "Listen address") + Version = flag.Bool("version", false, "Print version") +) + +func main() { + flag.Parse() + + if *Version { + fmt.Println(AppVersion) + os.Exit(0) + } + + lvl, _ := log.ParseLevel(*LogLevel) + log.SetLevel(lvl) + + m := server.NewManager(65001, net.ParseIP("1.2.3.4"), false, false) + m.UseDefaultUpdateHandler(10) + err := m.NewServer(*BgpAddr) + if err != nil { + log.Fatal(err) + } + fmt.Printf("hello %v\n", m) + m.Start() +} diff --git a/messages/bgp_open.go b/messages/bgp_open.go new file mode 100644 index 0000000..04eb7a0 --- /dev/null +++ b/messages/bgp_open.go @@ -0,0 +1,385 @@ +package messages + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "io" + "net" +) + +type BGPCapability_MP struct { + Afi uint16 + Safi byte +} + +type BGPCapability_ASN struct { + ASN uint32 +} + +type AddPath struct { + Afi uint16 + Safi byte + TxRx byte +} + +type BGPCapability_ROUTEREFRESH struct { +} + +type BGPCapability_ADDPATH struct { + AddPathList []AddPath +} + +type BGPCapability struct { + Type byte + Data []byte +} + +type BGPCapabilityIf SerializableInterface + +type BGPCapabilities struct { + BGPCapabilities []BGPCapabilityIf +} + +type BGPParameterIf SerializableInterface + +type BGPParameter struct { + Type byte + Data BGPParameterIf +} + +type BGPMessageOpen struct { + BGPMessageHead + Version byte + ASN uint16 + HoldTime uint16 + Identifier []byte + Parameters []BGPParameter +} + +func (m BGPParameter) String() string { + str := "Parameter (%v):" + str += m.Data.String() + return fmt.Sprintf(str, m.Type) +} + +func (m BGPCapability) String() string { + str := "Capability: %v (%v): %v" + return fmt.Sprintf(str, CapaDescr[int(m.Type)], m.Type, m.Data) +} + +func (c BGPCapability_ROUTEREFRESH) String() string { + return "Capability Route-Refresh" +} + +func (c BGPCapability_MP) String() string { + return fmt.Sprintf("Capability Multiprotocol: %v-%v (%v) (%v)", AfiToStr[c.Afi], SafiToStr[c.Safi], c.Afi, c.Safi) +} + +func (c BGPCapability_ADDPATH) String() string { + var addpathstr string + for i := range c.AddPathList { + addpathstr += c.AddPathList[i].String() + ", " + } + return fmt.Sprintf("Capability Add-Path: [ %v]", addpathstr) +} + +func (c BGPCapability_ASN) String() string { + return fmt.Sprintf("Capability ASN: %v", c.ASN) +} + +func (c BGPCapabilities) String() string { + var str string + for i := range c.BGPCapabilities { + if c.BGPCapabilities[i] != nil { + str += c.BGPCapabilities[i].String() + ", " + } + } + return str +} + +func (m *BGPMessageOpen) String() string { + str := "BGP Open: Version: %v / ASN: %v / HoldTime: %v / Identifier: %v / Parameters (%v): [ " + ip := net.IP(m.Identifier) + str = fmt.Sprintf(str, m.Version, m.ASN, m.HoldTime, ip.String(), len(m.Parameters)) + for i := range m.Parameters { + str += m.Parameters[i].String() + } + str += "]" + return str +} + +func (m BGPCapabilities) Len() int { + var sum int + for c := range m.BGPCapabilities { + sum += m.BGPCapabilities[c].Len() + } + return sum +} + +func (c BGPCapabilities) Write(bw io.Writer) { + for i := range c.BGPCapabilities { + c.BGPCapabilities[i].Write(bw) + } +} + +func (m BGPCapability_ROUTEREFRESH) Len() int { + return 2 +} + +func (m BGPCapability_ROUTEREFRESH) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, byte(CAPA_RR)) + binary.Write(bw, binary.BigEndian, byte(0)) +} + +func (m BGPCapability_MP) Len() int { + return 6 +} + +func (m BGPCapability_MP) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, byte(CAPA_MP)) + binary.Write(bw, binary.BigEndian, byte(4)) + + binary.Write(bw, binary.BigEndian, m.Afi) + binary.Write(bw, binary.BigEndian, byte(0)) + binary.Write(bw, binary.BigEndian, m.Safi) +} + +func (m BGPCapability_ASN) Len() int { + return 6 +} + +func (m BGPCapability_ASN) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, byte(CAPA_ASN)) + binary.Write(bw, binary.BigEndian, byte(4)) + + binary.Write(bw, binary.BigEndian, m.ASN) +} + +func (p AddPath) Len() int { + return 4 +} + +func (p AddPath) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, p.Afi) + binary.Write(bw, binary.BigEndian, p.Safi) + binary.Write(bw, binary.BigEndian, p.TxRx) +} + +func (m BGPCapability_ADDPATH) Len() int { + var sum int + for c := range m.AddPathList { + sum += m.AddPathList[c].Len() + } + return sum +} + +func (m BGPCapability_ADDPATH) Write(bw io.Writer) { + for c := range m.AddPathList { + m.AddPathList[c].Write(bw) + } +} + +func (m BGPCapability) Len() int { + return 1 + 1 + len(m.Data) +} + +func (m BGPCapability) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, m.Type) + binary.Write(bw, binary.BigEndian, byte(len(m.Data))) + binary.Write(bw, binary.BigEndian, m.Data) +} + +func (m BGPParameter) Len() int { + return 1 + 1 + m.Data.Len() +} + +func (m BGPParameter) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, m.Type) + binary.Write(bw, binary.BigEndian, byte(m.Data.Len())) + m.Data.Write(bw) +} + +func (m BGPMessageOpen) LenParams() int { + sum := 0 + for i := range m.Parameters { + sum += m.Parameters[i].Len() + } + return sum +} + +func (m BGPMessageOpen) LenContent() int { + return 10 + m.LenParams() +} + +func (m BGPMessageOpen) Len() int { + return GetBGPHeaderLen() + m.LenContent() +} + +func (m BGPMessageOpen) Write(bw io.Writer) { + WriteBGPHeader(MESSAGE_OPEN, uint16(m.LenContent()), bw) + binary.Write(bw, binary.BigEndian, m.Version) + binary.Write(bw, binary.BigEndian, m.ASN) + binary.Write(bw, binary.BigEndian, m.HoldTime) + binary.Write(bw, binary.BigEndian, m.Identifier[0:4]) + binary.Write(bw, binary.BigEndian, byte(m.LenParams())) + for i := range m.Parameters { + m.Parameters[i].Write(bw) + } +} + +func (c BGPCapability) ParseCapability() BGPCapabilityIf { + buf := bytes.NewBuffer(c.Data) + var ret BGPCapabilityIf + switch c.Type { + case CAPA_MP: + mpstruct := BGPCapability_MP{} + binary.Read(buf, binary.BigEndian, &mpstruct.Afi) + buf.ReadByte() + binary.Read(buf, binary.BigEndian, &mpstruct.Safi) + ret = mpstruct + case CAPA_ADDPATH: + apstruct := BGPCapability_ADDPATH{ + AddPathList: make([]AddPath, len(c.Data)/4), + } + for i := 0; i < len(c.Data)/4; i++ { + binary.Read(buf, binary.BigEndian, &(apstruct.AddPathList[i].Afi)) + binary.Read(buf, binary.BigEndian, &(apstruct.AddPathList[i].Safi)) + binary.Read(buf, binary.BigEndian, &(apstruct.AddPathList[i].TxRx)) + } + + ret = apstruct + case CAPA_ASN: + asnstruct := BGPCapability_ASN{} + binary.Read(buf, binary.BigEndian, &asnstruct.ASN) + ret = asnstruct + case CAPA_RR: + ret = BGPCapability_ROUTEREFRESH{} + default: + unknownstruct := BGPCapability{} + unknownstruct.Type = c.Type + unknownstruct.Data = c.Data + ret = unknownstruct + } + return ret +} + +func ParseOpen(b []byte) (*BGPMessageOpen, error) { + m := BGPMessageOpen{} + + if len(b) < 10 { + return nil, errors.New(fmt.Sprintf("ParseOpen: wrong open size: %v < 10", len(b))) + } + version := b[0] + asn := uint16(b[1])<<8 | uint16(b[2]) + holdtime := uint16(b[3])<<8 | uint16(b[4]) + + if holdtime > 0 && holdtime < 3 { + log.Warnf("ParseOpen: BGP open hold time must be zero or at least 3. Got %v.", holdtime) + } + + identifier := b[5:9] + optparamlen := int(b[9]) + + m.Version = version + m.HoldTime = holdtime + m.Identifier = identifier + m.ASN = asn + m.Parameters = make([]BGPParameter, 0) + + if len(b)-10 != optparamlen { + return nil, errors.New(fmt.Sprintf("ParseOpen: wrong open size for optional parameters: %v != %v", len(b)-10, optparamlen)) + } + + if optparamlen > 0 && len(b)-10 >= 2 { + i := 10 + for i < len(b)-1 { + parmtype := b[i] + parmlength := int(b[i+1]) + + if i+1+parmlength > len(b) { + return nil, errors.New(fmt.Sprintf("ParseOpen: wrong parameter length: %v > %v", i+1+parmlength, len(b))) + } + + i += 2 + bgpparameter := BGPParameter{ + Type: parmtype, + } + + var parameterdata BGPCapabilityIf + baseparam := i + + switch parmtype { + case PARAMETER_CAPA: + bgpcapa := BGPCapabilities{make([]BGPCapabilityIf, 0)} + for i < len(b)-1 && i < baseparam+parmlength { + capatype := b[i] + capalength := int(b[i+1]) + + if i+1+capalength > len(b) { + return nil, errors.New(fmt.Sprintf("ParseOpen: wrong capability length: %v > %v", i+1+capalength, len(b))) + } + capa := b[i+2 : i+2+capalength] + //log.Debugf("ParseOpen: Capa %v %v %v", capatype, capalength, capa) + + i += 2 + capalength + + capastruct := BGPCapability{ + Type: capatype, + Data: capa, + } + capastructparsed := capastruct.ParseCapability() + bgpcapa.BGPCapabilities = append(bgpcapa.BGPCapabilities, capastructparsed) + } + parameterdata = bgpcapa + } + + bgpparameter.Data = parameterdata + m.Parameters = append(m.Parameters, bgpparameter) + } + + } + return &m, nil +} + +func CraftOpenMessage(asn uint32, holdtime uint16, identifier []byte, mplist []BGPCapability_MP, addpathlist []AddPath, routerefresh bool) *BGPMessageOpen { + asn_2o := uint16(asn) + if asn >= 65536 { + asn_2o = 23456 + } + // Check for identifier = 4: "bytes", + open := &BGPMessageOpen{ + Version: 4, + ASN: asn_2o, + HoldTime: holdtime, + Identifier: identifier, + Parameters: make([]BGPParameter, 1)} + + ptr := &BGPCapabilities{make([]BGPCapabilityIf, 0)} + ptr.BGPCapabilities = append(ptr.BGPCapabilities, &BGPCapability_ASN{asn}) + + if mplist != nil && len(mplist) > 0 { + for i := range mplist { + ptr.BGPCapabilities = append(ptr.BGPCapabilities, &mplist[i]) + } + } + + if addpathlist != nil && len(addpathlist) > 0 { + addpathcapa := BGPCapability_ADDPATH{AddPathList: addpathlist} + ptr.BGPCapabilities = append(ptr.BGPCapabilities, addpathcapa) + } + + if routerefresh { + ptr.BGPCapabilities = append(ptr.BGPCapabilities, BGPCapability_ROUTEREFRESH{}) + } + + parameter := BGPParameter{ + Type: PARAMETER_CAPA, + Data: ptr, + } + + open.Parameters[0] = parameter + return open +} diff --git a/messages/bgp_update.go b/messages/bgp_update.go new file mode 100644 index 0000000..51ca4fd --- /dev/null +++ b/messages/bgp_update.go @@ -0,0 +1,797 @@ +package messages + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "time" +) + +type BGPAttribute_NEXTHOP struct { + NextHop net.IP +} + +type BGPAttribute_ORIGIN struct { + Origin byte +} + +type BGPAttribute_MED struct { + Med uint32 +} + +type BGPAttribute_LOCPREF struct { + LocPref uint32 +} + +type BGPAttribute_COMMUNITIES struct { + Communities []uint32 +} + +type ASPath_Segment struct { + SType byte + ASPath []uint32 +} + +type BGPAttribute_ASPATH struct { + Segments []ASPath_Segment + Enc2Bytes bool +} + +type BGPAttribute_MP_UNREACH struct { + Afi uint16 + Safi byte + NLRI []NLRI + + EnableAddPath bool +} + +type BGPAttribute_MP_REACH struct { + Afi uint16 + Safi byte + NextHop net.IP + NLRI []NLRI + + EnableAddPath bool +} + +type BGPAttribute_ATOMIC_AGGREGATE struct { +} + +type BGPAttribute_AGGREGATOR struct { + ASN uint32 + Identifier []byte + Enc2Bytes bool +} + +type BGPAttributeIf SerializableInterface + +type BGPAttribute struct { + Flags byte + Code byte + Data []byte +} + +type BGPMessageUpdate struct { + BGPMessageHead + WithdrawnRoutes []NLRI + PathAttributes []BGPAttributeIf + NLRI []NLRI + + EnableAddPath bool +} + +func (m BGPAttribute_ORIGIN) String() string { + return fmt.Sprintf("Origin: %v", m.Origin) +} + +func (m BGPAttribute_AGGREGATOR) String() string { + id := net.IP(m.Identifier) + return fmt.Sprintf("Aggregator: ASN: %v / Id: %v", m.ASN, id) +} + +func (m BGPAttribute_ATOMIC_AGGREGATE) String() string { + return "Atomic aggregate" +} + +func (m BGPAttribute_MED) String() string { + return fmt.Sprintf("Med: %v", m.Med) +} + +func (m BGPAttribute_LOCPREF) String() string { + return fmt.Sprintf("LocPref: %v", m.LocPref) +} + +func (m BGPAttribute_ASPATH) String() string { + return fmt.Sprintf("ASPath: %v", m.Segments) +} + +func (m ASPath_Segment) String() string { + return fmt.Sprintf("Segment (type: %v | len: %v): %v", m.SType, len(m.ASPath), m.ASPath) +} + +func (m BGPAttribute_COMMUNITIES) String() string { + var comlist string + for i := range m.Communities { + comlist += fmt.Sprintf("%v:%v, ", m.Communities[i]&0xFFFF0000>>16, m.Communities[i]&0xFFFF) + } + + return fmt.Sprintf("Communities: [ %v]", comlist) +} + +func (m BGPAttribute_MP_REACH) String() string { + var NLRI string + for i := range m.NLRI { + NLRI += m.NLRI[i].String() + ", " + } + + return fmt.Sprintf("MP Reach: %v-%v (%v) (%v) / Nexthop: %v / NLRI: [ %v]", AfiToStr[m.Afi], SafiToStr[m.Safi], m.Afi, m.Safi, m.NextHop, NLRI) +} + +func (m BGPAttribute_MP_UNREACH) String() string { + var NLRI string + for i := range m.NLRI { + NLRI += m.NLRI[i].String() + ", " + } + + return fmt.Sprintf("MP Unreach: %v-%v (%v) (%v) / NLRI: [ %v]", AfiToStr[m.Afi], SafiToStr[m.Safi], m.Afi, m.Safi, NLRI) +} + +func (m BGPAttribute_NEXTHOP) String() string { + return fmt.Sprintf("Nexthop %v", m.NextHop.String()) +} + +func (m BGPAttribute) String() string { + str := "%b %v (%v): %v" + return fmt.Sprintf(str, m.Flags, BgpAttributes[int(m.Code)], m.Code, m.Data) +} + +func (m BGPMessageUpdate) String() string { + // To be completed + return fmt.Sprintf("BGP Update: Withdraw: %v / PathAttributes: %v / NLRI: %v", m.WithdrawnRoutes, m.PathAttributes, m.NLRI) +} + +func (m BGPMessageUpdate) LenWithdrawn() int { + var sumW int + for i := range m.WithdrawnRoutes { + sumW += m.WithdrawnRoutes[i].Len(m.EnableAddPath) + } + return sumW +} + +func (m BGPMessageUpdate) LenPathAttribute() int { + var sumPA int + for i := range m.PathAttributes { + sumPA += m.PathAttributes[i].Len() + } + return sumPA +} + +func (m BGPMessageUpdate) LenContent() int { + sumW := m.LenWithdrawn() + sumPA := m.LenPathAttribute() + + var sumA int + for i := range m.NLRI { + sumA += m.NLRI[i].Len(m.EnableAddPath) + } + + return 2 + sumW + 2 + sumPA + sumA +} + +func (m BGPMessageUpdate) Len() int { + return GetBGPHeaderLen() + m.LenContent() +} + +func (m BGPMessageUpdate) Write(bw io.Writer) { + WriteBGPHeader(MESSAGE_UPDATE, uint16(m.LenContent()), bw) + + binary.Write(bw, binary.BigEndian, uint16(m.LenWithdrawn())) + for i := range m.WithdrawnRoutes { + m.WithdrawnRoutes[i].Write(bw, m.EnableAddPath) + } + + binary.Write(bw, binary.BigEndian, uint16(m.LenPathAttribute())) + for i := range m.PathAttributes { + m.PathAttributes[i].Write(bw) + } + + for i := range m.NLRI { + m.NLRI[i].Write(bw, m.EnableAddPath) + } +} + +func (m BGPAttribute_ORIGIN) Len() int { + return AttributeHeaderLen(1) + 1 +} + +func (m BGPAttribute_ORIGIN) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, byte(ATTRIBUTE_TRANSITIVE)) + binary.Write(bw, binary.BigEndian, byte(ATTRIBUTE_ORIGIN)) + binary.Write(bw, binary.BigEndian, byte(1)) + binary.Write(bw, binary.BigEndian, m.Origin) +} + +func (m BGPAttribute_AGGREGATOR) Len() int { + return AttributeHeaderLen(8) + 8 +} + +func (m BGPAttribute_AGGREGATOR) Write(bw io.Writer) { + // May cause issue as ASN is 32 bits + + binary.Write(bw, binary.BigEndian, byte(ATTRIBUTE_OPTIONAL)) + binary.Write(bw, binary.BigEndian, byte(ATTRIBUTE_AGGREGATOR)) + binary.Write(bw, binary.BigEndian, byte(8)) + binary.Write(bw, binary.BigEndian, m.ASN) + binary.Write(bw, binary.BigEndian, m.Identifier) +} + +func (m BGPAttribute_ATOMIC_AGGREGATE) Len() int { + return AttributeHeaderLen(0) +} + +func (m BGPAttribute_ATOMIC_AGGREGATE) Write(bw io.Writer) { + WriteAttributeHeader(bw, 0, ATTRIBUTE_OPTIONAL, ATTRIBUTE_ATOMIC_AGGREGATE) +} + +func (m BGPAttribute_MED) Len() int { + return AttributeHeaderLen(4) + 4 +} + +func (m BGPAttribute_MED) Write(bw io.Writer) { + binary.Write(bw, binary.BigEndian, byte(ATTRIBUTE_OPTIONAL)) + binary.Write(bw, binary.BigEndian, byte(ATTRIBUTE_MED)) + binary.Write(bw, binary.BigEndian, byte(4)) + binary.Write(bw, binary.BigEndian, m.Med) +} + +func (m BGPAttribute_LOCPREF) Len() int { + return AttributeHeaderLen(4) + 4 +} + +func (m BGPAttribute_LOCPREF) Write(bw io.Writer) { + WriteAttributeHeader(bw, 4, ATTRIBUTE_OPTIONAL, ATTRIBUTE_LOCPREF) + binary.Write(bw, binary.BigEndian, m.LocPref) +} + +func (m BGPAttribute_ASPATH) Len() int { + var size int + for i := range m.Segments { + size += m.Segments[i].LenContent(m.Enc2Bytes) + } + return AttributeHeaderLen(size) + size +} + +func (m BGPAttribute_ASPATH) Write(bw io.Writer) { + WriteAttributeHeader(bw, m.LenContent(), ATTRIBUTE_TRANSITIVE, ATTRIBUTE_ASPATH) + if len(m.Segments) == 0 { + return + } + + for i := range m.Segments { + m.Segments[i].Write(bw, m.Enc2Bytes) + } +} + +func (m BGPAttribute_ASPATH) LenContent() int { + var size int + for i := range m.Segments { + size += m.Segments[i].LenContent(m.Enc2Bytes) + } + return size +} + +func (m ASPath_Segment) LenSets() int { + aspathlen := len(m.ASPath) + itera := aspathlen / 0xff + if aspathlen%0xff != 0 { + itera += 1 + } + return itera +} + +func (m ASPath_Segment) LenContent(enc2bytes bool) int { + if len(m.ASPath) == 0 { + return 0 + } + + numsets := m.LenSets() + if enc2bytes { + return 2*numsets + 2*len(m.ASPath) + } + return 2*numsets + 4*len(m.ASPath) +} + +func (m ASPath_Segment) Write(bw io.Writer, enc2bytes bool) { + if len(m.ASPath) == 0 { + return + } + + itera := m.LenSets() + var cursetlen byte + + for i := 0; i < itera; i++ { + cursetlen = 0xff + if i == itera-1 { + cursetlen = byte(len(m.ASPath) % 0xff) + } + binary.Write(bw, binary.BigEndian, m.SType) + binary.Write(bw, binary.BigEndian, byte(cursetlen)) + for j := 0xff * i; j < len(m.ASPath) && j < 0xff*(i+1); j++ { + if enc2bytes { + binary.Write(bw, binary.BigEndian, uint16(m.ASPath[j])) + } else { + binary.Write(bw, binary.BigEndian, m.ASPath[j]) + } + } + } +} + +func AttributeHeaderLen(size int) int { + if size > 0xff { + return 4 + } else { + return 3 + } +} + +func IPtoBytes(ip net.IP) []byte { + if ip.To4() == nil { + return ip.To16() + } + return ip.To4() +} + +func WriteAttributeHeader(bw io.Writer, size int, attrflag byte, attrcode byte) { + var extended byte + if size > 0xff { + extended = ATTRIBUTE_EXTENDED + } + binary.Write(bw, binary.BigEndian, byte(attrflag|extended)) + binary.Write(bw, binary.BigEndian, attrcode) + if extended != 0 { + binary.Write(bw, binary.BigEndian, byte((size&0xff00)>>8)) + } + binary.Write(bw, binary.BigEndian, byte(size&0xff)) +} + +func (m BGPAttribute_COMMUNITIES) LenContent() int { + return 4 * len(m.Communities) +} + +func (m BGPAttribute_COMMUNITIES) Len() int { + size := m.LenContent() + return AttributeHeaderLen(size) + size +} + +func (m BGPAttribute_COMMUNITIES) Write(bw io.Writer) { + WriteAttributeHeader(bw, m.LenContent(), ATTRIBUTE_TRANSITIVEOPT, ATTRIBUTE_COMMUNITIES) + for i := range m.Communities { + binary.Write(bw, binary.BigEndian, m.Communities[i]) + } +} + +func (m BGPAttribute_MP_REACH) LenMrt() uint16 { + ip := IPtoBytes(m.NextHop) + return 2 + 1 + 1 + uint16(len(ip)) +} + +func (m BGPAttribute_MP_REACH) WriteMrt(buf io.Writer) { + binary.Write(buf, binary.BigEndian, byte(ATTRIBUTE_OPTIONAL)) + binary.Write(buf, binary.BigEndian, byte(ATTRIBUTE_REACH)) + + nhb := IPtoBytes(m.NextHop) + lengthnh := len(nhb) + + binary.Write(buf, binary.BigEndian, byte(1+lengthnh)) + binary.Write(buf, binary.BigEndian, byte(lengthnh)) + binary.Write(buf, binary.BigEndian, nhb) +} + +func (m BGPAttribute_MP_REACH) GetNextHopLen() int { + return len(IPtoBytes(m.NextHop)) +} + +func (m BGPAttribute_MP_REACH) ContentLen() int { + var sum int + for i := range m.NLRI { + sum += m.NLRI[i].Len(m.EnableAddPath) + } + + size := 4 + m.GetNextHopLen() + 1 + sum + return size +} + +func (m BGPAttribute_MP_REACH) Len() int { + size := m.ContentLen() + return AttributeHeaderLen(size) + size +} + +func (m BGPAttribute_MP_REACH) Write(bw io.Writer) { + WriteAttributeHeader(bw, m.ContentLen(), ATTRIBUTE_OPTIONAL, ATTRIBUTE_REACH) + + binary.Write(bw, binary.BigEndian, m.Afi) + binary.Write(bw, binary.BigEndian, m.Safi) + binary.Write(bw, binary.BigEndian, byte(m.GetNextHopLen())) + binary.Write(bw, binary.BigEndian, IPtoBytes(m.NextHop)) + binary.Write(bw, binary.BigEndian, byte(0)) + for i := range m.NLRI { + m.NLRI[i].Write(bw, m.EnableAddPath) + } +} +func (m BGPAttribute_MP_UNREACH) ContentLen() int { + var sum int + for i := range m.NLRI { + sum += m.NLRI[i].Len(m.EnableAddPath) + } + size := 3 + sum + return size +} + +func (m BGPAttribute_MP_UNREACH) Len() int { + size := m.ContentLen() + return AttributeHeaderLen(size) + size +} + +func (m BGPAttribute_MP_UNREACH) Write(bw io.Writer) { + WriteAttributeHeader(bw, m.ContentLen(), ATTRIBUTE_OPTIONAL, ATTRIBUTE_UNREACH) + + binary.Write(bw, binary.BigEndian, m.Afi) + binary.Write(bw, binary.BigEndian, m.Safi) + for i := range m.NLRI { + m.NLRI[i].Write(bw, m.EnableAddPath) + } +} + +func (m BGPAttribute_NEXTHOP) ContentLen() int { + ip := IPtoBytes(m.NextHop) + return len(ip) +} + +func (m BGPAttribute_NEXTHOP) Len() int { + size := m.ContentLen() + return AttributeHeaderLen(size) + size +} + +func (m BGPAttribute_NEXTHOP) Write(bw io.Writer) { + WriteAttributeHeader(bw, m.ContentLen(), ATTRIBUTE_TRANSITIVE, ATTRIBUTE_NEXTHOP) + + ip := IPtoBytes(m.NextHop) + binary.Write(bw, binary.BigEndian, ip) +} + +func (m BGPAttribute) Len() int { + size := len(m.Data) + return AttributeHeaderLen(size) + size +} + +func (m BGPAttribute) Write(bw io.Writer) { + WriteAttributeHeader(bw, len(m.Data), m.Flags, m.Code) + binary.Write(bw, binary.BigEndian, m.Data) +} + +/*func (m *BGPMessageUpdate) AddASNToPath(asn uint32) { + for i := range(m.PathAttributes) { + if v,ok := m.PathAttributes[i].(BGPAttribute_ASPATH); ok { + v.ASPath = append(v.ASPath, asn) + m.PathAttributes[i] = v + return + } + } + m.PathAttributes = append(m.PathAttributes, BGPAttribute_ASPATH{ + SType: 2, + ASPath: []uint32{asn,}, + }) +}*/ + +func ParseNLRI(b []byte, afi uint16, safi byte, path bool) ([]NLRI, error) { + prefixlist := make([]NLRI, 0) + + if afi != AFI_IPV4 && afi != AFI_IPV6 { + return prefixlist, errors.New(fmt.Sprintf("ParseNLRI: cannot decode this Afi/Safi %v/%v", afi, safi)) + } + + psize := 32 + asize := 4 + if afi == AFI_IPV6 { + psize = 128 + asize = 16 + } + + i := 0 + + for i < len(b) { + + var pathid uint32 + if path { + if len(b)-i < 5 { + return nil, errors.New(fmt.Sprintf("ParseNLRI: wrong NLRI size with add-path: %v < 5", len(b))) + } + pathid = uint32(b[i])<<24 | uint32(b[i+1])<<16 | uint32(b[i+2])<<8 | uint32(b[i+3]) + i += 4 + } + + length := int(b[i]) + lengthb := length + add := 0 + if length%8 != 0 { + add = 1 + } + length = length/8 + add + i++ + if i+length > len(b) { + return prefixlist, errors.New(fmt.Sprintf("ParseNLRI: wrong NLRI size: %v > %v", i+length, len(b))) + } + prefix := b[i : i+length] + + mask := net.CIDRMask(lengthb, psize) + ip := make([]byte, asize) + + if len(prefix) > len(ip) { + return prefixlist, errors.New(fmt.Sprintf("ParseNLRI: wrong IP size: %v > %v", len(prefix), len(ip))) + } + + for j := range prefix { + ip[j] = prefix[j] + } + ipnet := net.IPNet{ + IP: ip, + Mask: mask, + } + prefixlist = append(prefixlist, NLRI_IPPrefix{ + Prefix: ipnet, + PathId: pathid, + }) + + i += length + } + return prefixlist, nil +} + +func ParsePathAttribute(b []byte, addpathlist []AfiSafi, enc2bytes bool) ([]BGPAttributeIf, error) { + attributes := make([]BGPAttributeIf, 0) + i := 0 + for i < len(b) { + if len(b)-i < 3 { + return attributes, errors.New(fmt.Sprintf("ParsePathAttribute: attribute size (need 3 bytes, got %v)", len(b)-i)) + } + attrflag := b[i] + attrcode := b[i+1] + extended := byte((attrflag & ATTRIBUTE_EXTENDED) >> 4) + length := int(b[i+2]) + + if extended != 0 && i+3 > len(b)-1 { + return attributes, errors.New(fmt.Sprintf("ParsePathAttribute: wrong extended size: %v > %v", i+3, len(b)-1)) + } + + offset := 0 + if extended != 0 { + length = int(uint16(b[i+2])<<8 | uint16(b[i+3])) + offset = 1 + } + if i+offset+3+length > len(b) || i+offset+3 > len(b) { + return attributes, errors.New(fmt.Sprintf("ParsePathAttribute: wrong size: %v > %v or %v > %v (ext: %v / %v)", i+offset+3+length, len(b), i+offset+3, len(b), i, extended)) + } + + data := b[i+offset+3 : i+offset+3+length] + + if i+3+offset+length > len(b) { + return attributes, errors.New(fmt.Sprintf("ParsePathAttribute: wrong attribute size: %v > %v", i+3+length, len(b))) + } + + var intf SerializableInterface + buf := bytes.NewBuffer(data) + + switch attrcode { + case ATTRIBUTE_ORIGIN: + o := byte(2) + if len(data) > 0 { + o = data[0] + } else { + return attributes, errors.New(fmt.Sprintf("ParsePathAttribute: empty data for ORIGIN attribute")) + } + a := BGPAttribute_ORIGIN{ + Origin: o, + } + intf = a + case ATTRIBUTE_MED: + a := BGPAttribute_MED{} + binary.Read(buf, binary.BigEndian, &(a.Med)) + intf = a + case ATTRIBUTE_AGGREGATOR: + a := BGPAttribute_AGGREGATOR{} + if len(data) == 8 && !enc2bytes { + binary.Read(buf, binary.BigEndian, &(a.ASN)) + a.Identifier = data[4:8] + } else if len(data) == 6 && enc2bytes { + var tmpas uint16 + binary.Read(buf, binary.BigEndian, &tmpas) + a.ASN = uint32(tmpas) + a.Identifier = data[2:6] + } + a.Enc2Bytes = enc2bytes + intf = a + case ATTRIBUTE_ATOMIC_AGGREGATE: + a := BGPAttribute_ATOMIC_AGGREGATE{} + intf = a + case ATTRIBUTE_LOCPREF: + a := BGPAttribute_LOCPREF{} + binary.Read(buf, binary.BigEndian, &(a.LocPref)) + intf = a + case ATTRIBUTE_ASPATH: + a := BGPAttribute_ASPATH{Segments: make([]ASPath_Segment, 0)} + + var aslen byte + var err_rd error + var stype byte + stype, err_rd = buf.ReadByte() + for err_rd == nil { + aslen, err_rd = buf.ReadByte() + if err_rd != nil { + break + } + + s := ASPath_Segment{SType: stype, ASPath: make([]uint32, 0)} + + if err_rd != nil { + break + } + if !enc2bytes { + var tmpas uint32 + if err_rd != nil { + break + } + for j := 0; j < int(aslen) && j <= 255; j++ { + binary.Read(buf, binary.BigEndian, &tmpas) + s.ASPath = append(s.ASPath, tmpas) + } + + } else { + var tmpas uint16 + if err_rd != nil { + break + } + for j := 0; j < int(aslen) && j <= 255; j++ { + binary.Read(buf, binary.BigEndian, &tmpas) + s.ASPath = append(s.ASPath, uint32(tmpas)) + } + + } + a.Segments = append(a.Segments, s) + stype, err_rd = buf.ReadByte() + if err_rd != nil { + break + } + } + a.Enc2Bytes = enc2bytes + intf = a + case ATTRIBUTE_NEXTHOP: + intf = BGPAttribute_NEXTHOP{NextHop: data[0:4]} + case ATTRIBUTE_COMMUNITIES: + a := BGPAttribute_COMMUNITIES{Communities: make([]uint32, length/4)} + for j := 0; j < length/4; j++ { + binary.Read(buf, binary.BigEndian, &(a.Communities[j])) + } + intf = a + case ATTRIBUTE_REACH: + a := BGPAttribute_MP_REACH{} + binary.Read(buf, binary.BigEndian, &(a.Afi)) + binary.Read(buf, binary.BigEndian, &(a.Safi)) + nhlen, _ := buf.ReadByte() + nh := make([]byte, nhlen) + buf.Read(nh) + a.NextHop = nh + buf.ReadByte() + parseinfo := InAfiSafi(a.Afi, a.Safi, addpathlist) + a.NLRI, _ = ParseNLRI(buf.Bytes(), a.Afi, a.Safi, parseinfo) + a.EnableAddPath = parseinfo + intf = a + case ATTRIBUTE_UNREACH: + a := BGPAttribute_MP_UNREACH{} + binary.Read(buf, binary.BigEndian, &(a.Afi)) + binary.Read(buf, binary.BigEndian, &(a.Safi)) + parseinfo := InAfiSafi(a.Afi, a.Safi, addpathlist) + a.NLRI, _ = ParseNLRI(buf.Bytes(), a.Afi, a.Safi, parseinfo) + intf = a + default: + intf = BGPAttribute{ + attrflag, + attrcode, + data, + } + } + + attributes = append(attributes, intf) + + i += 3 + length + offset + } + return attributes, nil +} + +func ParseUpdate(b []byte, addpathlist []AfiSafi, enc2bytes bool) (*BGPMessageUpdate, error) { + m := &BGPMessageUpdate{} + var err error + + addpath_ipv4uni := InAfiSafi(AFI_IPV4, SAFI_UNICAST, addpathlist) + m.EnableAddPath = addpath_ipv4uni + + if len(b) < 4 { + return nil, errors.New(fmt.Sprintf("ParseUpdate: wrong withdrawn routes size: %v < 4", len(b))) + } + + wdrouteslen := int(uint16(b[0])<<8 | uint16(b[1])) + + if wdrouteslen+4 > len(b) { + return nil, errors.New(fmt.Sprintf("ParseUpdate: wrong withdrawn routes size: %v > %v", wdrouteslen+4, len(b))) + } + offset := 2 + withdrawnroutes := b[offset : offset+wdrouteslen] + m.WithdrawnRoutes, err = ParseNLRI(withdrawnroutes, AFI_IPV4, SAFI_UNICAST, addpath_ipv4uni) + if err != nil { + return nil, err + } + offset += wdrouteslen + + tplen := int(uint16(b[offset])<<8 | uint16(b[offset+1])) + offset += 2 + + if tplen+offset > len(b) { + return nil, errors.New(fmt.Sprintf("ParseUpdate: wrong total path size: %v > %v", tplen+offset, len(b))) + } + pathattributes := b[offset : offset+tplen] + m.PathAttributes, err = ParsePathAttribute(pathattributes, addpathlist, enc2bytes) + if err != nil { + return m, err + } + + offset += tplen + NLRI := b[offset:] + m.NLRI, err = ParseNLRI(NLRI, AFI_IPV4, SAFI_UNICAST, addpath_ipv4uni) + if err != nil { + return m, err + } + + return m, nil +} + +func CraftUpdateMessage() *BGPMessageUpdate { + //_, prefix, _ := net.ParseCIDR("8.8.8.8/24") + + _, prefixtest, _ := net.ParseCIDR("2001::/46") + ip := net.ParseIP("2002::1") + pa := []BGPAttributeIf{ + BGPAttribute_ORIGIN{ + Origin: 2, + }, + BGPAttribute_ASPATH{ + Segments: []ASPath_Segment{ASPath_Segment{ASPath: []uint32{65001}}}, + }, + BGPAttribute_COMMUNITIES{ + Communities: []uint32{0x7b0929}, + }, + //BGPAttribute_NEXTHOP{ + // NextHop: []byte{1,2,3,4}, + //}, + BGPAttribute_MP_REACH{ + Afi: AFI_IPV6, + Safi: SAFI_UNICAST, + NextHop: ip, + //enableAddPath: true, + NLRI: []NLRI{ + NLRI_IPPrefix{ + PathId: uint32(time.Now().UTC().Unix()), + Prefix: *prefixtest, + }, + }, + }, + } + + m := &BGPMessageUpdate{ + //NLRI: []NLRI{NLRI{Prefix: *prefix,PathId:123,},}, + PathAttributes: pa, + } + + return m +} diff --git a/messages/messages.go b/messages/messages.go new file mode 100644 index 0000000..56e7271 --- /dev/null +++ b/messages/messages.go @@ -0,0 +1,473 @@ +package messages + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "io" + "net" + "time" +) + +var ( + errCodeToStr = map[int]string{ + 1: "Message Header Error", + 2: "OPEN Message Error", + 3: "UPDATE Message Error", + 4: "Hold Timer Expired", + 5: "Finite State Machine Error", + 6: "Cease", + } + errSubCodeToStr = map[int]map[int]string{ + 1: map[int]string{ + 1: "Connection Not Synchronized.", + 2: "Bad Message Length.", + 3: "Bad Message Type.", + }, + 2: map[int]string{ + 1: "Unsupported Version Number.", + 2: "Bad Peer AS.", + 3: "Bad BGP Identifier.", + 4: "Unsupported Optional Parameter.", + 6: "Unacceptable Hold Time.", + }, + 3: map[int]string{ + 1: "Malformed Attribute List.", + 2: "Unrecognized Well-known Attribute.", + 3: "Missing Well-known Attribute.", + 4: "Attribute Flags Error.", + 5: "Attribute Length Error.", + 6: "Invalid ORIGIN Attribute.", + 8: "Invalid NEXT_HOP Attribute.", + 9: "Optional Attribute Error.", + 10: "Invalid Network Field.", + 11: "Malformed AS_PATH.", + }, + } + CapaDescr = map[int]string{ + 1: "Multiprotocol Extensions for BGP-4", + 2: "Route Refresh Capability for BGP-4", + 3: "Outbound Route Filtering Capability", + 4: "Multiple routes to a destination capability", + 5: "Extended Next Hop Encoding", + 6: "BGP-Extended Message", + 7: "BGPsec Capability", + 8: "Multiple Labels Capability", + 64: "Graceful Restart Capability", + 65: "Support for 4-octet AS number capability", + 66: "Deprecated (2003-03-06)", + 67: "Support for Dynamic Capability (capability specific)", + 68: "Multisession BGP Capability", + 69: "ADD-PATH Capability", + 70: "Enhanced Route Refresh Capability", + 71: "Long-Lived Graceful Restart (LLGR) Capability", + 72: "Unassigned", + 73: "FQDN Capability", + } + BgpAttributes = map[int]string{ + 0: "Reserved", + ATTRIBUTE_ORIGIN: "ORIGIN", + ATTRIBUTE_ASPATH: "AS_PATH", + ATTRIBUTE_NEXTHOP: "NEXT_HOP", + ATTRIBUTE_MED: "MULTI_EXIT_DISC", + ATTRIBUTE_LOCPREF: "LOCAL_PREF", + 6: "ATOMIC_AGGREGATE", + 7: "AGGREGATOR", + ATTRIBUTE_COMMUNITIES: "COMMUNITY", + 9: "ORIGINATOR_ID", + 10: "CLUSTER_LIST", + 11: "DPA (deprecated)", + 12: "ADVERTISER (historic) (deprecated)", + 13: "RCID_PATH / CLUSTER_ID (Historic) (deprecated)", + ATTRIBUTE_REACH: "MP_REACH_NLRI", + ATTRIBUTE_UNREACH: "MP_UNREACH_NLRI", + 16: "EXTENDED COMMUNITIES", + ATTRIBUTE_AS4PATH: "AS4_PATH", + 18: "AS4_AGGREGATOR", + 19: "SAFI Specific Attribute (SSA) (deprecated)", + 20: "Connector Attribute (deprecated)", + 21: "AS_PATHLIMIT (deprecated)", + 22: "PMSI_TUNNEL", + 23: "Tunnel Encapsulation Attribute", + 24: "Traffic Engineering", + 25: "IPv6 Address Specific Extended Community", + 26: "AIGP", + 27: "PE Distinguisher Labels", + 28: "BGP Entropy Label Capability Attribute (deprecated)", + 29: "BGP-LS Attribute", + 30: "Deprecated", + 31: "Deprecated", + 32: "LARGE_COMMUNITY", + 33: "BGPsec_Path", + 34: "BGP Community Container Attribute", + 40: "BGP Prefix-SID", + 128: "ATTR_SET", + 129: "Deprecated", + 241: "Deprecated", + 242: "Deprecated", + 243: "Deprecated", + } + Afi = map[string]uint16{ + "ipv4": AFI_IPV4, + "ipv6": AFI_IPV6, + } + AfiToStr = map[uint16]string{ + AFI_IPV4: "ipv4", + AFI_IPV6: "ipv6", + } + Safi = map[string]byte{ + "unicast": SAFI_UNICAST, + "multicast": SAFI_MULTICAST, + } + SafiToStr = map[byte]string{ + SAFI_UNICAST: "unicast", + SAFI_MULTICAST: "multicast", + } +) + +const ( + MESSAGE_OPEN = 1 + MESSAGE_UPDATE = 2 + MESSAGE_NOTIFICATION = 3 + MESSAGE_KEEPALIVE = 4 + MESSAGE_ROUTEREFRESH = 5 + + CAPA_MP = 1 + CAPA_ASN = 65 + CAPA_ADDPATH = 69 + CAPA_RR = 2 + + ATTRIBUTE_ORIGIN = 1 + ATTRIBUTE_ASPATH = 2 + ATTRIBUTE_NEXTHOP = 3 + ATTRIBUTE_MED = 4 + ATTRIBUTE_LOCPREF = 5 + ATTRIBUTE_ATOMIC_AGGREGATE = 6 + ATTRIBUTE_AGGREGATOR = 7 + ATTRIBUTE_COMMUNITIES = 8 + ATTRIBUTE_REACH = 14 + ATTRIBUTE_UNREACH = 15 + ATTRIBUTE_AS4PATH = 17 + + ATTRIBUTE_TRANSITIVE = 0x40 + ATTRIBUTE_TRANSITIVEOPT = 0xC0 + ATTRIBUTE_OPTIONAL = 0x80 + ATTRIBUTE_EXTENDED = 0x16 + + PARAMETER_CAPA = 2 + + AFI_IPV4 = 1 + AFI_IPV6 = 2 + + SAFI_UNICAST = 1 + SAFI_MULTICAST = 2 +) + +type NLRI interface { + GetAfi() uint16 + GetSafi() byte + Len(addpath bool) int + Write(w io.Writer, addpath bool) + Bytes(addpath bool) []byte + String() string + Equals(nlri NLRI) bool +} + +type NLRI_IPPrefix struct { + Prefix net.IPNet + PathId uint32 +} + +func (m NLRI_IPPrefix) Equals(mm NLRI) bool { + mmc, ok := mm.(NLRI_IPPrefix) + if ok && mmc.GetAfi() == m.GetAfi() && mmc.GetSafi() == m.GetSafi() && + mmc.PathId == m.PathId && + mmc.Prefix.IP.Equal(m.Prefix.IP) && + bytes.Equal(mmc.Prefix.Mask, m.Prefix.Mask) { + return true + } + + return false +} + +func (m NLRI_IPPrefix) GetAfi() uint16 { + if m.Prefix.IP.To4() != nil { + return AFI_IPV4 + } else if m.Prefix.IP.To16() != nil { + return AFI_IPV6 + } else { + return 0 + } +} + +func (n NLRI_IPPrefix) GetSafi() byte { + return SAFI_UNICAST +} + +func (n NLRI_IPPrefix) String() string { + return fmt.Sprintf("PathId: %v / Prefix: %v", n.PathId, n.Prefix.String()) +} + +func (n NLRI_IPPrefix) Len(addpath bool) int { + add := 0 + if addpath { + add = 4 + } + return add + 1 + n.GetSplitLen() +} + +func (n NLRI_IPPrefix) GetSplitLen() int { + ones, _ := n.Prefix.Mask.Size() + add := 0 + if ones%8 != 0 { + add = 1 + } + return ones/8 + add +} + +func (n NLRI_IPPrefix) Write(w io.Writer, addpath bool) { + if addpath { + binary.Write(w, binary.BigEndian, n.PathId) + } + + ones, _ := n.Prefix.Mask.Size() + binary.Write(w, binary.BigEndian, byte(ones)) + length := n.GetSplitLen() + + for i := 0; i < length; i++ { + binary.Write(w, binary.BigEndian, n.Prefix.IP[i]) + } +} + +func (n NLRI_IPPrefix) Bytes(addpath bool) []byte { + buf := make([]byte, 0) + w := bytes.NewBuffer(buf) + + n.Write(w, addpath) + + return w.Bytes() +} + +type AfiSafi struct { + Afi uint16 + Safi byte +} + +/* +type NLRI struct { + Prefix net.IPNet + PathId uint32 +}*/ + +type SerializableInterface interface { + //Bytes() []byte + String() string + Write(w io.Writer) + Len() int +} + +type BGPMessageHead struct { + Received time.Time +} + +type BGPMessageKeepAlive struct { + BGPMessageHead +} + +type BGPMessageRouteRefresh struct { + BGPMessageHead + AfiSafi AfiSafi +} + +type BGPMessageNotification struct { + BGPMessageHead + ErrorCode byte + ErrorSubcode byte + Data []byte +} + +func (m AfiSafi) String() string { + return fmt.Sprintf("%v-%v (%v) (%v)", AfiToStr[m.Afi], SafiToStr[m.Safi], m.Afi, m.Safi) +} + +func (ap AddPath) EqualsAfiSafi(comp AddPath) bool { + return ap.Afi == comp.Afi && ap.Safi == comp.Safi +} + +func (p AddPath) String() string { + return fmt.Sprintf("Afi: %v-%v (%v) (%v) / TxRx: %v", AfiToStr[p.Afi], SafiToStr[p.Safi], p.Afi, p.Safi, p.TxRx) +} + +func (m BGPMessageKeepAlive) String() string { + str := "BGP KeepAlive\n" + return str +} + +func (m BGPMessageKeepAlive) Write(bw io.Writer) { + WriteBGPHeader(MESSAGE_KEEPALIVE, 0, bw) +} + +func (m BGPMessageKeepAlive) Len() int { + return GetBGPHeaderLen() +} + +func (m BGPMessageKeepAlive) Bytes() []byte { + buf := make([]byte, 0) + bw := bytes.NewBuffer(buf) + + m.Write(bw) + + return bw.Bytes() +} + +func (m BGPMessageRouteRefresh) String() string { + str := fmt.Sprintf("BGP Route Refresh %v\n", m.AfiSafi.String()) + return str +} + +func (m BGPMessageRouteRefresh) Len() int { + return GetBGPHeaderLen() + 4 +} + +func (m BGPMessageRouteRefresh) Write(bw io.Writer) { + WriteBGPHeader(MESSAGE_ROUTEREFRESH, 4, bw) + binary.Write(bw, binary.BigEndian, m.AfiSafi.Afi) + binary.Write(bw, binary.BigEndian, byte(0)) + binary.Write(bw, binary.BigEndian, m.AfiSafi.Safi) +} + +/* +func (m NLRI) String() string { + return fmt.Sprintf("PathId: %v / Prefix: %v", m.PathId, m.Prefix.String()) +} + +func (n NLRI) Bytes(addpath bool) []byte { + buf := make([]byte, 0) + bw := bytes.NewBuffer(buf) + + if addpath { + binary.Write(bw, binary.BigEndian, n.PathId) + } + + ones, _ := n.Prefix.Mask.Size() + bw.WriteByte(byte(ones)) + + add := 0 + if ones%8 != 0 { + add = 1 + } + length := ones/8 + add + + for i := 0; i= 19 { + length := uint16(b[16])<<8 | uint16(b[17]) + if length < 19 || length > 4096 { + return 0, 0, errors.New(fmt.Sprintf("BGP Packet parser: wrong length: 19: !<= %v !<= 4096", length)) + } + if length < 19 { + return 0, 0, errors.New(fmt.Sprintf("BGP Packet parser: wrong length: 19: !<= %v", length)) + } + + length -= 19 + bgptype := b[18] + //log.Debugf("ParsePacketHeader: len: %v type: %v", length, bgptype) + return bgptype, length, nil + } else { + return 0, 0, errors.New(fmt.Sprintf("BGP Packet parser: wrong header size: %v < 19", len(b))) + } +} + +func ParseKeepAlive() (*BGPMessageKeepAlive, error) { + return &BGPMessageKeepAlive{}, nil +} + +func ParseNotification(b []byte) (*BGPMessageNotification, error) { + if len(b) < 2 { + return nil, errors.New(fmt.Sprintf("ParseNotification: wrong open size: %v < 2", len(b))) + } + + errCode := b[0] + errSubcode := b[1] + errData := make([]byte, 0) + if len(b) >= 2 { + errData = b[2:] + } + + r := &BGPMessageNotification{ + BGPMessageHead{time.Now()}, + errCode, + errSubcode, + errData, + } + + log.Errorf("ParseNotification: %v", r.String()) + return r, nil +} + +func ParsePacket(bgptype byte, b []byte) (SerializableInterface, error) { + switch bgptype { + case MESSAGE_OPEN: + openmsg, err := ParseOpen(b) + return openmsg, err + case MESSAGE_UPDATE: + return ParseUpdate(b, nil, false) + case MESSAGE_NOTIFICATION: + return ParseNotification(b) + case MESSAGE_KEEPALIVE: + return ParseKeepAlive() + default: + return nil, errors.New(fmt.Sprintf("Unknown packet type: %v", bgptype)) + } + return nil, nil +} + +func CraftKeepAliveMessage() *BGPMessageKeepAlive { + return &BGPMessageKeepAlive{} +} + +func GetBGPHeaderLen() int { + return 19 +} + +func WriteBGPHeader(bgptype byte, size uint16, bw io.Writer) { + bw.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}) + binary.Write(bw, binary.BigEndian, uint16(19+size)) + binary.Write(bw, binary.BigEndian, bgptype) +} diff --git a/mrt/mrt.go b/mrt/mrt.go new file mode 100644 index 0000000..89c7b04 --- /dev/null +++ b/mrt/mrt.go @@ -0,0 +1,812 @@ +package mrt + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "github.com/cloudflare/fgbgp/messages" + "io" + "net" + "time" +) + +const ( + TYPE_OSPFV2 = 11 + TYPE_TABLE_DUMP = 12 + TYPE_TABLE_DUMPV2 = 13 + + TYPE_BGP4MP = 16 + TYPE_BGP4MP_ET = 17 + + TYPE_ISIS = 32 + TYPE_ISIS_ET = 33 + + TYPE_OSPFV3 = 48 + TYPE_OSPFV3_ET = 49 + + SUBT_TABLE_DUMP_AFI_IPV4 = 1 + SUBT_TABLE_DUMP_AFI_IPV6 = 2 + + SUBT_TABLE_DUMPV2_PEER_INDEX_TABLE = 1 + SUBT_TABLE_DUMPV2_RIB_IPV4_UNICAST = 2 + SUBT_TABLE_DUMPV2_RIB_IPV4_MULTICAST = 3 + SUBT_TABLE_DUMPV2_RIB_IPV6_UNICAST = 4 + SUBT_TABLE_DUMPV2_RIB_IPV6_MULTICAST = 5 + SUBT_TABLE_DUMPV2_RIB_GENERIC = 6 + + SUBT_BGP4MP_STATE_CHANGE = 0 + SUBT_BGP4MP_MESSAGE = 1 + SUBT_BGP4MP_MESSAGE_AS4 = 4 + SUBT_BGP4MP_STATE_CHANGE_AS4 = 5 + SUBT_BGP4MP_MESSAGE_LOCAL = 6 + SUBT_BGP4MP_MESSAGE_AS4_LOCAL = 7 + + STATE_IDLE = 1 + STATE_CONNECT = 2 + STATE_ACTIVE = 3 + STATE_OPENSENT = 4 + STATE_OPENCONFIRM = 5 + STATE_ESTABLISHED = 6 +) + +type Mrt interface { + Write(io.Writer) + Len() int +} + +func WriteCommonHeader(buf io.Writer, timestamp time.Time, mrttype uint16, subtype uint16, length uint32) { + binary.Write(buf, binary.BigEndian, uint32(timestamp.Unix())) + binary.Write(buf, binary.BigEndian, mrttype) + binary.Write(buf, binary.BigEndian, subtype) + binary.Write(buf, binary.BigEndian, length) +} + +type Peer struct { + Id net.IP + IP net.IP + ASN uint32 +} + +func (p *Peer) Write(buf io.Writer) { + newip := p.IP + firstbit := 1 + if tmpip := newip.To4(); tmpip != nil { + newip = tmpip + firstbit = 0 + } + + longasn := 1 + if p.ASN <= 0xffff { + longasn = 0 + } + + firstbyte := byte(longasn<<2 | firstbit) + binary.Write(buf, binary.BigEndian, firstbyte) + + binary.Write(buf, binary.BigEndian, p.Id.To4()) + + binary.Write(buf, binary.BigEndian, newip) + if longasn == 1 { + binary.Write(buf, binary.BigEndian, p.ASN) + } else { + binary.Write(buf, binary.BigEndian, uint16(p.ASN)) + } +} + +func (p *Peer) Len() int { + iplen := 4 + newip := p.IP + if tmpip := newip.To4(); tmpip == nil { + iplen = 16 + } + longasn := 4 + if p.ASN <= 0xffff { + longasn = 2 + } + return 1 + 4 + iplen + longasn +} + +type MrtTableDumpV2_PeerIndex struct { + Timestamp time.Time + CollectorId net.IP + ViewName string + Peers []*Peer +} + +func NewMrtTableDumpV2_PeerIndex(collectorid net.IP, viewname string, ts time.Time) *MrtTableDumpV2_PeerIndex { + return &MrtTableDumpV2_PeerIndex{ + Timestamp: ts, + CollectorId: collectorid, + ViewName: viewname, + Peers: make([]*Peer, 0), + } +} + +func (mrt *MrtTableDumpV2_PeerIndex) AddPeer(id net.IP, asn uint32, ip net.IP) uint16 { + peer := &Peer{ + Id: id.To4(), + IP: ip, + ASN: asn, + } + mrt.Peers = append(mrt.Peers, peer) + return uint16(len(mrt.Peers) - 1) +} + +func (mrt *MrtTableDumpV2_PeerIndex) Write(buf io.Writer) { + WriteCommonHeader(buf, mrt.Timestamp, TYPE_TABLE_DUMPV2, SUBT_TABLE_DUMPV2_PEER_INDEX_TABLE, uint32(mrt.Len())) + binary.Write(buf, binary.BigEndian, mrt.CollectorId.To4()) + binary.Write(buf, binary.BigEndian, uint16(len(mrt.ViewName))) + binary.Write(buf, binary.BigEndian, []byte(mrt.ViewName)) + binary.Write(buf, binary.BigEndian, uint16(len(mrt.Peers))) + for i := range mrt.Peers { + mrt.Peers[i].Write(buf) + } +} + +func (mrt *MrtTableDumpV2_PeerIndex) Len() int { + totallen := 4 + 2 + len(mrt.ViewName) + 2 + for i := range mrt.Peers { + totallen += mrt.Peers[i].Len() + } + return totallen +} + +type RibEntry struct { + PeerIndex uint16 + OrigTime time.Time + Attributes []messages.BGPAttributeIf +} + +func (entry *RibEntry) Write(buf io.Writer) { + binary.Write(buf, binary.BigEndian, entry.PeerIndex) + binary.Write(buf, binary.BigEndian, uint32(entry.OrigTime.Unix())) + + var size uint16 + + for i := range entry.Attributes { + switch attribute := entry.Attributes[i].(type) { + case *messages.BGPAttribute_MP_REACH: + size += attribute.LenMrt() + case *messages.BGPAttribute_MP_UNREACH: + default: + size += uint16(entry.Attributes[i].Len()) + } + } + + binary.Write(buf, binary.BigEndian, size) + + for i := range entry.Attributes { + switch attribute := entry.Attributes[i].(type) { + case *messages.BGPAttribute_MP_REACH: + attribute.WriteMrt(buf) + case *messages.BGPAttribute_MP_UNREACH: + default: + entry.Attributes[i].Write(buf) + } + } +} + +func (entry *RibEntry) Len() uint32 { + size := uint32(2 + 4 + 2) + // To optimize + for i := range entry.Attributes { + switch attribute := entry.Attributes[i].(type) { + case *messages.BGPAttribute_MP_REACH: + size += uint32(attribute.LenMrt()) + case *messages.BGPAttribute_MP_UNREACH: + default: + size += uint32(entry.Attributes[i].Len()) + } + + } + return size +} + +type MrtTableDumpV2_Rib struct { + Timestamp time.Time + SequenceNumber uint32 + Afi uint16 + Safi byte + NLRI messages.NLRI + RibEntries []*RibEntry + + WriteAsAfiSafi bool +} + +func NewMrtTableDumpV2_RibGeneric(seqnum uint32, afi uint16, safi byte, nlri messages.NLRI, ts time.Time) *MrtTableDumpV2_Rib { + return &MrtTableDumpV2_Rib{ + Timestamp: ts, + SequenceNumber: seqnum, + Afi: afi, + Safi: safi, + NLRI: nlri, + RibEntries: make([]*RibEntry, 0), + } +} + +func NewMrtTableDumpV2_RibAfiSafi(seqnum uint32, afi uint16, safi byte, nlri messages.NLRI, ts time.Time) *MrtTableDumpV2_Rib { + mrt := NewMrtTableDumpV2_RibGeneric(seqnum, afi, safi, nlri, ts) + mrt.WriteAsAfiSafi = true + return mrt +} + +func (entry *RibEntry) EntryToUpdate() *messages.BGPMessageUpdate { + update := &messages.BGPMessageUpdate{ + PathAttributes: entry.Attributes, + } + return update +} + +func (mrt *MrtTableDumpV2_Rib) ConvertToUpdateIndex(index int) *messages.BGPMessageUpdate { + re := mrt.RibEntries + if index >= len(re) { + return nil + } + entry := re[index] + update := entry.EntryToUpdate() + + if mrt.NLRI.GetAfi() == messages.AFI_IPV6 { + pa := update.PathAttributes + var hasreach bool + for i := range pa { + switch pai := pa[i].(type) { + case messages.BGPAttribute_MP_REACH: + // Check NLRI already in + var hasipinreach bool + hasreach = true + + mpnlri := pai.NLRI + for j := range mpnlri { + if mrt.NLRI.Equals(mpnlri[j]) { + hasipinreach = true + break + } + } + + if !hasipinreach { + pai.NLRI = append(pai.NLRI, mrt.NLRI) + } + + } + } + if !hasreach { + attr := &messages.BGPAttribute_MP_REACH{ + NLRI: []messages.NLRI{mrt.NLRI}, + } + update.PathAttributes = append(update.PathAttributes, attr) + } + } else { + update.NLRI = []messages.NLRI{mrt.NLRI} + } + return update +} +func (mrt *MrtTableDumpV2_Rib) ConvertToUpdate() []*messages.BGPMessageUpdate { + updates := make([]*messages.BGPMessageUpdate, 0) + re := mrt.RibEntries + for i := range re { + update := mrt.ConvertToUpdateIndex(i) + if update != nil { + updates = append(updates, update) + } + } + return updates +} + +func (mrt *MrtTableDumpV2_Rib) AddEntry(peerindex uint16, origtime time.Time, attributes []messages.BGPAttributeIf) { + entry := &RibEntry{ + PeerIndex: peerindex, + OrigTime: origtime, + Attributes: attributes, + } + mrt.RibEntries = append(mrt.RibEntries, entry) +} + +func (mrt *MrtTableDumpV2_Rib) GetSubtype() (bool, uint16) { + subt := uint16(SUBT_TABLE_DUMPV2_RIB_GENERIC) + force_generic := true + + if mrt.WriteAsAfiSafi { + if mrt.Afi == messages.AFI_IPV4 && mrt.Safi == messages.SAFI_UNICAST { + force_generic = false + subt = SUBT_TABLE_DUMPV2_RIB_IPV4_UNICAST + } else if mrt.Afi == messages.AFI_IPV4 && mrt.Safi == messages.SAFI_MULTICAST { + force_generic = false + subt = SUBT_TABLE_DUMPV2_RIB_IPV4_MULTICAST + } else if mrt.Afi == messages.AFI_IPV6 && mrt.Safi == messages.SAFI_UNICAST { + force_generic = false + subt = SUBT_TABLE_DUMPV2_RIB_IPV6_UNICAST + } else if mrt.Afi == messages.AFI_IPV6 && mrt.Safi == messages.SAFI_MULTICAST { + force_generic = false + subt = SUBT_TABLE_DUMPV2_RIB_IPV6_MULTICAST + } + } + return force_generic, subt +} + +func (mrt *MrtTableDumpV2_Rib) Write(buf io.Writer) { + force_generic, subt := mrt.GetSubtype() + + WriteCommonHeader(buf, mrt.Timestamp, TYPE_TABLE_DUMPV2, subt, uint32(mrt.Len())) + if force_generic { + binary.Write(buf, binary.BigEndian, mrt.Afi) + binary.Write(buf, binary.BigEndian, mrt.Safi) + } + + binary.Write(buf, binary.BigEndian, mrt.SequenceNumber) + binary.Write(buf, binary.BigEndian, mrt.NLRI.Bytes(false)) + binary.Write(buf, binary.BigEndian, uint16(len(mrt.RibEntries))) + for i := range mrt.RibEntries { + mrt.RibEntries[i].Write(buf) + } +} + +func (mrt *MrtTableDumpV2_Rib) Len() int { + force_generic, _ := mrt.GetSubtype() + size := 4 + len(mrt.NLRI.Bytes(false)) + 2 + if force_generic { + size += 2 + } + for i := range mrt.RibEntries { + size += int(mrt.RibEntries[i].Len()) + } + return size +} + +type MrtBGP4MP_Msg_AS4 struct { + Timestamp time.Time + PeerAS uint32 + LocalAS uint32 + IfaceIndex uint16 + PeerIP net.IP + LocalIP net.IP + Message messages.SerializableInterface +} + +type MrtBGP4MP_StateChange_AS4 struct { + Timestamp time.Time + PeerAS uint32 + LocalAS uint32 + IfaceIndex uint16 + PeerIP net.IP + LocalIP net.IP + OldState uint16 + NewState uint16 +} + +func NewMrtBGP4MP_StateChange_AS4(peeras uint32, localas uint32, iface uint16, peerip net.IP, localip net.IP, oldstate uint16, newstate uint16) *MrtBGP4MP_StateChange_AS4 { + return &MrtBGP4MP_StateChange_AS4{ + Timestamp: time.Now().UTC(), + PeerAS: peeras, + LocalAS: localas, + IfaceIndex: iface, + PeerIP: peerip, + LocalIP: localip, + OldState: oldstate, + NewState: newstate, + } +} + +func (mrt *MrtBGP4MP_StateChange_AS4) IsIPv4() bool { + return mrt.PeerIP.To4() != nil +} + +func (mrt *MrtBGP4MP_StateChange_AS4) Len() int { + ipsize := 4 + if !mrt.IsIPv4() { + ipsize = 16 + } + return 4 + 4 + 2 + 2 + 2*ipsize + 2 + 2 +} + +func (mrt *MrtBGP4MP_StateChange_AS4) Write(buf io.Writer) { + WriteCommonHeader(buf, mrt.Timestamp, TYPE_BGP4MP, SUBT_BGP4MP_STATE_CHANGE_AS4, uint32(mrt.Len())) + + binary.Write(buf, binary.BigEndian, mrt.PeerAS) + binary.Write(buf, binary.BigEndian, mrt.LocalAS) + + binary.Write(buf, binary.BigEndian, mrt.IfaceIndex) + if mrt.IsIPv4() { + binary.Write(buf, binary.BigEndian, uint16(messages.AFI_IPV4)) + binary.Write(buf, binary.BigEndian, mrt.PeerIP.To4()) + binary.Write(buf, binary.BigEndian, mrt.LocalIP.To4()) + } else { + binary.Write(buf, binary.BigEndian, uint16(messages.AFI_IPV6)) + binary.Write(buf, binary.BigEndian, mrt.PeerIP) + binary.Write(buf, binary.BigEndian, mrt.LocalIP) + } + + binary.Write(buf, binary.BigEndian, mrt.OldState) + binary.Write(buf, binary.BigEndian, mrt.NewState) +} + +func NewMrtBGP4MP_Msg_AS4(peeras uint32, localas uint32, iface uint16, peerip net.IP, localip net.IP, message messages.SerializableInterface) *MrtBGP4MP_Msg_AS4 { + return &MrtBGP4MP_Msg_AS4{ + Timestamp: time.Now().UTC(), + PeerAS: peeras, + LocalAS: localas, + IfaceIndex: iface, + PeerIP: peerip, + LocalIP: localip, + Message: message, + } +} + +func (mrt *MrtBGP4MP_Msg_AS4) IsIPv4() bool { + return mrt.PeerIP.To4() != nil +} + +func (mrt *MrtBGP4MP_Msg_AS4) Len() int { + ipsize := 4 + if !mrt.IsIPv4() { + ipsize = 16 + } + return 4 + 4 + 2 + 2 + 2*ipsize + mrt.Message.Len() +} + +func (mrt *MrtBGP4MP_Msg_AS4) Write(buf io.Writer) { + WriteCommonHeader(buf, mrt.Timestamp, TYPE_BGP4MP, SUBT_BGP4MP_MESSAGE_AS4, uint32(mrt.Len())) + + binary.Write(buf, binary.BigEndian, mrt.PeerAS) + binary.Write(buf, binary.BigEndian, mrt.LocalAS) + + binary.Write(buf, binary.BigEndian, mrt.IfaceIndex) + if mrt.IsIPv4() { + binary.Write(buf, binary.BigEndian, uint16(messages.AFI_IPV4)) + binary.Write(buf, binary.BigEndian, mrt.PeerIP.To4()) + binary.Write(buf, binary.BigEndian, mrt.LocalIP.To4()) + } else { + binary.Write(buf, binary.BigEndian, uint16(messages.AFI_IPV6)) + binary.Write(buf, binary.BigEndian, mrt.PeerIP) + binary.Write(buf, binary.BigEndian, mrt.LocalIP) + } + mrt.Message.Write(buf) +} + +func DecodeBGP4MP(buf io.Reader, timestamp time.Time, subtype uint16, length uint32) (Mrt, error) { + switch subtype { + case SUBT_BGP4MP_MESSAGE_AS4: + var peeras uint32 + var localas uint32 + var ifaceindex uint16 + var afi uint16 + binary.Read(buf, binary.BigEndian, peeras) + binary.Read(buf, binary.BigEndian, localas) + binary.Read(buf, binary.BigEndian, ifaceindex) + binary.Read(buf, binary.BigEndian, afi) + var peerip []byte + var localip []byte + sizeip := 4 + if afi == messages.AFI_IPV6 { + sizeip = 16 + } + peerip = make([]byte, sizeip) + localip = make([]byte, sizeip) + binary.Read(buf, binary.BigEndian, peerip) + binary.Read(buf, binary.BigEndian, localip) + + msgsize := length - uint32(4+4+2+2+2*(sizeip)) + if msgsize < 0 { + return nil, errors.New("DecodeBGP4MP: cannot decode message with negative length") + } + msg := make([]byte, msgsize) + // Do progressive read or replace parsepacketheader with io.Reader + binary.Read(buf, binary.BigEndian, msg) + + bgptype, bgplen, err1 := messages.ParsePacketHeader(msg) + if err1 != nil { + return nil, err1 + } + pktd, err2 := messages.ParsePacket(bgptype, msg[19:19+bgplen]) + + mrt := &MrtBGP4MP_Msg_AS4{ + Timestamp: timestamp, + PeerAS: peeras, + LocalAS: localas, + IfaceIndex: ifaceindex, + PeerIP: net.IP(peerip), + LocalIP: net.IP(localip), + Message: pktd, + } + + return mrt, err2 + case SUBT_BGP4MP_STATE_CHANGE_AS4: + var peeras uint32 + var localas uint32 + var ifaceindex uint16 + var afi uint16 + binary.Read(buf, binary.BigEndian, peeras) + binary.Read(buf, binary.BigEndian, localas) + binary.Read(buf, binary.BigEndian, ifaceindex) + binary.Read(buf, binary.BigEndian, afi) + var peerip []byte + var localip []byte + sizeip := 4 + if afi == messages.AFI_IPV6 { + sizeip = 16 + } + peerip = make([]byte, sizeip) + localip = make([]byte, sizeip) + binary.Read(buf, binary.BigEndian, peerip) + binary.Read(buf, binary.BigEndian, localip) + var oldstate uint16 + var newstate uint16 + binary.Read(buf, binary.BigEndian, oldstate) + binary.Read(buf, binary.BigEndian, newstate) + + mrt := &MrtBGP4MP_StateChange_AS4{ + Timestamp: timestamp, + PeerAS: peeras, + LocalAS: localas, + IfaceIndex: ifaceindex, + PeerIP: net.IP(peerip), + LocalIP: net.IP(localip), + OldState: oldstate, + NewState: newstate, + } + + return mrt, nil + default: + return nil, errors.New(fmt.Sprintf("Decoding of subtype %v of BGP4MP not implemented", subtype)) + } + return nil, nil +} + +func DecodeNLRI(buf io.Reader, afi uint16, safi byte) (messages.NLRI, error) { + if afi != messages.AFI_IPV4 && afi != messages.AFI_IPV6 { + return nil, errors.New(fmt.Sprintf("Could not decode NLRI for Afi: %v", afi)) + } + if safi != messages.SAFI_UNICAST && safi != messages.SAFI_MULTICAST { + return nil, errors.New(fmt.Sprintf("Could not decode NLRI for Safi: %v", safi)) + } + + var l byte + binary.Read(buf, binary.BigEndian, &l) + + size := l / 8 + if l%8 != 0 { + size++ + } + b := make([]byte, size) + binary.Read(buf, binary.BigEndian, &b) + + newb := append([]byte{l}, b...) + nlri, err := messages.ParseNLRI(newb, afi, safi, false) + + if len(nlri) == 1 { + return nlri[0], err + } else { + return nil, errors.New(fmt.Sprintf("Could not decode NLRI %v (%v/%v) (number of results != 1): %v", newb, afi, safi, err)) + } +} + +func DecodeAttributes(buf io.Reader, attrlen uint16) ([]messages.BGPAttributeIf, error) { + b := make([]byte, attrlen) + binary.Read(buf, binary.BigEndian, b) + return messages.ParsePathAttribute(b, nil, false) +} + +func DecodeRibEntries(buf io.Reader) (*RibEntry, error) { + var peerindex uint16 + var origints uint32 + var attrlen uint16 + binary.Read(buf, binary.BigEndian, &peerindex) + binary.Read(buf, binary.BigEndian, &origints) + binary.Read(buf, binary.BigEndian, &attrlen) + attrs, err := DecodeAttributes(buf, attrlen) + origintsP := time.Unix(int64(origints), 0) + re := &RibEntry{ + OrigTime: origintsP, + PeerIndex: peerindex, + Attributes: attrs, + } + return re, err +} + +func DecodeBGP4TD2RIBSpec(buf io.Reader, subtype uint16, timestamp time.Time) (Mrt, error) { + var afi uint16 + var safi byte + if subtype == SUBT_TABLE_DUMPV2_RIB_IPV4_UNICAST { + afi = messages.AFI_IPV4 + safi = messages.SAFI_UNICAST + } else if subtype == SUBT_TABLE_DUMPV2_RIB_IPV6_UNICAST { + afi = messages.AFI_IPV6 + safi = messages.SAFI_UNICAST + } else if subtype == SUBT_TABLE_DUMPV2_RIB_IPV4_MULTICAST { + afi = messages.AFI_IPV4 + safi = messages.SAFI_MULTICAST + } else if subtype == SUBT_TABLE_DUMPV2_RIB_IPV6_MULTICAST { + afi = messages.AFI_IPV6 + safi = messages.SAFI_MULTICAST + } else { + return nil, errors.New("Cannot decode as Rib Afi/Safi specific") + } + + var seqnum uint32 + var preflen uint8 + var prefix []byte + binary.Read(buf, binary.BigEndian, &seqnum) + binary.Read(buf, binary.BigEndian, &preflen) + + size := preflen / 8 + if preflen%8 != 0 { + size++ + } + prefix = make([]byte, size) + binary.Read(buf, binary.BigEndian, &prefix) + + newb := append([]byte{preflen}, prefix...) + nlri, err := messages.ParseNLRI(newb, afi, safi, false) + + mrt := &MrtTableDumpV2_Rib{ + Timestamp: timestamp, + Afi: afi, + Safi: safi, + SequenceNumber: seqnum, + WriteAsAfiSafi: true, + } + + if len(nlri) == 1 { + mrt.NLRI = nlri[0] + } else { + return mrt, errors.New(fmt.Sprintf("Could not decode NLRI %v (%v/%v) (number of results != 1): %v", newb, afi, safi, err)) + } + + if err != nil { + return mrt, err + } + + var entrycount uint16 + binary.Read(buf, binary.BigEndian, &entrycount) + entries := make([]*RibEntry, entrycount) + var errentry error + for i := 0; i < int(entrycount); i++ { + var entry *RibEntry + entry, errentry = DecodeRibEntries(buf) + entries[i] = entry + } + mrt.RibEntries = entries + return mrt, errentry +} + +func DecodeBGP4TD2(buf io.Reader, timestamp time.Time, subtype uint16, length uint32) (Mrt, error) { + switch subtype { + case SUBT_TABLE_DUMPV2_PEER_INDEX_TABLE: + collid := make([]byte, 4) + var viewnamelen uint16 + var peercount uint16 + var viewname []byte + var peers []*Peer + + binary.Read(buf, binary.BigEndian, &collid) + binary.Read(buf, binary.BigEndian, &viewnamelen) + + viewname = make([]byte, viewnamelen) + + binary.Read(buf, binary.BigEndian, &viewname) + binary.Read(buf, binary.BigEndian, &peercount) + + peers = make([]*Peer, peercount) + + for i := 0; i < int(peercount); i++ { + var peertype uint8 + bgpid := make([]byte, 4) + var asn uint32 + var peerip []byte + + binary.Read(buf, binary.BigEndian, &peertype) + binary.Read(buf, binary.BigEndian, &bgpid) + + sizeip := 4 + sizeasn := 2 + if peertype&0x2 != 0 { + sizeasn = 4 + } + if peertype&0x1 != 0 { + sizeip = 16 + } + tmpasn := make([]byte, sizeasn) + peerip = make([]byte, sizeip) + + binary.Read(buf, binary.BigEndian, &peerip) + binary.Read(buf, binary.BigEndian, &tmpasn) + + if sizeasn == 2 { + asn = uint32(binary.BigEndian.Uint16(tmpasn)) + } else if sizeasn == 4 { + asn = binary.BigEndian.Uint32(tmpasn) + } + + curpeer := &Peer{ + Id: bgpid, + IP: peerip, + ASN: asn, + } + + peers[i] = curpeer + } + + mrt := &MrtTableDumpV2_PeerIndex{ + Timestamp: timestamp, + CollectorId: collid, + ViewName: string(viewname), + Peers: peers, + } + return mrt, nil + + case SUBT_TABLE_DUMPV2_RIB_GENERIC: + var seqnum uint32 + var afi uint16 + var safi byte + binary.Read(buf, binary.BigEndian, &seqnum) + binary.Read(buf, binary.BigEndian, &afi) + binary.Read(buf, binary.BigEndian, &safi) + + nlri, err := DecodeNLRI(buf, afi, safi) + + mrt := &MrtTableDumpV2_Rib{ + Timestamp: timestamp, + Afi: afi, + Safi: safi, + SequenceNumber: seqnum, + NLRI: nlri, + } + if err != nil { + return mrt, err + } + + var entrycount uint16 + binary.Read(buf, binary.BigEndian, &entrycount) + entries := make([]*RibEntry, entrycount) + var errentry error + for i := 0; i < int(entrycount); i++ { + var entry *RibEntry + entry, errentry = DecodeRibEntries(buf) + entries[i] = entry + } + mrt.RibEntries = entries + return mrt, errentry + case SUBT_TABLE_DUMPV2_RIB_IPV4_UNICAST: + return DecodeBGP4TD2RIBSpec(buf, subtype, timestamp) + case SUBT_TABLE_DUMPV2_RIB_IPV6_UNICAST: + return DecodeBGP4TD2RIBSpec(buf, subtype, timestamp) + case SUBT_TABLE_DUMPV2_RIB_IPV4_MULTICAST: + return DecodeBGP4TD2RIBSpec(buf, subtype, timestamp) + case SUBT_TABLE_DUMPV2_RIB_IPV6_MULTICAST: + return DecodeBGP4TD2RIBSpec(buf, subtype, timestamp) + default: + return nil, errors.New(fmt.Sprintf("Decoding of subtype %v of BGP4TableDumpV2 not implemented", subtype)) + } + return nil, nil +} + +func DecodeSingle(buf io.Reader) (Mrt, error) { + var timestamp uint32 + var mrttype uint16 + var mrtsubtype uint16 + var mrtlength uint32 + + binary.Read(buf, binary.BigEndian, ×tamp) + binary.Read(buf, binary.BigEndian, &mrttype) + binary.Read(buf, binary.BigEndian, &mrtsubtype) + binary.Read(buf, binary.BigEndian, &mrtlength) + + timestampP := time.Unix(int64(timestamp), 0) + + content := make([]byte, mrtlength) + binary.Read(buf, binary.BigEndian, &content) + tmpbuf := bytes.NewBuffer(content) + + var mrt Mrt + var err error + switch mrttype { + case TYPE_BGP4MP: + mrt, err = DecodeBGP4MP(tmpbuf, timestampP, mrtsubtype, mrtlength) + case TYPE_TABLE_DUMPV2: + mrt, err = DecodeBGP4TD2(tmpbuf, timestampP, mrtsubtype, mrtlength) + default: + err = errors.New(fmt.Sprintf("Decoding of type %v not implemented", mrttype)) + } + + return mrt, err +} diff --git a/rib/rib.go b/rib/rib.go new file mode 100644 index 0000000..728fa25 --- /dev/null +++ b/rib/rib.go @@ -0,0 +1,23 @@ +package rib + +import ( + "github.com/cloudflare/fgbgp/messages" + "net" + "time" +) + +type Rib interface { + Lookup(ip net.IP) (net.IPNet, *messages.BGPMessageUpdate) + LookupPrefix(prefix net.IPNet, exact bool) (net.IPNet, *messages.BGPMessageUpdate) + + UpdateRib(*messages.BGPMessageUpdate) + + GetCounts(messages.AfiSafi) int + + DumpMrt(peerid uint16, f WalkMrt, ts time.Time) + Walk(f Walk, isv6 bool) +} + +func NewRib(typeRib int) Rib { + return NewLcRib() +} diff --git a/rib/rib_lc.go b/rib/rib_lc.go new file mode 100644 index 0000000..6f10211 --- /dev/null +++ b/rib/rib_lc.go @@ -0,0 +1,350 @@ +package rib + +import ( + "github.com/cloudflare/fgbgp/messages" + "github.com/cloudflare/fgbgp/mrt" + "github.com/lspgn/lctrie" + "net" + "sync" + "time" + //"fmt" +) + +type LcRib struct { + PrefixTable *lctrie.Trie + Prefix6Table *lctrie.Trie + SyncPrefixTable *sync.RWMutex + SyncPrefix6Table *sync.RWMutex + Countv4 int + Countv6 int +} + +func NewLcRib() *LcRib { + return &LcRib{ + PrefixTable: lctrie.New(), + Prefix6Table: lctrie.New(), + SyncPrefixTable: &sync.RWMutex{}, + SyncPrefix6Table: &sync.RWMutex{}, + } +} + +type dumpWalkArgs struct { + f WalkMrt + peerid uint16 + iter uint32 + isv6 bool + afi uint16 + safi byte + curtime time.Time +} + +type Walk func(net.IPNet, *messages.BGPMessageUpdate) bool +type WalkMrt func(*mrt.MrtTableDumpV2_Rib) bool + +func (args *dumpWalkArgs) dumpWalk(prefix net.IPNet, msg *messages.BGPMessageUpdate) bool { + dump := mrt.NewMrtTableDumpV2_RibAfiSafi(args.iter, args.afi, args.safi, messages.NLRI_IPPrefix{Prefix: prefix}, args.curtime) + dump.AddEntry(args.peerid, args.curtime, msg.PathAttributes) + + var stop bool + if args.f != nil { + stop = args.f(dump) + } + + args.iter++ + + return stop +} + +func (rib *LcRib) DumpMrt(peerid uint16, f WalkMrt, ts time.Time) { + args := dumpWalkArgs{ + f: f, + peerid: peerid, + afi: messages.AFI_IPV4, + safi: messages.SAFI_UNICAST, + curtime: ts, + } + rib.Walk(args.dumpWalk, args.isv6) + + args.afi = messages.AFI_IPV6 + args.isv6 = true + + rib.Walk(args.dumpWalk, args.isv6) +} + +func (rib *LcRib) chooseTableByType(ip net.IP) (*lctrie.Trie, *sync.RWMutex) { + if ip.To4() != nil { + return rib.PrefixTable, rib.SyncPrefixTable + } else if ip.To16() != nil { + return rib.Prefix6Table, rib.SyncPrefix6Table + } else { + return nil, nil + } +} + +type walkArgs struct { + f Walk + isv6 bool +} + +func (rib *LcRib) GetCounts(afisafi messages.AfiSafi) int { + if afisafi.Afi == messages.AFI_IPV4 { + return rib.Countv4 + } else if afisafi.Afi == messages.AFI_IPV6 { + return rib.Countv6 + } + return 0 +} + +func (args *walkArgs) walkFunc(b []byte, p byte, item interface{}) bool { + prefix := ConvertBytesToPrefix(b, p, args.isv6) + + conv, ok := item.(*messages.BGPMessageUpdate) + var stop bool + if ok && args.f != nil { + stop = args.f(prefix, conv) + } + return stop +} + +func (rib *LcRib) Walk(f Walk, isv6 bool) { + args := walkArgs{ + f: f, + isv6: isv6, + } + + if !isv6 { + rib.SyncPrefixTable.RLock() + rib.PrefixTable.ExploreFromRoot(args.walkFunc) + rib.SyncPrefixTable.RUnlock() + } else { + rib.SyncPrefix6Table.RLock() + rib.Prefix6Table.ExploreFromRoot(args.walkFunc) + rib.SyncPrefix6Table.RUnlock() + } +} + +func (rib *LcRib) chooseTableByTypePrefix(prefix net.IPNet) (*lctrie.Trie, *sync.RWMutex) { + return rib.chooseTableByType(prefix.IP) +} + +func (rib *LcRib) LookupPrefix(prefix net.IPNet, exact bool) (net.IPNet, *messages.BGPMessageUpdate) { + table, sync := rib.chooseTableByTypePrefix(prefix) + + var pconv net.IPNet + if table == nil || sync == nil { + return pconv, nil + } + + b, p := ConvertPrefixToBytes(prefix) + + var bb []byte + var pp byte + var i interface{} + sync.RLock() + + var isv6 bool + if prefix.IP.To4() == nil && prefix.IP.To16() != nil { + isv6 = true + } + + if !exact { + bb, pp, i = table.Get(b, p) + pconv = ConvertBytesToPrefix(bb, pp, isv6) + } else { + i = table.GetExact(b, p) + pconv = prefix + } + sync.RUnlock() + + update, ok := i.(*messages.BGPMessageUpdate) + if ok { + return pconv, update + } + + return pconv, nil +} +func (rib *LcRib) Lookup(ip net.IP) (net.IPNet, *messages.BGPMessageUpdate) { + var mask net.IPMask + if ip.To4() != nil { + mask = net.CIDRMask(32, 32) + } else if ip.To16() != nil { + mask = net.CIDRMask(128, 128) + } + prefix := net.IPNet{ + IP: ip, + Mask: mask, + } + + return rib.LookupPrefix(prefix, false) +} + +func ConvertBytesToPrefix(b []byte, p byte, ipv6 bool) net.IPNet { + var newip []byte + var mask net.IPMask + + var size int + + if ipv6 { + size = 16 + } else { + size = 4 + } + + if p == 0 { + mask = net.CIDRMask(8*len(b), size*8) + } else { + modb := b[len(b)-1] + + b[len(b)-1] = (modb >> (8 - p)) << (8 - p) + mask = net.CIDRMask(8*(len(b)-1)+int(p), size*8) + } + + if size-len(b) > 0 { + newip = append(b, make([]byte, size-len(b))...) + } else { + newip = append(b) + } + + return net.IPNet{IP: newip, Mask: mask} +} + +func ConvertPrefixToBytes(prefix net.IPNet) ([]byte, byte) { + var ipbytes []byte + var bytescopy net.IP + bytescopy = make([]byte, len(prefix.IP)) + copy(bytescopy, prefix.IP) + if bytescopy.To4() != nil { + ipbytes = bytescopy.To4() + } else if prefix.IP.To16() != nil { + ipbytes = bytescopy.To16() + } + if ipbytes != nil { + s, _ := prefix.Mask.Size() + if s == 0 { + return []byte{}, 0 + } + cut := s / 8 + if s%8 != 0 { + cut += 1 + } + ipbytes := ipbytes[0:cut] + s %= 8 + return ipbytes, byte(s) + } + return nil, 0 +} + +func ConvertNLRIToBytes(nlri messages.NLRI) ([]byte, byte) { + ipprefix, ok := nlri.(messages.NLRI_IPPrefix) + if ok { + return ConvertPrefixToBytes(ipprefix.Prefix) + } else { + return nil, 0 + } + +} + +func RemoveFromNLRI(nlri messages.NLRI, message *messages.BGPMessageUpdate) { + newnlri := make([]messages.NLRI, 0) + for i := range message.NLRI { + if !nlri.Equals(message.NLRI[i]) { + newnlri = append(newnlri, message.NLRI[i]) + } + } + if len(newnlri) != len(message.NLRI) { + message.NLRI = newnlri + } + + newnlri = make([]messages.NLRI, 0) + for i := range message.PathAttributes { + switch pa := message.PathAttributes[i].(type) { + case messages.BGPAttribute_MP_REACH: + for j := range pa.NLRI { + if !nlri.Equals(pa.NLRI[j]) { + newnlri = append(newnlri, pa.NLRI[j]) + } + } + if len(newnlri) != len(pa.NLRI) { + pa.NLRI = newnlri + } + } + } +} + +func (rib *LcRib) addPrefix(nlri messages.NLRI, trie *lctrie.Trie, message *messages.BGPMessageUpdate, count *int) { + b, l := ConvertNLRIToBytes(nlri) + //fmt.Printf("Converted %v %v\n", b, l) + if b != nil { + + val := trie.GetExact(b, byte(l%8)) + if val != nil { + valupdate, ok := val.(*messages.BGPMessageUpdate) + if ok && valupdate != message { + RemoveFromNLRI(nlri, valupdate) + } + } else { + (*count)++ + } + trie.Insert(b, byte(l%8), message) + } +} + +func (rib *LcRib) delPrefix(nlri messages.NLRI, trie *lctrie.Trie, count *int) int { + b, l := ConvertNLRIToBytes(nlri) + //fmt.Printf("Converted %v %v\n", b, l) + if b != nil { + + val := trie.GetExact(b, byte(l%8)) + if val != nil { + valupdate, ok := val.(*messages.BGPMessageUpdate) + if ok { + RemoveFromNLRI(nlri, valupdate) + } + (*count)-- + } + return trie.Remove(b, byte(l%8)) + } + return 0 +} + +func (rib *LcRib) UpdateRib(message *messages.BGPMessageUpdate) { + if message == nil { + return + } + rib.SyncPrefixTable.Lock() + for i := range message.NLRI { + //fmt.Printf("Adding %v (%v/%v)\n", message.NLRI[i], i+1, len(message.NLRI)) + rib.addPrefix(message.NLRI[i], rib.PrefixTable, message, &rib.Countv4) + } + rib.SyncPrefixTable.Unlock() + + rib.SyncPrefixTable.Lock() + for i := range message.WithdrawnRoutes { + //fmt.Printf("Remove %v\n", message.WidthdrawnRoutes[i]) + rib.delPrefix(message.WithdrawnRoutes[i], rib.PrefixTable, &rib.Countv4) + } + message.WithdrawnRoutes = make([]messages.NLRI, 0) + rib.SyncPrefixTable.Unlock() + + for i := range message.PathAttributes { + switch pa := message.PathAttributes[i].(type) { + case messages.BGPAttribute_MP_REACH: + rib.SyncPrefix6Table.Lock() + for j := range pa.NLRI { + //fmt.Printf("Adding %v\n", pa.NLRI[j]) + rib.addPrefix(pa.NLRI[j], rib.Prefix6Table, message, &rib.Countv6) + } + rib.SyncPrefix6Table.Unlock() + + case messages.BGPAttribute_MP_UNREACH: + rib.SyncPrefix6Table.Lock() + for j := range pa.NLRI { + //fmt.Printf("Remove %v\n", pa.NLRI[j]) + rib.delPrefix(pa.NLRI[j], rib.Prefix6Table, &rib.Countv6) + } + pa.NLRI = make([]messages.NLRI, 0) + rib.SyncPrefix6Table.Unlock() + } + } + +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..ee3b39c --- /dev/null +++ b/server/server.go @@ -0,0 +1,786 @@ +package fgbgp + +import ( + "bytes" + "errors" + "fmt" + "github.com/cloudflare/fgbgp/messages" + "github.com/cloudflare/fgbgp/rib" + log "github.com/sirupsen/logrus" + "net" + "sync" + "time" +) + +const ( + STATE_IDLE = iota + STATE_CONNECT + STATE_ACTIVE + STATE_OPENSENT + STATE_OPENCONFIRM + STATE_ESTABLISHED +) + +type State struct { + CurState int + OpenReceived bool + Time time.Time +} + +type BGPEventHandler interface { + NewNeighbor(*messages.BGPMessageOpen, *Neighbor) bool + OpenSend(*messages.BGPMessageOpen, *Neighbor) bool + DisconnectedNeighbor(*Neighbor) + + //KeepAlive(*Neighbor) (bool) + Notification(*messages.BGPMessageNotification, *Neighbor) bool + + ProcessReceived(interface{}, *Neighbor) (bool, error) + ProcessSend(interface{}, *Neighbor) (bool, error) + + //ProcessEvent(ev interface{}, *Neighbor) +} + +type MPStruct struct { + Afi string + Safi string +} + +type AddPathStruct struct { + Afi string + Safi string + RxTx byte +} + +type Neighbor struct { + Addr net.IP + Port int + State *State + + Afi uint16 + Safi byte + + Connected bool + Reconnect bool + + /* + ConnectRetryCounter + ConnectRetryTimer + ConnectRetryTime + HoldTimer + HoldTime + KeepaliveTimer + KeepaliveTime + */ + + tcpconn *net.TCPConn + s *Server + qLife chan bool + qSender chan bool + update chan interface{} + + HandlerEvent BGPEventHandler + HandlerUpdate BGPUpdateHandler + //HandlerRoute RouteHandler + + // Given by peer Open Message + LastKeepAliveSent time.Time + PeerEnableKeepAlive bool + PeerHoldTime time.Duration + PeerMultiprotocolList []messages.BGPCapability_MP + PeerAddPathList []messages.AddPath + PeerASN uint32 + PeerRouteRefresh bool + Peer2Bytes bool + PeerIdentifier net.IP + + // Local configuration sent in Open + LocalLastKeepAliveRecv time.Time + LocalEnableKeepAlive bool + LocalHoldTime time.Duration + Identifier net.IP + ASN uint32 + MultiprotocolList []messages.BGPCapability_MP + AddPathList []messages.AddPath + RouteRefresh bool + + // Populated after open + SendAddPath []messages.AfiSafi + DecodeAddPath []messages.AfiSafi + + // General configuration + ReplicateASN bool // Sends the same ASN as received + Passive bool + RemoveOnDisconnect bool + + OutQueue chan messages.SerializableInterface + + Rib rib.Rib +} + +type Manager struct { + Neighbors []*Neighbor + neighborlock *sync.RWMutex + + Servers []*Server + serverlock *sync.RWMutex + q chan bool + + HandlerEvent BGPEventHandler + HandlerUpdate BGPUpdateHandler + + ASN uint32 + Identifier net.IP + AddPath bool + HoldTime int + RouteRefresh bool + + MemPool *sync.Pool +} + +type Server struct { + Addr net.IP + Port int + Manager *Manager + + inconn *net.TCPListener + laddr *net.TCPAddr +} + +func (n *Neighbor) RefreshAll() error { + err := n.Refresh(messages.AfiSafi{Afi: messages.AFI_IPV4, Safi: messages.SAFI_UNICAST}) + if err != nil { + return err + } + err = n.Refresh(messages.AfiSafi{Afi: messages.AFI_IPV6, Safi: messages.SAFI_UNICAST}) + return err +} + +func (n *Neighbor) Refresh(afisafi messages.AfiSafi) error { + if n.PeerRouteRefresh { + log.Infof("%v: Refreshing routes for %v", n.String(), afisafi.String()) + rr := messages.BGPMessageRouteRefresh{AfiSafi: afisafi} + n.OutQueue <- rr + return nil + } else { + // Should reset session? + return errors.New(fmt.Sprintf("%v: No route refresh capability.", n.String())) + } +} + +func (n *Neighbor) GetLocalAddress() (net.IP, int) { + //return n.s.Addr, n.s.Port + laddr := n.tcpconn.LocalAddr() + if laddr != nil { + tcpaddr, _ := net.ResolveTCPAddr(laddr.Network(), laddr.String()) + if tcpaddr != nil { + return tcpaddr.IP, tcpaddr.Port + } + } + return nil, 0 +} + +func (n *Neighbor) Connect() error { + log.Infof("%v: Connecting", n.String()) + tcpaddr := net.TCPAddr{ + IP: n.Addr, + Port: n.Port, + } + var errd error + n.tcpconn, errd = net.DialTCP("tcp", nil, &tcpaddr) + if errd != nil { + return errd + } + n.Connected = true + return nil +} + +func (n *Neighbor) Disconnect() { + log.Infof("%v: Disconnected", n.String()) + n.Connected = false + n.State.OpenReceived = false + n.tcpconn.Close() + n.UpdateState(STATE_IDLE) + + if n.HandlerEvent != nil { + n.HandlerEvent.DisconnectedNeighbor(n) + } + + if n.RemoveOnDisconnect && n.s != nil && n.s.Manager != nil { + log.Infof("%v: Removing from manager", n.String()) + select { + case n.qLife <- true: + default: + } + select { + case n.qSender <- true: + default: + } + + n.s.Manager.RemoveNeighbor(n) + } +} + +func (n *Neighbor) String() string { + return fmt.Sprintf("%v:%v/%v", n.Addr.String(), n.Port, n.PeerASN) +} + +func (n *Neighbor) SendRoute( + afisafi messages.AfiSafi, + nlri []messages.NLRI, + withdraw []messages.NLRI, + nextHop net.IP, + communities []uint32, + aspath []uint32, + med uint32, + locpref uint32) { + update := n.CraftUpdate(afisafi, nlri, withdraw, nextHop, communities, aspath, med, locpref) + n.OutQueue <- update +} + +func (n *Neighbor) CraftUpdate( + afisafi messages.AfiSafi, + nlri []messages.NLRI, + withdraw []messages.NLRI, + nextHop net.IP, + communities []uint32, + aspath []uint32, + med uint32, + locpref uint32) *messages.BGPMessageUpdate { + update := &messages.BGPMessageUpdate{} + + return update +} + +func (n *Neighbor) UpdateState(newstate int) { + n.State.CurState = newstate + n.State.Time = time.Now().UTC() +} + +func CompareAddPath(local []messages.AddPath, remote []messages.AddPath) ([]messages.AfiSafi, []messages.AfiSafi) { + recv := make([]messages.AfiSafi, 0) + send := make([]messages.AfiSafi, 0) + for i := range local { + for j := range remote { + if local[i].EqualsAfiSafi(remote[j]) { + if (local[i].TxRx | remote[j].TxRx) == 3 { + if remote[i].TxRx&2 != 0 && local[i].TxRx&1 != 0 { + recv = append(recv, messages.AfiSafi{local[i].Afi, local[i].Safi}) + } + if remote[i].TxRx&1 != 0 && local[i].TxRx&2 != 0 { + send = append(send, messages.AfiSafi{local[i].Afi, local[i].Safi}) + } + } + break + } + } + } + return send, recv +} + +func (n *Neighbor) UpdateFromOpen(pkt *messages.BGPMessageOpen) { + n.PeerASN = uint32(pkt.ASN) + n.PeerIdentifier = pkt.Identifier + n.Peer2Bytes = true + for i := range pkt.Parameters { + if pkt.Parameters[i].Type == messages.PARAMETER_CAPA { + capas := pkt.Parameters[i].Data.(messages.BGPCapabilities) + for c := range capas.BGPCapabilities { + switch ct := capas.BGPCapabilities[c].(type) { + case messages.BGPCapability_MP: + n.PeerMultiprotocolList = append(n.MultiprotocolList, ct) + case messages.BGPCapability_ADDPATH: + n.PeerAddPathList = ct.AddPathList + n.SendAddPath, n.DecodeAddPath = CompareAddPath(n.AddPathList, n.PeerAddPathList) + log.Debugf("%v: Add-path: Send on %v Afi-Safi / Receive on %v Afi-Safi", n.String(), len(n.SendAddPath), len(n.DecodeAddPath)) + case messages.BGPCapability_ASN: + n.Peer2Bytes = false + n.PeerASN = ct.ASN + case messages.BGPCapability_ROUTEREFRESH: + n.PeerRouteRefresh = true + } + } + } + } +} + +func (n *Neighbor) EvolveState(pkt interface{}) { + switch pktt := pkt.(type) { + case *messages.BGPMessageOpen: + log.Info(pktt.String()) + + if pktt.HoldTime != 0 { + n.PeerHoldTime = time.Duration(time.Duration(int(pktt.HoldTime)) * time.Second) + n.PeerEnableKeepAlive = true + } + + if n.ReplicateASN { + n.ASN = n.PeerASN + } + + n.UpdateFromOpen(pktt) + n.State.OpenReceived = true + + if n.HandlerEvent != nil { + ret := n.HandlerEvent.NewNeighbor(pktt, n) + if !ret { + log.Infof("%v: handler forced disconnect.", n.String()) + n.Disconnect() + } + } + + case *messages.BGPMessageKeepAlive: + if n.State.CurState == STATE_OPENCONFIRM { + log.Debugf("%v: OpenConfirm -> Established", n.String()) + n.UpdateState(STATE_ESTABLISHED) + } + case *messages.BGPMessageNotification: + log.Errorf("%v: Received notification: %v", n.String(), pktt) + if n.HandlerEvent != nil { + n.HandlerEvent.Notification(pktt, n) + } + n.Disconnect() + } + + if pkt != nil { + n.LocalLastKeepAliveRecv = time.Now().UTC() + } + + switch n.State.CurState { + case STATE_IDLE: + // check timers + if !n.Passive { + // Change nil for when binding to specific IP or IP+port + err := n.Connect() + if err == nil { + log.Debugf("%v: Idle -> Active", n.String()) + n.UpdateState(STATE_ACTIVE) + } else { + log.Errorf("%v: Error connecting: %v", n.String(), err) + } + } + case STATE_ACTIVE: + if !n.Passive || n.State.OpenReceived { + var ht uint16 + if n.LocalEnableKeepAlive { + ht = uint16(n.LocalHoldTime / time.Second) + } + + open := messages.CraftOpenMessage(n.ASN, ht, n.Identifier.To4(), n.MultiprotocolList, n.AddPathList, n.RouteRefresh) + log.Debugf("%v: Active -> OpenSent", n.String()) + n.OutQueue <- open + + ka := messages.CraftKeepAliveMessage() + n.OutQueue <- ka + + n.UpdateState(STATE_OPENSENT) + } + + case STATE_OPENSENT: + if n.State.OpenReceived { + log.Debugf("%v: OpenSent -> OpenConfirm", n.String()) + n.UpdateState(STATE_OPENCONFIRM) + } + + case STATE_ESTABLISHED: + // Check timers send + if n.PeerEnableKeepAlive && n.LastKeepAliveSent.Add((n.PeerHoldTime/time.Second)/3).Before(time.Now().UTC()) { + ka := messages.BGPMessageKeepAlive{} + n.OutQueue <- ka + log.Debugf("Established / KeepAlive") + n.LastKeepAliveSent = time.Now().UTC() + } + case STATE_OPENCONFIRM: + // Check timers send + if n.PeerEnableKeepAlive && n.LastKeepAliveSent.Add((n.PeerHoldTime/time.Second)/3).Before(time.Now().UTC()) { + ka := messages.BGPMessageKeepAlive{} + n.OutQueue <- ka + //log.Debugf("OpenConfirm / KeepAlive") + n.LastKeepAliveSent = time.Now().UTC() + } + } + + if n.State.CurState != STATE_IDLE && n.State.CurState != STATE_ACTIVE && n.LocalEnableKeepAlive && n.LocalLastKeepAliveRecv.Add(n.LocalHoldTime).Before(time.Now().UTC()) { + // Craft error hold time message + log.Errorf("%v: no keep-alive received. Disconnecting.", n.String()) + n.Disconnect() + } + +} + +func (n *Neighbor) NeighborLifeRoutine() { + for { + select { + case msg := <-n.update: + n.EvolveState(msg) + case <-time.After(time.Duration(1 * time.Second)): + n.EvolveState(nil) + case <-n.qLife: + log.Infof("%v: NeighborLifeRoutine stopped", n.String()) + return + } + } +} + +func (n *Neighbor) SenderRoutine() { + for { + select { + case msg := <-n.OutQueue: + buf := bytes.NewBuffer([]byte{}) + msg.Write(buf) + _, err := n.tcpconn.Write(buf.Bytes()) + + if err != nil { + log.Errorf("%v: error sender %v", n.String(), err) + n.Disconnect() + } + case <-n.qSender: + log.Infof("%v: SenderRoutine stopped", n.String()) + return + } + } +} + +func ReadFromSocket(tcpconn *net.TCPConn, msg []byte) error { + var err error + var i int + var read int + for read < len(msg) && err == nil { + tmpmsg := make([]byte, len(msg)-read) + i, err = tcpconn.Read(tmpmsg) + /*if i < len(msg) { + log.Debugf("Read following from %v/%v: %v/%v/%v length", tcpconn.RemoteAddr(), tcpconn.LocalAddr(), read, i, len(msg)) + }*/ + copy(msg[read:read+i], tmpmsg[:]) + read += i + } + if err != nil { + return err + } + return nil +} + +func (n *Neighbor) NeighborReceiveRoutine() { + for { + if n.Connected { + msg := make([]byte, 19) + err := ReadFromSocket(n.tcpconn, msg) + + var toread uint16 + var bgptype byte + if err == nil { + bgptype, toread, err = messages.ParsePacketHeader(msg) + //log.Debugf("Received from %v/%v: %v bytes", n.tcpconn.RemoteAddr(), n.tcpconn.LocalAddr(), toread) + + if toread > 0 { + msg = make([]byte, toread) + err = ReadFromSocket(n.tcpconn, msg) + } + } + + if err != nil { + // Socket might not be clean enough so even if the connection is redone, this error can be raised + log.Errorf("NeighborReceiveRoutine: %v", err) + n.Disconnect() + continue + } + + if bgptype != messages.MESSAGE_NOTIFICATION { + n.LocalLastKeepAliveRecv = time.Now().UTC() + } + + if bgptype != messages.MESSAGE_UPDATE { + var p interface{} + p, err = messages.ParsePacket(bgptype, msg) + if err != nil { + log.Error(err) + n.Disconnect() + continue + } + + if n.HandlerEvent != nil { + var continueskip bool + continueskip, err = n.HandlerEvent.ProcessReceived(p, n) + if !continueskip || err != nil { + continue + } + } + + select { + case n.update <- p: + // unsure about the non-blocking function->we may loose a packet + // Better a queue and this as a refresh indication (for a loop inside the EvolveState for all the unprocessed packets) + default: + } + + } else { + if n.HandlerUpdate != nil { + n.HandlerUpdate.ProcessUpdate(msg, n) + } + } + + /*switch pkt := p.(type) { + case *BGPMessageUpdate: + // save + log.Debugf("UPDATE %v", pkt) + default: + select { + case n.update<-p: + // unsure about the non-blocking function->we may loose a packet + // Better a queue and this as a refresh indication (for a loop inside the EvolveState for all the unprocessed packets) + default: + } + }*/ + + } else { + select { + case <-time.After(time.Duration(1 * time.Millisecond)): + if !n.Connected { + //log.Errorf("%v: Neighbor not connected", n.String()) + if !n.Reconnect { + //n.Remove() + log.Infof("%v: NeighborReceiveRoutine stopped", n.String()) + return + } + } + } + } + } +} + +func (n *Neighbor) Start() { + go n.NeighborReceiveRoutine() + go n.NeighborLifeRoutine() + go n.SenderRoutine() +} + +func (s *Server) ProcessIncomingRequest(tcpconn *net.TCPConn) { + log.Debugf("Creating new neighbor from incoming connection: %v", tcpconn.RemoteAddr().String()) + n := NewNeighborFromConn(tcpconn, s.Manager.Identifier, s.Manager.ASN, s.Manager.AddPath, s.Manager.HoldTime, s.Manager.RouteRefresh) + n.HandlerEvent = s.Manager.HandlerEvent + n.HandlerUpdate = s.Manager.HandlerUpdate + n.Connected = true + n.RemoveOnDisconnect = true + n.s = s + n.Start() + s.Manager.AddNeighbor(n) +} + +func (s *Server) ServerRoutine() { + var errcreate error + s.inconn, errcreate = net.ListenTCP("tcp", s.laddr) + if errcreate != nil { + log.Fatal(errcreate) + } + + for { + tcpconn, err := s.inconn.AcceptTCP() + if err != nil { + log.Error(err) + } else { + go s.ProcessIncomingRequest(tcpconn) + } + } +} + +func NewNeighborFromConn(tcpconn *net.TCPConn, identifier net.IP, asn uint32, addpath bool, holdtime int, routerefresh bool) *Neighbor { + addr := tcpconn.RemoteAddr() + tcpaddr, _ := net.ResolveTCPAddr("tcp", addr.String()) + n := NewNeighbor(tcpaddr.IP, tcpaddr.Port, identifier, asn, addpath, holdtime, routerefresh) + n.Passive = true + n.tcpconn = tcpconn + n.UpdateState(STATE_ACTIVE) + n.Reconnect = false + return n +} + +func NewNeighbor(addr net.IP, port int, identifier net.IP, asn uint32, addpath bool, holdtime int, routerefresh bool) *Neighbor { + n := &Neighbor{ + Addr: addr, + Port: port, + State: &State{}, + qLife: make(chan bool), + qSender: make(chan bool), + update: make(chan interface{}, 5), + Identifier: identifier, + ASN: asn, + MultiprotocolList: []messages.BGPCapability_MP{ + messages.BGPCapability_MP{messages.AFI_IPV4, messages.SAFI_UNICAST}, + messages.BGPCapability_MP{messages.AFI_IPV6, messages.SAFI_UNICAST}}, + PeerMultiprotocolList: make([]messages.BGPCapability_MP, 0), + PeerAddPathList: make([]messages.AddPath, 0), + OutQueue: make(chan messages.SerializableInterface, 1000), + Rib: rib.NewLcRib(), + Reconnect: true, + RouteRefresh: routerefresh, + } + + if holdtime >= 3 { + n.LocalHoldTime = time.Duration(time.Duration(holdtime) * time.Second) + n.LocalEnableKeepAlive = true + } + + if addpath { + n.AddPathList = []messages.AddPath{ + messages.AddPath{messages.AFI_IPV4, messages.SAFI_UNICAST, 3}, + messages.AddPath{messages.AFI_IPV6, messages.SAFI_UNICAST, 3}} + } + + n.Afi = messages.AFI_IPV4 + n.Safi = messages.SAFI_UNICAST + if addr.To4() == nil { + n.Afi = messages.AFI_IPV6 + } + + return n +} + +func NewManager(asn uint32, identifier net.IP, addpath bool, routerefresh bool) *Manager { + m := &Manager{ + Neighbors: make([]*Neighbor, 0), + neighborlock: &sync.RWMutex{}, + + Servers: make([]*Server, 0), + serverlock: &sync.RWMutex{}, + + ASN: asn, + Identifier: identifier, + + AddPath: addpath, + HoldTime: 90, + + RouteRefresh: routerefresh, + + q: make(chan bool), + + MemPool: &sync.Pool{}, + } + return m +} + +func (m *Manager) RemoveNeighbor(n *Neighbor) { + log.Debugf("Removing neighbor %v", n.String()) + newlist := make([]*Neighbor, 0) + m.neighborlock.Lock() + for i := range m.Neighbors { + if m.Neighbors[i] != n { + newlist = append(newlist, m.Neighbors[i]) + } else { + m.MemPool.Put(n) + log.Debugf("Putting %v into sync pool", n) + } + } + + m.Neighbors = newlist + m.neighborlock.Unlock() +} + +func (m *Manager) AddNeighbor(n *Neighbor) { + m.neighborlock.Lock() + m.Neighbors = append(m.Neighbors, n) + m.neighborlock.Unlock() +} + +func (m *Manager) NewServer(addr string) error { + tcpaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return err + } + s := &Server{ + Addr: tcpaddr.IP, + Port: tcpaddr.Port, + laddr: tcpaddr, + Manager: m, + } + m.serverlock.Lock() + m.Servers = append(m.Servers, s) + m.serverlock.Unlock() + return nil +} + +func (m *Manager) GetNeighbors() []*Neighbor { + m.neighborlock.RLock() + neighlist := make([]*Neighbor, len(m.Neighbors)) + for i := range m.Neighbors { + neighlist[i] = m.Neighbors[i] + } + m.neighborlock.RUnlock() + return neighlist +} + +func (m *Manager) ApplyUpdateHandlerToNeighbors() { + uh := m.HandlerUpdate + m.neighborlock.RLock() + for i := range m.Neighbors { + m.Neighbors[i].HandlerUpdate = uh + } + m.neighborlock.RUnlock() +} + +func (m *Manager) SetEventHandler(eh BGPEventHandler) { + m.HandlerEvent = eh + m.neighborlock.RLock() + for i := range m.Neighbors { + m.Neighbors[i].HandlerEvent = eh + } + m.neighborlock.RUnlock() +} + +func (m *Manager) UseDefaultUpdateHandler(workers int) { + if m.HandlerUpdate != nil { + m.HandlerUpdate.Close() + } + + uh := m.CreateDefaultUpdateHandler(workers) + + m.HandlerUpdate = uh + m.ApplyUpdateHandlerToNeighbors() +} + +// Set a more defined update handler (after processing and adding it to the Neighbor RIB). +// Must be using default event handler +func (m *Manager) SetUpdateEventHandler(eh BGPUpdateEventHandler) error { + if m.HandlerUpdate != nil { + assert, ok := m.HandlerUpdate.(*DefaultBGPUpdateHandler) + if !ok { + return errors.New("SetUpdateEventHandler: HandlerUpdate must be a Default Handler") + } else { + assert.SetUpdateEventHandler(eh) + return nil + } + } else { + return errors.New("SetUpdateEventHandler: HandlerUpdate is nil, please instanciate to Default Handler") + } +} + +func (m *Manager) Start() { + m.StartServers() + m.neighborlock.RLock() + for i := range m.Neighbors { + m.Neighbors[i].Start() + } + m.neighborlock.RUnlock() + for { + select { + case <-m.q: + // Cut connections + break + break + } + } +} +func (m *Manager) Stop() { + m.q <- true +} + +func (m *Manager) StartServers() { + m.serverlock.RLock() + for i := range m.Servers { + m.Servers[i].Start() + } + m.serverlock.RUnlock() +} + +func (s *Server) Start() { + go s.ServerRoutine() +} diff --git a/server/update.go b/server/update.go new file mode 100644 index 0000000..4942b8e --- /dev/null +++ b/server/update.go @@ -0,0 +1,178 @@ +package fgbgp + +import ( + "errors" + "fmt" + "github.com/cloudflare/fgbgp/messages" + log "github.com/sirupsen/logrus" + "sync/atomic" +) + +type BGPUpdateHandler interface { + ProcessUpdate([]byte, *Neighbor) + Close() +} + +type BGPUpdateEventHandler interface { + ProcessUpdateEvent(*messages.BGPMessageUpdate, *Neighbor) (add bool) +} + +type DefaultBGPUpdateHandler struct { + Manager *Manager + WorkerPool *Pool + + UpdateMsgCount uint64 + + RibPerNeighbor map[int]interface{} + + UpdateEventHandler BGPUpdateEventHandler +} + +type DefaultMessageUpdate struct { + Msg []byte + Neighbor *Neighbor +} + +func (n *Neighbor) UpdateRib(m *messages.BGPMessageUpdate) { + n.Rib.UpdateRib(m) +} + +func (uh *DefaultBGPUpdateHandler) ProcessUpdate(msg []byte, n *Neighbor) { + msgdispatch := &DefaultMessageUpdate{ + Msg: msg, + Neighbor: n, + } + + uh.WorkerPool.Dispatch(msgdispatch) +} + +func (uh *DefaultBGPUpdateHandler) Close() { + uh.WorkerPool.Stop() +} + +func (uh *DefaultBGPUpdateHandler) Process(id int, msg interface{}) error { + atomic.AddUint64(&(uh.UpdateMsgCount), 1) + + msgt := msg.(*DefaultMessageUpdate) + v, err := messages.ParseUpdate(msgt.Msg, msgt.Neighbor.DecodeAddPath, msgt.Neighbor.Peer2Bytes) + + if v == nil { + return errors.New(fmt.Sprintf("Null update: %v", err)) + } + + add := true + if uh.UpdateEventHandler != nil { + add = uh.UpdateEventHandler.ProcessUpdateEvent(v, msgt.Neighbor) + } + + if add { + msgt.Neighbor.UpdateRib(v) + } + + if err != nil { + return err + } + + return nil +} + +func (uh *DefaultBGPUpdateHandler) Error(id int, msg interface{}, err error) { + log.Errorf("DefaultBGPUpdateHandler: %v", err) +} + +func (uh *DefaultBGPUpdateHandler) SetUpdateEventHandler(eh BGPUpdateEventHandler) { + uh.UpdateEventHandler = eh +} + +func (m *Manager) CreateDefaultUpdateHandler(workers int) *DefaultBGPUpdateHandler { + uh := &DefaultBGPUpdateHandler{ + Manager: m, + } + + pool := CreatePool(workers, uh) + uh.WorkerPool = pool + + pool.Start() + + return uh +} + +/* +func GenerateUpdate(info *rib.BGPPathInformation, addpathlist []messages.AfiSafi) *messages.BGPMessageUpdate { + afisafi := messages.AfiSafi{ + Afi: messages.AFI_IPV4, + Safi: messages.SAFI_UNICAST, + } + + info.SyncNLRI.RLock() + nlri := make([]messages.NLRI, len(info.NLRI)) + copy(nlri, info.NLRI) + info.SyncNLRI.RUnlock() + if len(info.NLRI) == 0 { + return nil + } else { + ip := nlri[0] + if ip.Prefix.IP.To4() == nil { + afisafi = messages.AfiSafi{ + Afi: messages.AFI_IPV6, + Safi: messages.SAFI_UNICAST, + } + } + } + + addpath := messages.InAfiSafi(afisafi.Afi, afisafi.Safi, addpathlist) + + m := &messages.BGPMessageUpdate{} + + pa := []messages.BGPAttributeIf{ + messages.BGPAttribute_ORIGIN{ + Origin: info.Origin, + }, + } + + if len(info.ASPath) > 0 { + pa = append(pa, messages.BGPAttribute_ASPATH{ + SType: 2, + ASPath: info.ASPath,}) + } + if len(info.Communities) > 0 { + pa = append(pa, messages.BGPAttribute_COMMUNITIES{ + Communities: info.Communities,}) + } + if info.Med != 0 { + pa = append(pa, messages.BGPAttribute_MED{ + Med: info.Med,}) + } + if info.LocPref != 0 { + pa = append(pa, messages.BGPAttribute_LOCPREF{ + LocPref: info.LocPref,}) + } + + if afisafi.Afi == messages.AFI_IPV4 { + if info.NextHop != nil { + pa = append(pa, messages.BGPAttribute_NEXTHOP{ + NextHop: info.NextHop.To4(),}) + } + m.NLRI = nlri + + if addpath { + m.EnableAddPath = true + } + + } else if afisafi.Afi == messages.AFI_IPV6 { + attr := messages.BGPAttribute_MP_REACH{ + Afi: messages.AFI_IPV6, + Safi: messages.SAFI_UNICAST, + NextHop: info.NextHop, + NLRI: nlri, + } + if addpath { + attr.EnableAddPath = true + } + pa = append(pa, attr) + } + + m.PathAttributes = pa + return m +} +*/ diff --git a/server/workerpool.go b/server/workerpool.go new file mode 100644 index 0000000..aebe63e --- /dev/null +++ b/server/workerpool.go @@ -0,0 +1,88 @@ +package fgbgp + +import ( + log "github.com/sirupsen/logrus" +) + +type Handler interface { + Process(id int, msg interface{}) error + Error(id int, msg interface{}, err error) +} + +type Pool struct { + Workers []*Worker + Handler Handler + inchan chan chan interface{} +} + +type Worker struct { + Id int + Handler Handler + inchan chan chan interface{} + inmsg chan interface{} + q chan bool +} + +func CreatePool(nworkers int, h Handler) *Pool { + p := &Pool{ + Workers: make([]*Worker, nworkers), + //inchan: make(chan chan interface{}, nworkers), + inchan: make(chan chan interface{}), + } + for i := 0; i < nworkers; i++ { + w := CreateWorker(i, h, p.inchan) + p.Workers[i] = w + } + return p +} + +func CreateWorker(id int, h Handler, inchan chan chan interface{}) *Worker { + return &Worker{ + Id: id, + Handler: h, + inchan: inchan, + inmsg: make(chan interface{}), + q: make(chan bool), + } +} + +func (p *Pool) Start() { + for i := range p.Workers { + go p.Workers[i].Start() + } +} +func (p *Pool) Stop() { + for i := range p.Workers { + go p.Workers[i].Stop() + } +} + +func (w *Worker) Start() { + for { + w.inchan <- w.inmsg + select { + case msg := <-w.inmsg: + if w.Handler != nil { + err := w.Handler.Process(w.Id, msg) + if err != nil { + log.Error(err) + w.Handler.Error(w.Id, msg, err) + } + } + case <-w.q: + log.Infof("Stopping worker %v", w.Id) + break + break + } + } +} + +func (w *Worker) Stop() { + log.Infof("Stopping worker %v", w.Id) + w.q <- true +} + +func (p *Pool) Dispatch(msg interface{}) { + inmsg := <-p.inchan + inmsg <- msg +}