Initial
This commit is contained in:
88
server/workerpool.go
Normal file
88
server/workerpool.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package fgbgp
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
Process(id int, msg interface{}) error
|
||||
Error(id int, msg interface{}, err error)
|
||||
}
|
||||
|
||||
type Pool struct {
|
||||
Workers []*Worker
|
||||
Handler Handler
|
||||
inchan chan chan interface{}
|
||||
}
|
||||
|
||||
type Worker struct {
|
||||
Id int
|
||||
Handler Handler
|
||||
inchan chan chan interface{}
|
||||
inmsg chan interface{}
|
||||
q chan bool
|
||||
}
|
||||
|
||||
func CreatePool(nworkers int, h Handler) *Pool {
|
||||
p := &Pool{
|
||||
Workers: make([]*Worker, nworkers),
|
||||
//inchan: make(chan chan interface{}, nworkers),
|
||||
inchan: make(chan chan interface{}),
|
||||
}
|
||||
for i := 0; i < nworkers; i++ {
|
||||
w := CreateWorker(i, h, p.inchan)
|
||||
p.Workers[i] = w
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func CreateWorker(id int, h Handler, inchan chan chan interface{}) *Worker {
|
||||
return &Worker{
|
||||
Id: id,
|
||||
Handler: h,
|
||||
inchan: inchan,
|
||||
inmsg: make(chan interface{}),
|
||||
q: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) Start() {
|
||||
for i := range p.Workers {
|
||||
go p.Workers[i].Start()
|
||||
}
|
||||
}
|
||||
func (p *Pool) Stop() {
|
||||
for i := range p.Workers {
|
||||
go p.Workers[i].Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Start() {
|
||||
for {
|
||||
w.inchan <- w.inmsg
|
||||
select {
|
||||
case msg := <-w.inmsg:
|
||||
if w.Handler != nil {
|
||||
err := w.Handler.Process(w.Id, msg)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
w.Handler.Error(w.Id, msg, err)
|
||||
}
|
||||
}
|
||||
case <-w.q:
|
||||
log.Infof("Stopping worker %v", w.Id)
|
||||
break
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Stop() {
|
||||
log.Infof("Stopping worker %v", w.Id)
|
||||
w.q <- true
|
||||
}
|
||||
|
||||
func (p *Pool) Dispatch(msg interface{}) {
|
||||
inmsg := <-p.inchan
|
||||
inmsg <- msg
|
||||
}
|
||||
Reference in New Issue
Block a user