2017-06-27 3 views
0

Ich möchte einen "Crawler" mit n Arbeiter implementieren, wo jeder Arbeiter zusätzliche Jobs hinzufügen kann. Das Programm sollte aufhören, wenn keine Jobs mehr verfügbar sind und alle Arbeiter ihre Arbeit beendet haben.Synchronisieren Sie Arbeiter für rekursive Crawl

Ich habe den folgenden Code (Sie damit bei https://play.golang.org/p/_j22p_OfYv spielen können):

package main 

import (
    "fmt" 
    "sync" 
) 

func main() { 
    pathChan := make(chan string) 
    fileChan := make(chan string) 
    workers := 3 
    var wg sync.WaitGroup 

    paths := map[string][]string{ 
     "/":  {"/test", "/foo", "a", "b"}, 
     "/test": {"aa", "bb", "cc"}, 
     "/foo": {"/bar", "bbb", "ccc"}, 
     "/bar": {"aaaa", "bbbb", "cccc"}, 
    } 

    for i := 0; i < workers; i++ { 
     wg.Add(1) 
     go func() { 
      for { 
       path, ok := <-pathChan 
       if !ok { 
        break 
       } 

       for _, f := range paths[path] { 
        if f[0] == '/' { 
         pathChan <- f 
        } else { 
         fileChan <- f 
        } 
       } 
      } 

      wg.Done() 
     }() 
    } 

    pathChan <- "/" 

    for { 
     filePath, ok := <-fileChan 
     if !ok { 
      break 
     } 

     fmt.Println(filePath) 
    } 

    wg.Wait() 
    close(pathChan) 
} 

Leider ist dies endet in einem Deadlock. Wo genau ist das Problem? Was ist die beste Vorgehensweise, um solche Funktionen zu schreiben? Sind Kanäle die richtige Funktion?

EDIT:

ich meinen Code aktualisiert haben zwei Wartegruppen zu verwenden, eine für die Arbeitsplätze und eine für die Arbeiter (siehe https://play.golang.org/p/bueUJzMhqj):

package main 

import (
    "fmt" 
    "sync" 
) 

func main() { 
    pathChan := make(chan string) 
    fileChan := make(chan string) 
    jobs := new(sync.WaitGroup) 
    workers := new(sync.WaitGroup) 
    nworkers := 2 

    paths := map[string][]string{ 
     "/":  {"/test", "/foo", "a", "b"}, 
     "/test": {"aa", "bb", "cc"}, 
     "/foo": {"/bar", "bbb", "ccc"}, 
     "/bar": {"aaaa", "bbbb", "cccc"}, 
    } 

    for i := 0; i < nworkers; i++ { 
     workers.Add(1) 
     go func() { 
      defer workers.Done() 
      for { 
       path, ok := <-pathChan 
       if !ok { 
        break 
       } 

       for _, f := range paths[path] { 
        if f[0] == '/' { 
         jobs.Add(1) 
         pathChan <- f 
        } else { 
         fileChan <- f 
        } 
       } 

       jobs.Done() 
      } 

     }() 
    } 

    jobs.Add(1) 
    pathChan <- "/" 

    go func() { 
     jobs.Wait() 
     close(pathChan) 
     workers.Wait() 
     close(fileChan) 
    }() 

    for { 
     filePath, ok := <-fileChan 
     if !ok { 
      break 
     } 

     fmt.Println(filePath) 
    } 

} 

Diese in der Tat scheint zu funktionieren, aber offensichtlich wird immer noch ein Deadlock passieren, wenn nworkers auf 1 gesetzt ist, weil der einzelne Arbeiter ewig warten wird, wenn er etwas zum Kanal pathChan hinzufügt. Um dieses Problem zu lösen, kann der Kanalpuffer erhöht werden (z. B. pathChan := make(chan string, 2)), aber dies funktioniert nur, solange zwei Puffer nicht vollständig voll sind. Natürlich könnte die Puffergröße auf eine große Zahl gesetzt werden, sagen wir 10000, aber der Code könnte immer noch einen Deadlock treffen. Darüber hinaus scheint dies für mich keine saubere Lösung zu sein.

Hier erkannte ich, dass es einfacher wäre, eine Art Queue anstelle eines Kanals zu verwenden, wo Elemente ohne Blockierung hinzugefügt und entfernt werden können und die Größe der Queue nicht festgelegt ist. Existieren solche Warteschlangen in der Go-Standardbibliothek?

+0

Wenn Sie eine * neue * Frage haben, ** eine neue Frage stellen **, bearbeiten Sie nicht nur Ihre Frage. Ihre neue Lösung wird weiterhin blockiert, wenn mehr Stammverzeichnisse als Worker vorhanden sind.Die einzige threadsichere Warteschlange in Go ist der Kanal, soweit ich weiß, obwohl Sie Ihre eigenen mit Slices und Mutexes schreiben könnten. Warum interessiert es dich, wie viele Arbeiter es gibt? – Adrian

Antwort

0

Wenn Sie auf eine beliebige Anzahl von Arbeitern warten möchten, enthält die Standardbibliothek sync.WaitGroup für genau diesen Zweck.

Es gibt andere Concurrency Probleme auch:

  • Sie sind Kanalverschluss-Signalisierung, aber Sie haben mehrere goroutines auf demselben Kanal senden. Dies ist im Allgemeinen eine schlechte Übung: Da jede Routine nie wissen kann, wann die anderen Routinen mit dem Kanal fertig sind, können Sie den Kanal niemals korrekt schließen.
  • Das Schließen eines Kanals wartet auf den anderen zuerst geschlossen zu sein, aber es wird nie geschlossen, so dass es blockiert.
  • Der einzige Grund, warum es nicht sofort Deadlock ist Ihr Beispiel passiert, mehr Arbeiter als Verzeichnisse unter "/" zu haben. Fügen Sie zwei weitere Verzeichnisse unter "/" hinzu und es werden sofort Deadlocks ausgeführt.

Es gibt einige Lösungen:

  • den Arbeiter Pool Dump und nur ein goroutine für jedes Unterverzeichnis drehen, und lassen Sie die Scheduler Sorgen um den Rest: https://play.golang.org/p/ck2DkNFnyF
  • Verwenden eines Arbeiter pro Stammebene Verzeichnis, und jeder Worker verarbeitet sein Verzeichnis rekursiv, anstatt Unterverzeichnisse, die es findet, in einem Kanal anzulegen.
+0

Ich habe meinen Code aktualisiert, um 'sync.WaitGroup' zu verwenden. Leider endet der Code immer noch in einem Dead-Lock. – watain

+0

Ich fügte auch einige weitere Überprüfungen und die fehlende for-Schleife innerhalb des Arbeiters hinzu. – watain

+0

In Ihrer Ergebnisschleife warten Sie, dass 'fileChan' geschlossen wird, aber Sie schließen es nie (und können es nicht, weil es unter mehreren goroutines geteilt wird, von denen keiner weiß, wann es nicht mehr benötigt wird). Es wird dort sitzen und auf immer mehr Ergebnisse warten. Da dies passiert, bevor Sie 'pathChan' schließen, blockieren die Goroutines auch das Warten auf diesen Kanal für immer. – Adrian