type JobDispatcher interface { Dispatch(interface{}) (interface{}, error) } type ErrDispatcher interface { DispatchError(FaJob) error } type CompDispatcher interface { DispatchSuccess(CompJob) error }
srv.jbBal.Init(&srv.dDisp, srv, srv) // type JobBallancer struct { jChan chan interface{} // acJob map[string]Job // slJob map[string]Job // errDisp ErrDispatcher // error jobDisp JobDispatcher // compDisp CompDispatcher // JbDone sync.WaitGroup // aJobC int // () } // func (jbal *JobBallancer) Init(jdis JobDispatcher, cmd CompDispatcher, erd ErrDispatcher) { jbal.errDisp = erd jbal.jobDisp = jdis jbal.compDisp = cmd jbal.acJob = make(map[string]Job) jbal.slJob = make(map[string]Job) jbal.aJobC = 10 jbal.jChan = make(chan interface{}) go jbal.takeJob() // // log.Println("info: job ballancer inited") } . , .. , takeJob . func (jbal *JobBallancer) PushJob(jdat interface{}) error { if jbal.checkInit() { return errors.New("error: JobChan is not inited") } uid := genUid() job := Job{JobId: uid, Data: jdat} jbal.jChan <- job return nil } func (jbal *JobBallancer) takeJob() { for { // recivedTask := <-jbal.jChan log.Println("info: job taken") switch job := recivedTask.(type) { case TermJob: // log.Println("info: recive terminate dispatch singal") return case Job: // ( ) if len(jbal.acJob) < jbal.aJobC { jbal.JbDone.Add(1) jbal.addActiveJob(job) go jbal.startJob(job) log.Println("info: normal dispatch") } else { jbal.addSleepJob(job) jbal.JbDone.Add(1) log.Println("info: attend maximum active job") } case CompJob: // if err := jbal.compDisp.DispatchSuccess(job); err != nil { log.Println("error: failed dispatch success" + job.Job.JobId) } // jbal.removeJob(job.Job.JobId) jbal.JbDone.Done() jbal.resumeJobs() case FaJob: // if err := jbal.errDisp.DispatchError(job); err != nil { log.Println("error: failed dispatch error" + job.Job.JobId) } // jbal.removeJob(job.Job.JobId) jbal.JbDone.Done() jbal.resumeJobs() default: log.Fatalln("error: unknown job type") jbal.JbDone.Done() } } } // func (jbal *JobBallancer) removeJob(jid string) error { if _, isFind := jbal.acJob[jid]; isFind { delete(jbal.acJob, jid) } else { return errors.New("error: can't remove job because job with id not found") } return nil } // , , func (jbal *JobBallancer) TerminateTakeJob() error { if jbal.checkInit() { return errors.New("error: is not inited") } jbal.JbDone.Wait() jbal.jChan <- TermJob{} close(jbal.jChan) if len(jbal.acJob) > 0 { return errors.New("error: list job is not empty") } log.Println("info: greacefully terminate take job") return nil }
testJobDispatcher := TestJobDispatcher{} testErrorDispatcher := TestErrorDispatcher{} testSuccessDispatcher := TestCompletedDispatcher{} jobBallancer := JobBallancer{} jobBallancer.Init(&testJobDispatcher, &testSuccessDispatcher, &testErrorDispatcher) for i := 0; i < 40; i++ { jobBallancer.PushJob("data: " + strconv.Itoa(i)) } jobBallancer.TerminateTakeJob()
Source: https://habr.com/ru/post/254581/
All Articles