2012-06-18 7 views
14

Was ich tun möchte, ist eine Reihe von Produzenten Goroutines (von denen einige möglicherweise oder nicht abgeschlossen) und eine Verbraucher Routine. Das Problem ist mit diesem Vorbehalt in Klammern - wir kennen nicht die Gesamtzahl, die eine Antwort zurückgibt.Was ist das beste Idiom für Produzenten/Konsumenten in Go?

Also, was ich will, ist, dies zu tun:

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int) { 
    // May or may not produce. 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
} 

func main() { 
    c := make(chan int, 10) 
    for i := 0; i < 10; i++ { 
    go producer(c, signal) 
    } 

    // If we include a close, then that's WRONG. Chan will be closed 
    // but a producer will try to write to it. Runtime error. 
    close(c) 

    // If we don't close, then that's WRONG. All goroutines will 
    // deadlock, since the range keyword will look for a close. 
    for num := range c { 
    fmt.Printf("Producer produced: %d\n", num) 
    } 
    fmt.Println("All done.") 
} 

So ist die Frage, ob ich es falsch ist zu schließen, wenn ich nicht in der Nähe zu tun - es ist immer noch falsch (Kommentare im Code sehen).

Nun wäre die Lösung ein Out-of-Band-Signalkanal sein, dass alle Produzenten zu schreiben:

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int, signal chan bool) { 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
    signal <- true 
} 

func main() { 
    c := make(chan int, 10) 
    signal := make(chan bool, 10) 
    for i := 0; i < 10; i++ { 
    go producer(c, signal) 
    } 

    // This is basically a 'join'. 
    num_done := 0 
    for num_done < 10 { 
    <- signal 
    num_done++ 
    } 
    close(c) 

    for num := range c { 
    fmt.Printf("Producer produced: %d\n", num) 
    } 
    fmt.Println("All done.") 
} 

Und das völlig tut, was ich will! Aber mir scheint es wie ein Bissen. Meine Frage ist: Gibt es einen Idiom/Trick, der es mir leichter macht, etwas Ähnliches zu tun?

hatte ich einen Blick hier: http://golang.org/doc/codewalk/sharemem/ Und es scheint, wie die complete chan (zu Beginn der main initialisiert) in einem Bereich verwendet wird, aber nie geschlossen. Ich verstehe nicht wie.

Wenn jemand irgendwelche Einsichten hat, würde ich es sehr schätzen. Prost!


Edit: fls0815 hat die Antwort und beantwortet auch die Frage, wie die enge losen Kanalbereich arbeitet.

Mein Code oben zu arbeiten modifed (done vor fls0815 freundlicherweise Code):

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
) 

var wg_prod sync.WaitGroup 
var wg_cons sync.WaitGroup 

func producer(c chan int) { 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
    wg_prod.Done() 
} 

func main() { 
    c := make(chan int, 10) 
    wg_prod.Add(10) 
    for i := 0; i < 10; i++ { 
    go producer(c) 
    } 

    wg_cons.Add(1) 
    go func() { 
    for num := range c { 
     fmt.Printf("Producer produced: %d\n", num) 
    } 
    wg_cons.Done() 
    }() 

    wg_prod.Wait() 
    close(c) 
    wg_cons.Wait() 
    fmt.Println("All done.") 
} 

Antwort

14

Nur Hersteller sollten Kanäle schließen. Sie können Ihr Ziel erreichen, indem Sie Verbraucher aufrufen, die nach dem Start Ihrer Produzenten über den resultierenden Kanal iterieren (range). In Ihrem Haupt-Thread warten Sie (siehe sync.WaitGroup) bis Ihre Konsumenten/Produzenten ihre Arbeit beendet haben. Nachdem die Produzenten fertig sind, schließen Sie den resultierenden Kanal, der Ihre Kunden zum Beenden zwingt (range wird beendet, wenn die Kanäle geschlossen sind und kein gepufferter Gegenstand übrig ist).

Beispielcode:

package main 

import (
    "log" 
    "sync" 
    "time" 
    "math/rand" 
    "runtime" 
) 

func consumer() { 
    defer consumer_wg.Done() 

    for item := range resultingChannel { 
     log.Println("Consumed:", item) 
    } 
} 

func producer() { 
    defer producer_wg.Done() 

    success := rand.Float32() > 0.5 
    if success { 
     resultingChannel <- rand.Int() 
    } 
} 

var resultingChannel = make(chan int) 
var producer_wg sync.WaitGroup 
var consumer_wg sync.WaitGroup 

func main() { 
    rand.Seed(time.Now().Unix()) 

    for c := 0; c < runtime.NumCPU(); c++ { 
     producer_wg.Add(1) 
     go producer() 
    } 

    for c := 0; c < runtime.NumCPU(); c++ { 
     consumer_wg.Add(1) 
     go consumer() 
    } 

    producer_wg.Wait() 

    close(resultingChannel) 

    consumer_wg.Wait() 
} 

