Easy worker setup for your code
Easy worker setup for your code.
Have some draught and let this repo manage your load using go routines.
Checkout NSQ repo for msg queuing - drift
go get github.com/mayur-tolexo/sworker/draught
sworker draught is fast enough as compared to the other worker pools available.
Basic print job using two workers.
//print : function which worker will call to execute
func print(ctx context.Context, value ...interface{}) error {
fmt.Println(value)
return nil
}
func main() {
pool := draught.NewSimplePool(n) //new job pool created
pool.AddWorker(2, print, true) //adding 2 workers
for i := 0; i < 100; i++ {
pool.AddJob(i) //adding jobs
}
pool.Close() //closing the job pool
}
//print : function which worker will call to execute
func print(ctx context.Context, value ...interface{}) error {
fmt.Println(value)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
return nil
}
type logger struct{}
//Print will implement draught.Logger interface
func (logger) Print(pool *draught.Pool, value []interface{}, err error) {
fmt.Println(value, err)
}
//main function
func main() {
rand.Seed(time.Now().UnixNano())
handler := print //handler function which the go routine will call
n := 10 //no of jobs
pool := draught.NewPool(n, "", logger{}) //new job pool created
pool.SetTag("PRINTER") //added tag to the pool
pool.SetBatchProfiler(5) //added profiler batch size
pool.SetConsoleLog(true) //enable console log
pool.AddWorker(2, handler, true) //adding 2 workers
for i := 0; i < n; i++ {
pool.AddJob(i) //adding jobs
}
pool.Close() //closing the job pool
pool.Stats() //pool stats
}
//Handler function which will be called by the go routine
type Handler func(context.Context, ...interface{}) error
Here print is a handler function.
Define your own handler and pass it to the workers and you are ready to go.
Each job contains it's own recovery.
If any panic occured while processing the job
then that panic will be logged and worker will continue doing next job.
There are two ways to set logger in the pool.
type Logger struct{}
//Implementing Logger interface
func (l Logger)Print(pool *Pool, value []interface{}, err error){
}
// While creating the pool
NewPool(size int, tag string, Logger{})
// Using SetLogger() method after pool creation
pool.SetLogger(Logger{})
Console log will enable pool error and close notification logging in console
pool.GetErrorPool()
This will return a channel of workerJob which contains error occured and job value.
pool.SetMaxRetry(2)
To set maximum number of retires to be done if error occured while processing the job.
Default is 0. Retry won't work if panic occured while processing the job.
pool.SetRetryExponent(2)
If error occured then that job will be delayed exponentially.
Default exponent base is 10.
pool.Stats()
Pool's complete status.
pool.TotalCount()
Total job processed by the pool workers.
pool.SuccessCount()
Successfully processed jobs count
pool.ErrorCount()
Error count while processing job
pool.RetryCount()
Retry count while processing job
pool.WorkerCount()
No of Worker added on pool
for i := 1; i <= n; i++ {
go pool.AddJob(i)
}
You can add job in go routines as it is thread safe.
pool.AddWorker(2, handler, true)
You can add worker in go routines as well.
pool.SetTag("PRINTER")
To uniquely identify the pool logs.
pool.SetBatchProfiler(1000)
To log pool status after every specified batch of jobs complition.
pool.SetTimeProfiler(500 * time.Millisecond)
To log pool status after every specified time.
If the pool worker got stuck at same process for more than
thrice of the profiler time then it will log the worker current status.
PRINTER: Processed:3 jobs(total:10 success:1 error:2 retry:2) in 0.00179143 SEC
pool.SetConsoleLog(true)
Console log will enable pool error and close notification logging in console