[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH RFC 28/59] controller: Handle worker early death



From: George Dunlap <george.dunlap@xxxxxxxxxx>

Log raw worker output.  In the event of an unexpected worker death,
dump the output and stop further processing.

Also fix an assert that caused workers to die if the timer was too
exact.

Signed-off-by: George Dunlap <george.dunlap@xxxxxxxxxx>
---
 benchmark.go     |  7 -------
 processworker.go | 18 ++++++++++++++++--
 run.go           | 24 ++++++++++++++++++------
 xenworker.go     | 21 +++++++++++++++++++--
 4 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/benchmark.go b/benchmark.go
index 5e35997..de1f650 100644
--- a/benchmark.go
+++ b/benchmark.go
@@ -74,13 +74,6 @@ type WorkerSet struct {
        Count int
 }
 
-type Worker interface {
-       SetId(WorkerId)
-       Init(WorkerParams, WorkerConfig) error
-       Shutdown()
-       Process(chan WorkerReport, chan bool)
-}
-
 const (
        USEC = 1000
        MSEC = USEC * 1000
diff --git a/processworker.go b/processworker.go
index f517321..999e76a 100644
--- a/processworker.go
+++ b/processworker.go
@@ -32,6 +32,7 @@ type ProcessWorker struct {
        c *exec.Cmd
        stdout io.ReadCloser
        jsonStarted bool
+       Log []string
 }
 
 func (w *ProcessWorker) SetId(i WorkerId) {
@@ -54,7 +55,19 @@ func (w *ProcessWorker) Shutdown() {
        w.c.Process.Kill()
 }
 
-func (w *ProcessWorker) Process(report chan WorkerReport, done chan bool) {
+// DumpLog writes each line stored in w.Log to f (not stdout) and returns the first write or flush error.
+func (w *ProcessWorker) DumpLog(f io.Writer) (err error) {
+       b := bufio.NewWriter(f)
+       for _, line := range w.Log {
+               if _, err = fmt.Fprintln(b, line); err != nil {
+                       return
+               }
+       }
+       // Flush explicitly: a deferred Flush would silently drop the writer's error.
+       return b.Flush()
+}
+
+func (w *ProcessWorker) Process(report chan WorkerReport, done chan WorkerId) {
        w.c.Start()
 
        scanner := bufio.NewScanner(w.stdout)
@@ -63,6 +76,7 @@ func (w *ProcessWorker) Process(report chan WorkerReport, done chan bool) {
                s := scanner.Text()
                
                //fmt.Println("Got these bytes: ", s);
+               w.Log = append(w.Log, s)
 
                if w.jsonStarted {
                        var r WorkerReport
@@ -77,7 +91,7 @@ func (w *ProcessWorker) Process(report chan WorkerReport, done chan bool) {
                }
        }
 
-       done <- true
+       done <- w.id
 
        w.c.Wait()
 }
diff --git a/run.go b/run.go
index ed1957b..1b39730 100644
--- a/run.go
+++ b/run.go
@@ -26,6 +26,7 @@ import (
        "regexp"
        "strconv"
        "bufio"
+       "io"
 )
 
 type WorkerState struct {
@@ -33,6 +34,14 @@ type WorkerState struct {
        LastReport WorkerReport
 }
 
+type Worker interface { // methods a worker implementation must provide
+       SetId(WorkerId) // assign the worker's id
+       Init(WorkerParams, WorkerConfig) error // NOTE(review): setup semantics are not visible in this patch
+       Shutdown() // ProcessWorker's implementation kills its child process
+       Process(chan WorkerReport, chan WorkerId) // implementations send their own id on the second channel once their output ends
+       DumpLog(io.Writer) error // write the worker's captured output lines to the given writer
+}
+
 func Report(ws *WorkerState, r WorkerReport) {
        //fmt.Println(r)
 
@@ -57,7 +66,7 @@ func Report(ws *WorkerState, r WorkerReport) {
 
 type WorkerList map[WorkerId]*WorkerState
 
-func (ws *WorkerList) Start(report chan WorkerReport, done chan bool) (i int) {
+func (ws *WorkerList) Start(report chan WorkerReport, done chan WorkerId) (i int) {
        i = 0
        for j := range *ws {
                go (*ws)[j].w.Process(report, done)
@@ -160,7 +169,7 @@ func (run *BenchmarkRun) Run() (err error) {
        }
        
        report := make(chan WorkerReport)
-       done := make(chan bool)
+       done := make(chan WorkerId)
        signals := make(chan os.Signal, 1)
 
        signal.Notify(signals, os.Interrupt)
@@ -179,12 +188,16 @@ func (run *BenchmarkRun) Run() (err error) {
                                run.Results.Raw = append(run.Results.Raw, r)
                                Report(Workers[r.Id], r)
                        }
-               case <-done:
+               case did := <-done:
                        if ! stopped {
-                               fmt.Println("WARNING: Worker left early")
+                               fmt.Println("WARNING: Worker", did, "left early, shutting down workers")
+                               Workers.Stop()
+                               stopped = true
+                               err = fmt.Errorf("Worker %v exited early", did)
+                               Workers[did].w.DumpLog(os.Stdout)
                        }
                        i--;
-                       fmt.Println(i, "workers left");
+                       fmt.Printf("Worker %v exited; %d workers left\n", did, i);
                case <-timeout:
                        if ! stopped {
                                Workers.Stop()
@@ -201,7 +214,6 @@ func (run *BenchmarkRun) Run() (err error) {
                                }
                                err = fmt.Errorf("Interrupted")
                        } else {
-                               err = fmt.Errorf("Interrupted")
                                fmt.Println("SIGINT received after stop, exiting without cleaning up")
                                return
                        }
diff --git a/xenworker.go b/xenworker.go
index e98c970..45e0876 100644
--- a/xenworker.go
+++ b/xenworker.go
@@ -42,6 +42,7 @@ type XenWorker struct {
        consoleCmd *exec.Cmd
        console io.ReadCloser
        jsonStarted bool
+       Log []string
 }
 
 // We have to capitalize the element names so that the json class can
@@ -231,8 +232,22 @@ func (w *XenWorker) Shutdown() {
        }
 }
 
+// DumpLog writes each line stored in w.Log to f and returns the first write or flush error.
+func (w *XenWorker) DumpLog(f io.Writer) (err error) {
+       b := bufio.NewWriter(f)
+       for _, line := range w.Log {
+               if _, err = fmt.Fprintln(b, line); err != nil {
+                       return
+               }
+       }
+       // Flush explicitly: a deferred Flush would silently drop the writer's error.
+       return b.Flush()
+}
+
+
+
 // FIXME: Return an error
-func (w *XenWorker) Process(report chan WorkerReport, done chan bool) {
+func (w *XenWorker) Process(report chan WorkerReport, done chan WorkerId) {
        // // xl unpause [vmname]
        err := xg.Ctx.DomainUnpause(Domid(w.domid))
        if err != nil {
@@ -244,6 +259,8 @@ func (w *XenWorker) Process(report chan WorkerReport, done chan bool) {
 
        for scanner.Scan() {
                s := scanner.Text()
+
+               w.Log = append(w.Log, s)
                
                //fmt.Println("Got these bytes: ", s);
 
@@ -265,7 +282,7 @@ func (w *XenWorker) Process(report chan WorkerReport, done chan bool) {
                }
        }
 
-       done <- true
+       done <- w.id
 
        w.consoleCmd.Wait()
 }
-- 
2.7.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
https://lists.xen.org/xen-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.