Der Grund, warum ich legte den close -Anweisung in die Hauptfunktion ist es, weil wir mehr als ein Produzent haben. Das Schließen des Kanals in einem Producer in dem obigen Beispiel würde zu dem Problem führen, das Sie bereits kennengelernt haben (Schreiben auf geschlossenen Kanälen; der Grund ist, dass ein Produzent übrig bleiben könnte, der immer noch Daten produziert). Kanäle sollten nur geschlossen werden, wenn kein Produzent mehr übrig ist (daher mein Vorschlag, den Kanal nur vom Produzenten zu schließen). So werden Kanäle in Go erstellt. Here finden Sie weitere Informationen zum Schließen von Kanälen.


zum ShareMem Beispiel Verwandte: AFAICS dieses Beispiel endlos läuft durch die Ressourcen wieder neu zu Warteschlangen und wieder (aus schwebend -> komplett -> anhängig -> Komplette ... und so weiter). Dies ist die Iteration am Ende der Hauptfunktion. Er empfängt die abgeschlossenen Ressourcen und reiht sie mithilfe von Resource.Sleep() in den Status "Ausstehend" ein. Wenn keine abgeschlossene Ressource vorhanden ist, wartet und blockiert sie, dass neue Ressourcen abgeschlossen werden. Daher müssen die Kanäle nicht geschlossen werden, da sie ständig verwendet werden.

+0

Hallo, danke für deine Antwort - es ist in der Tat, was ich suchte. Könnten Sie Ihren Vorschlag, "Nur Produzenten sollten Kanäle schließen", erweitern? - Es klingt wie eine Common-Sense/Code-macht-Sinn-Regel, aber ich frage mich, ob es auch einen technischen Grund gab (da das Codebeispiel, das du auflistet, die Hauptfunktion hat, den Kanal zu schließen). Danke noch einmal! – Will

+1

Ich habe weitere Informationen hinzugefügt. HTH. – fls0815

+0

Ahhh, das macht Sinn. Ich dachte, dass es vielleicht eine harte Regel sein könnte - in der jeder Produzent prüfen müsste, ob es erlaubt ist, den Kanal zu schließen (daher schließt der letzte, der ihn beendet). Das ist offensichtlich viel unordentlicher (mit mehr unnötigen Überprüfungen), als es einfach in main() in unseren Beispielen zu schließen, aber ich war besorgt, dass es die Art und Weise war, Dinge zu tun (aus irgendeinem Grund war mir das nicht bewusst). Ich versuche immer noch, ein Gefühl für Stil Best Practices zu bekommen. – Will

0

Es gibt immer viele Möglichkeiten, diese Probleme zu lösen. Hier ist eine Lösung, die die einfachen synchronen Kanäle verwendet, die in Go grundlegend sind. Keine gepufferten Kanäle, keine schließenden Kanäle, keine WaitGroups.

Es ist wirklich nicht so weit von Ihrem "Mund voll" -Lösung, und - Entschuldigung zu enttäuschen - nicht viel kleiner. Es bringt den Verbraucher in seine eigene Goroutine, so dass der Verbraucher Zahlen konsumieren kann, wie der Produzent sie produziert. Es macht auch den Unterschied, dass ein "Versuch" der Produktion in entweder Erfolg oder Misserfolg enden kann. Wenn die Produktion fehlschlägt, wird der Versuch sofort ausgeführt. Wenn es erfolgreich ist, wird der Versuch nicht durchgeführt, bis die Nummer verbraucht ist.

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int, fail chan bool) { 
    if success := rand.Float32() > 0.5; success { 
     c <- rand.Int() 
    } else { 
     fail <- true 
    } 
} 

func consumer(c chan int, success chan bool) { 
    for { 
     num := <-c 
     fmt.Printf("Producer produced: %d\n", num) 
     success <- true 
    } 
} 

func main() { 
    const nTries = 10 
    c := make(chan int) 
    done := make(chan bool) 
    for i := 0; i < nTries; i++ { 
     go producer(c, done) 
    } 
    go consumer(c, done) 

    for i := 0; i < nTries; i++ { 
     <-done 
    } 
    fmt.Println("All done.") 
} 
0

Ich füge das hinzu, weil die vorhandenen Antworten ein paar Dinge nicht klar machen. Erstens ist die Bereichsschleife im Codewalk-Beispiel nur eine unendliche Ereignisschleife, um die URL-Liste für immer zu überprüfen und zu aktualisieren.

Als nächstes ist ein Kanal, allein schon die idiomatische Consumer-Producer-Warteschlange in Go. Die Größe des Async-Puffers, der den Kanal unterstützt, bestimmt, wie viel Erzeuger produzieren können, bevor Rückstau entsteht. Stellen Sie N = 0 ein, um den Konsumenten des Lock-Step-Producers zu sehen, ohne dass jemand vorauseilt oder zurückkommt. Mit N = 10 kann der Hersteller bis zu 10 Produkte produzieren, bevor er blockiert.

