89 lines
1.5 KiB
Go
89 lines
1.5 KiB
Go
package fgbgp
|
|
|
|
import (
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type Handler interface {
|
|
Process(id int, msg interface{}) error
|
|
Error(id int, msg interface{}, err error)
|
|
}
|
|
|
|
type Pool struct {
|
|
Workers []*Worker
|
|
Handler Handler
|
|
inchan chan chan interface{}
|
|
}
|
|
|
|
type Worker struct {
|
|
Id int
|
|
Handler Handler
|
|
inchan chan chan interface{}
|
|
inmsg chan interface{}
|
|
q chan bool
|
|
}
|
|
|
|
func CreatePool(nworkers int, h Handler) *Pool {
|
|
p := &Pool{
|
|
Workers: make([]*Worker, nworkers),
|
|
//inchan: make(chan chan interface{}, nworkers),
|
|
inchan: make(chan chan interface{}),
|
|
}
|
|
for i := 0; i < nworkers; i++ {
|
|
w := CreateWorker(i, h, p.inchan)
|
|
p.Workers[i] = w
|
|
}
|
|
return p
|
|
}
|
|
|
|
func CreateWorker(id int, h Handler, inchan chan chan interface{}) *Worker {
|
|
return &Worker{
|
|
Id: id,
|
|
Handler: h,
|
|
inchan: inchan,
|
|
inmsg: make(chan interface{}),
|
|
q: make(chan bool),
|
|
}
|
|
}
|
|
|
|
func (p *Pool) Start() {
|
|
for i := range p.Workers {
|
|
go p.Workers[i].Start()
|
|
}
|
|
}
|
|
func (p *Pool) Stop() {
|
|
for i := range p.Workers {
|
|
go p.Workers[i].Stop()
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Start() {
|
|
for {
|
|
w.inchan <- w.inmsg
|
|
select {
|
|
case msg := <-w.inmsg:
|
|
if w.Handler != nil {
|
|
err := w.Handler.Process(w.Id, msg)
|
|
if err != nil {
|
|
log.Error(err)
|
|
w.Handler.Error(w.Id, msg, err)
|
|
}
|
|
}
|
|
case <-w.q:
|
|
log.Infof("Stopping worker %v", w.Id)
|
|
break
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Stop() {
|
|
log.Infof("Stopping worker %v", w.Id)
|
|
w.q <- true
|
|
}
|
|
|
|
func (p *Pool) Dispatch(msg interface{}) {
|
|
inmsg := <-p.inchan
|
|
inmsg <- msg
|
|
}
|