2017-02-23 3 views
1

Wir haben eine Reihe von Dateien nach der Verarbeitung zum Remote-Blob-Speicher hochgeladen werden.Go Queue-Verarbeitung mit Wiederholung bei Fehler

Momentan erstellt das Frontend (PHP) eine Redis-Liste solcher Dateien und gibt ihr eine eindeutige ID namens JobID. Es übergibt dann die eindeutige ID an eine Beanstalk-Röhre, die von einem Go-Prozess empfangen wird. Es verwendet eine Bibliothek namens Go workers, um jede Job-ID in der Art und Weise zu verarbeiten, die net/http tut. Er empfängt die Job-ID, ruft die Redis-Liste ab und beginnt mit der Verarbeitung von Dateien.

Derzeit wird jedoch jeweils nur eine Datei verarbeitet. Da die Operation hier I/O-gebunden ist, nicht CPU-gebunden, schlägt die Intuition vor, dass es vorteilhaft wäre, eine Goroutine pro Datei zu verwenden.

Wir möchten jedoch das Hochladen bei Fehler wiederholen sowie die Anzahl der pro Auftrag verarbeiteten Elemente nachverfolgen. Wir können keine ungebundene Anzahl von Goroutines starten, da ein einzelner Job ca. 10k Dateien zur Verarbeitung enthalten kann und 100s solcher Jobs können zu Spitzenzeiten pro Sekunde gesendet werden. Was wäre der richtige Ansatz dafür?

NB: Wir können die Technologie ändern etwas stapeln, wenn nötig (wie Auslagern beanstalkd für etwas)

Antwort

2

Sie die Anzahl der goroutines unter Verwendung einer gepufferten chan mit einer Größe der maximalen Anzahl von goroutines begrenzen Sie wollen. Sie können dann diese chan blockieren, wenn die maximale Kapazität erreicht ist. Wenn deine Goroutinen fertig sind, werden sie Plätze frei machen, damit neue Goroutinen laufen können.

Beispiel:

package main 

import (
    "fmt" 
    "sync" 
) 

var (
    concurrent = 5 
    semaphoreChan = make(chan struct{}, concurrent) 
) 

func doWork(wg *sync.WaitGroup, item int) { 
    // block while full 
    semaphoreChan <- struct{}{} 

    go func() { 
     defer func() { 
      // read to release a slot 
      <-semaphoreChan 
      wg.Done() 
     }() 
     // This is where your work actually gets done 
     fmt.Println(item) 
    }() 
} 

func main() { 
    // we need this for the example so that we can block until all goroutines finish 
    var wg sync.WaitGroup 
    wg.Add(10) 

    // start the work 
    for i := 0; i < 10; i++ { 
     doWork(&wg, i) 
    } 

    // block until all work is done 
    wg.Wait() 
} 

Go Spielplatz Link: https://play.golang.org/p/jDMYuCe7HV

Inspiriert von dieser Golang UK Conference sprechen: https://youtu.be/yeetIgNeIkc?t=1413

+0

Es half mir auf die Einschränkung der Gleichzeitigkeit zu beginnen. Das Problem, das jetzt noch besteht, ist jedoch, wie man den Erfolg oder Misserfolg eines Jobs im Auge behält. Ein Job enthält N Unteraufgaben, die alle erfolgreich verarbeitet werden müssen, andernfalls müssen Fehler gemeldet werden. Wie gehe ich damit um? – agathver

+0

Erstellen Sie einen Kanal, den Sie an die Goroutine übergeben. Die Goroutine kann das Ergebnis der Operation einschließlich Fehlern in diesen Kanal schreiben. Der Anrufer kann die Informationen aus diesem Kanal abrufen, um den Fehler nach Bedarf zu behandeln (z. B. den Fehler protokollieren oder die Operation wiederholen). Wenn Sie die Operation wiederholen müssen, erstellen Sie den Kanal mit einem benutzerdefinierten Strukturtyp mit dem erforderlichen Kontext für den erneuten Versuch (z. B. die Eingabe, die die goroutine für einen erneuten Versuch benötigt) und einem Fehler. – MahlerFive