Zuletzt gibt es einige nette Idiome zum Schreiben kommunizierender sequentieller Prozesse in Go (z. B. Funktionen, die Routinen für Sie starten und das for/select-Muster zur Kommunikation und zum Akzeptieren von Steuerbefehlen verwenden). Ich halte WaitGroups für ungeschickt und würde stattdessen gerne idiomatische Beispiele sehen.

package main 

import (
    "fmt" 
    "time" 
) 

type control int 
const (
    sleep control = iota 
    die // receiver will close the control chan in response to die, to ack. 
) 

func (cmd control) String() string { 
    switch cmd { 
    case sleep: return "sleep" 
    case die: return "die" 
    } 
    return fmt.Sprintf("%d",cmd) 
} 

func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) { 
    var product int 
    go func() { 
     for { 
      select { 
     case writechan <- product: 
      fmt.Printf("Producer produced %v\n", product) 
      product++ 
     case cmd:= <- ctrl: 
      fmt.Printf("Producer got control cmd: %v\n", cmd) 
      switch cmd { 
      case sleep: 
       fmt.Printf("Producer sleeping 2 sec.\n") 
       time.Sleep(2000 * time.Millisecond) 
      case die: 
       fmt.Printf("Producer dies.\n") 
       close(done) 
       return 
      } 
      } 
     } 
    }() 
} 

func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) { 
    go func() { 
     var product int 
     for { 
      select { 
      case product = <-readchan: 
       fmt.Printf("Consumer consumed %v\n", product) 
      case cmd:= <- ctrl: 
       fmt.Printf("Consumer got control cmd: %v\n", cmd) 
       switch cmd { 
       case sleep: 
        fmt.Printf("Consumer sleeping 2 sec.\n") 
        time.Sleep(2000 * time.Millisecond) 
       case die: 
        fmt.Printf("Consumer dies.\n") 
        close(done) 
        return 
       } 

      } 
     } 
    }() 
} 

func main() { 

    N := 10 
    q := make(chan int, N) 

    prodCtrl := make(chan control) 
    consCtrl := make(chan control) 

    prodDone := make(chan bool) 
    consDone := make(chan bool) 


    ProduceTo(q, prodCtrl, prodDone) 
    ConsumeFrom(q, consCtrl, consDone) 

    // wait for a moment, to let them produce and consume 
    timer := time.NewTimer(10 * time.Millisecond) 
    <-timer.C 

    // tell producer to pause 
    fmt.Printf("telling producer to pause\n") 
    prodCtrl <- sleep 

    // wait for a second 
    timer = time.NewTimer(1 * time.Second) 
    <-timer.C 

    // tell consumer to pause 
    fmt.Printf("telling consumer to pause\n") 
    consCtrl <- sleep 


    // tell them both to finish 
    prodCtrl <- die 
    consCtrl <- die 

    // wait for that to actually happen 
    <-prodDone 
    <-consDone 
} 
0

Sie können einfache ungepufferte Kanäle ohne Wartegruppen verwenden, wenn Sie das Generatormuster mit einer fanIn-Funktion verwenden.

Im Generatormuster gibt jeder Produzent einen Kanal zurück und ist dafür verantwortlich, ihn zu schließen. Eine fanIn-Funktion iteriert dann über diese Kanäle und leitet die für sie zurückgegebenen Werte einen einzelnen Kanal zurück, den sie zurückgibt.

Das Problem ist natürlich, dass die Funktion fanIn den Nullwert des Kanaltyps (int) weiterleitet, wenn jeder Kanal geschlossen ist.

Sie können umgehen, indem Sie den Nullwert Ihres Kanaltyps als Sentinel-Wert verwenden und nur die Ergebnisse aus dem fanIn-Kanal verwenden, wenn sie nicht den Wert Null haben.

Hier ist ein Beispiel:

package main 

import (
    "fmt" 
    "math/rand" 
) 

const offset = 1 

func producer() chan int { 
    cout := make(chan int) 
    go func() { 
     defer close(cout) 
     // May or may not produce. 
     success := rand.Float32() > 0.5 
     if success { 
      cout <- rand.Int() + offset 
     } 
    }() 
    return cout 
} 

func fanIn(cin []chan int) chan int { 
    cout := make(chan int) 
    go func() { 
     defer close(cout) 
     for _, c := range cin { 
      cout <- <-c 
     } 
    }() 
    return cout 
} 

func main() { 
    chans := make([]chan int, 0) 
    for i := 0; i < 10; i++ { 
     chans = append(chans, producer()) 
    } 

    for num := range fanIn(chans) { 
     if num > offset { 
      fmt.Printf("Producer produced: %d\n", num) 
     } 
    } 
    fmt.Println("All done.") 
} 
Verwandte Themen