// DiscretWork is a job that is executed in small repeated steps by a
// monitoring worker.
type DiscretWork interface {
	// BeforeRun is invoked once before the stepping loop is started; a
	// non-nil error aborts the start.
	BeforeRun() error

	// DoWork performs one step. It returns true when the whole job is
	// finished, and a non-nil error when the job has failed.
	DoWork() (bool, error)

	// GetProgress returns an implementation-defined snapshot of the job's
	// progress.
	GetProgress() interface{}

	// AfterStop is the counterpart of BeforeRun.
	// NOTE(review): presumably called after the job is stopped; the caller
	// is not visible here, confirm against the worker's stop path.
	AfterStop() error
}
func (mw *MonitoredWorker) wgoroute() { log.Println("info: work start", mw.GetId()) mw.wgrun.Add(1) defer func() { log.Print("info: release work guid ", mw.GetId()) mw.wgrun.Done() }() for { select { case newState := <-mw.chsig: if newState == Stopped { mw.state = newState log.Println("info: work stopped") return } default: { isdone, err := mw.Itw.DoWork() if err != nil { log.Println("error: guid", mw.guid, " work failed", err) mw.state = Failed return } if isdone { mw.state = Completed log.Println("info: work done") return } } } } } func (mw *MonitoredWorker) Start() error { mw.lc.Lock() defer mw.lc.Unlock() if mw.state == Completed { return errors.New("error: try run completed job") } if mw.state == Running { return errors.New("error: try run runing job") } if err := mw.Itw.BeforeRun(); err != nil { mw.state = Failed return err } mw.chsig = make(chan int, 1) mw.state = Running go mw.wgoroute() return nil }
// Excerpt of the signal check (the select statement is left open in this excerpt): a value is received from mw.chsig and, when it equals Stopped, it is stored in mw.state, a message is logged, and the enclosing function returns.
select { case newState := <-mw.chsig: if newState == Stopped { mw.state = newState log.Println("info: work stopped") return }
package dtest

import (
	"errors"
	"fmt"
	"godownloader/monitor"
	"log"
	"math/rand"
	"testing"
	"time"
)

// TestWorkPool is a counting job used to exercise monitor.WorkerPool: every
// DoWork call advances From by one until it reaches To.
type TestWorkPool struct {
	From, id, To int32
}

// GetProgress reports the current value of From.
func (job TestWorkPool) GetProgress() interface{} {
	return job.From
}

// BeforeRun only logs that it was invoked and always succeeds.
func (job *TestWorkPool) BeforeRun() error {
	log.Println("info: exec before run")
	return nil
}

// AfterStop only logs that it was invoked and always succeeds.
func (job *TestWorkPool) AfterStop() error {
	log.Println("info: after stop")
	return nil
}

// DoWork simulates one slow step: it sleeps, increments From and logs it.
// It reports completion when From equals To and an error when From has
// already passed To.
func (job *TestWorkPool) DoWork() (bool, error) {
	time.Sleep(300 * time.Millisecond)
	job.From++
	log.Print(job.From)
	switch {
	case job.From == job.To:
		fmt.Println("done")
		return true, nil
	case job.From > job.To:
		return false, errors.New("tw.From > tw.To")
	default:
		return false, nil
	}
}

// TestWorkerPool drives a pool of twenty counting jobs through a
// start / progress / stop / restart sequence. It makes no assertions; the
// log output is the only observable result.
func TestWorkerPool(t *testing.T) {
	const jobs = 20

	pool := monitor.WorkerPool{}
	for n := 0; n < jobs; n++ {
		job := &TestWorkPool{From: 0, To: 20, id: rand.Int31()}
		pool.AppendWork(&monitor.MonitoredWorker{Itw: job})
	}

	pool.StartAll()
	time.Sleep(time.Second)
	log.Println("------------------Work Started------------------")
	log.Println(pool.GetAllProgress())
	log.Println("------------------Get All Progress--------------")
	time.Sleep(time.Second)

	pool.StopAll()
	log.Println("------------------Work Stop-------------------")
	time.Sleep(time.Second)

	// Resume the stopped jobs and let them run for a while.
	pool.StartAll()
	time.Sleep(5 * time.Second)
	pool.StopAll()

	// Back-to-back start and stop with no pause in between.
	pool.StartAll()
	pool.StopAll()
}
GET /PinegrowLinux64.2.2.zip HTTP/1.1 Host: pinegrow.s3.amazonaws.com User-Agent: Go-http-client/1.1 Range: bytes=34010904-42513630
func (pd *PartialDownloader) BeforeDownload() error { //create new req r, err := http.NewRequest("GET", pd.url, nil) if err != nil { return err } r.Header.Add("Range", "bytes="+strconv.FormatInt(pd.dp.Pos, 10)+"-"+strconv.FormatInt(pd.dp.To, 10)) f,_:=iotools.CreateSafeFile("test") r.Write(f) f.Close() resp, err := pd.client.Do(r) if err != nil { log.Printf("error: error download part file%v \n", err) return err } //check response if resp.StatusCode != 206 { log.Printf("error: file not found or moved status:", resp.StatusCode) return errors.New("error: file not found or moved") } pd.req = *resp return nil } …. func (pd *PartialDownloader) DownloadSergment() (bool, error) { //write flush data to disk buffer := make([]byte, FlushDiskSize, FlushDiskSize) count, err := pd.req.Body.Read(buffer) if (err != nil) && (err.Error() != "EOF") { pd.req.Body.Close() pd.file.Sync() return true, err } //log.Printf("returned from server %v bytes", count) if pd.dp.Pos+int64(count) > pd.dp.To { count = int(pd.dp.To - pd.dp.Pos) log.Printf("warning: server return to much for me i give only %v bytes", count) } realc, err := pd.file.WriteAt(buffer[:count], pd.dp.Pos) if err != nil { pd.file.Sync() pd.req.Body.Close() return true, err } pd.dp.Pos = pd.dp.Pos + int64(realc) pd.messureSpeed(realc) //log.Printf("writed %v pos %v to %v", realc, pd.dp.Pos, pd.dp.To) if pd.dp.Pos == pd.dp.To { //ok download part complete normal pd.file.Sync() pd.req.Body.Close() pd.dp.Speed = 0 log.Printf("info: download complete normal") return true, nil } //not full download next segment return false, nil }
func TestDownload(t *testing.T) { dl, err := httpclient.CreateDownloader("http://pinegrow.s3.amazonaws.com/PinegrowLinux64.2.2.zip", "PinegrowLinux64.2.2.zip", 7) if err != nil { t.Error("failed: can't create downloader") } errs := dl.StartAll() if len(errs)>0 { t.Error("failed: can't start downloader") } …..wait for finish download }
// UpdateTable turns the #jqGrid element into a jqGrid table that loads its
// rows as JSON from the local progress endpoint.
function UpdateTable() {
    // Column definitions; 'Id' is the row key.
    var columns = [
        { label: '#', name: 'Id', key: true, width: 5 },
        // ….. (further columns are omitted in the source listing)
        { label: 'Speed', name: 'Speed', width: 15, formatter: FormatByte },
        { label: 'Progress', name: 'Progress', formatter: FormatProgressBar }
    ];

    var options = {
        url: 'http://localhost:9981/progress.json',
        mtype: "GET",
        ajaxSubgridOptions: { async: false },
        styleUI: 'Bootstrap',
        datatype: "json",
        colModel: columns,
        viewrecords: true,
        rowNum: 20,
        pager: "#jqGridPager"
    };

    $("#jqGrid").jqGrid(options);
}
// Shut down on Ctrl-C or SIGTERM: stop every task, save the settings, then
// exit.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
	// Block until the first of the registered signals arrives.
	<-c
	gdownsrv.StopAllTask()
	saved := gdownsrv.SaveSettings(getSetPath())
	log.Println("info: save setting ", saved)
	os.Exit(1)
}()
Source: https://habr.com/ru/post/267943/
All Articles