Files
fgbgp/server/workerpool.go
2018-03-23 18:59:10 +01:00

89 lines
1.5 KiB
Go

package fgbgp
import (
log "github.com/sirupsen/logrus"
)
type Handler interface {
Process(id int, msg interface{}) error
Error(id int, msg interface{}, err error)
}
type Pool struct {
Workers []*Worker
Handler Handler
inchan chan chan interface{}
}
type Worker struct {
Id int
Handler Handler
inchan chan chan interface{}
inmsg chan interface{}
q chan bool
}
func CreatePool(nworkers int, h Handler) *Pool {
p := &Pool{
Workers: make([]*Worker, nworkers),
//inchan: make(chan chan interface{}, nworkers),
inchan: make(chan chan interface{}),
}
for i := 0; i < nworkers; i++ {
w := CreateWorker(i, h, p.inchan)
p.Workers[i] = w
}
return p
}
func CreateWorker(id int, h Handler, inchan chan chan interface{}) *Worker {
return &Worker{
Id: id,
Handler: h,
inchan: inchan,
inmsg: make(chan interface{}),
q: make(chan bool),
}
}
func (p *Pool) Start() {
for i := range p.Workers {
go p.Workers[i].Start()
}
}
func (p *Pool) Stop() {
for i := range p.Workers {
go p.Workers[i].Stop()
}
}
func (w *Worker) Start() {
for {
w.inchan <- w.inmsg
select {
case msg := <-w.inmsg:
if w.Handler != nil {
err := w.Handler.Process(w.Id, msg)
if err != nil {
log.Error(err)
w.Handler.Error(w.Id, msg, err)
}
}
case <-w.q:
log.Infof("Stopping worker %v", w.Id)
break
break
}
}
}
func (w *Worker) Stop() {
log.Infof("Stopping worker %v", w.Id)
w.q <- true
}
func (p *Pool) Dispatch(msg interface{}) {
inmsg := <-p.inchan
inmsg <- msg
}