2017-08-01 2 views
0

Ich würde gerne wissen, welche anderen Optionen ich habe, um kontinuierlich von einem Named Pipe mit Golang zu lesen. Mein aktueller Code beruht auf einer unendlichen for-Schleife, die in einem Gorutine läuft; aber Hat eine CPU bei 100% Auslastung.Kontinuierlich von einem Named Pipe lesen

func main() { 
.... 

var wg sync.WaitGroup 
fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600) 
defer fpipe.Close() 

f, _ := os.Create("dump.txt") 
defer f.Close() 
var buff bytes.Buffer 

wg.Add(1) 
go func() { 
     for { 
      io.Copy(&buff, fpipe) 
      if buff.Len() > 0 { 
       buff.WriteTo(f) 
      } 
     } 
    }() 

    wg.Wait() 
} 
+2

'io.Copy' "liest kontinuierlich", setzt sie nicht in einer Schleife setzen. – JimB

+0

@ JimB Ich denke, ich sollte erwähnen, dass die Schreibseite nicht ständig auf die Named Pipe schreibt. Wenn ich mich ausschließlich auf io.Copy stütze, wird die goroutine beendet, sobald EOF erreicht ist. –

Antwort

2

Ein Named Pipe Reader wird EOF empfangen, wenn keine Schreiber mehr vorhanden sind. Die Lösung außerhalb dieses Codes soll sicherstellen, dass es immer einen Writer-Prozess gibt, der den Dateideskriptor enthält, obwohl er nichts schreiben muss.

Wenn Sie innerhalb des Go-Programms auf einen neuen Writer warten möchten, müssen Sie in Ihrer for-Schleife die io.Reader abfragen. Ihr aktueller Code tut dies mit einer aktiven Schleife, die 100% von 1 CPU-Kern verbraucht. Hinzufügen eines Schlaf und eine Art und Weise auf andere Fehler zurückkehren wird um das Problem zu umgehen:

for { 
    err := io.Copy(&buff, fpipe) 
    if buff.Len() > 0 { 
     buff.WriteTo(f) 
    } 

    if err != nil { 
     // something other than EOF happened 
     return 
    } 

    time.Sleep(100 * time.Millisecond) 
} 
+0

Aus Neugier, warum runtime.Gosched macht den Job nicht? –

+0

@ Benjamin.E: Du brauchst 'runtime.GoSched' nicht, weil du anderen Gouutines schon nachgibt. Funktions-/Methodenaufrufe und Kanaloperationen werden nachgeben, so dass "io.Copy" vollkommen ausreichend ist. Das Problem ist nur, dass Sie so schnell wie möglich drehen, wenn nichts da ist. Leser werden nicht in Millisekunden-Zeitrahmen kommen und gehen, also gibt es keinen Grund, dies schnell zu wiederholen. – JimB

+0

@ JimB Bitte lesen Sie meine Kritik an Ihrer Lösung in meiner Antwort. –

0

Intro

Wie schon geschrieben, ein Named Pipe Leser wird ein EOF erhalten, wenn keine Schriftsteller bleiben.

Allerdings finde ich @ JimB Lösung suboptimal:

  1. A Named Pipe hat eine maximale Kapazität (65kB, iirc), die auch innerhalb der 100 ms Schlafperiode gefüllt erhalten kann. Wenn der Puffer gefüllt ist, blockieren alle Schreiber ohne guten Grund.
  2. Wenn ein Neustart erfolgt, verlieren Sie durchschnittlich 50ms an Daten. Wiederum ohne guten Grund.
  3. Wenn Sie einen statischen Puffer zum Kopieren verwenden möchten, wäre io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) die bessere Lösung, imho. Dies ist jedoch nicht einmal notwendig, da io.Copy (oder die zugrunde liegende Implementierung) tatsächlich einen Puffer von 32kB zuweist.

Mein Ansatz

Eine bessere Lösung wäre für einen Schreib warten geschehen und die sofort den Inhalt des Named Pipe in die Zieldatei kopieren. Auf den meisten Systemen gibt es eine Art Benachrichtigung über Dateisystemereignisse. Das Paket github.com/rjeczalik/notify kann verwendet werden, um auf Ereignisse zuzugreifen, an denen wir interessiert sind, da Write-Ereignisse plattformübergreifend auf den meisten wichtigen Betriebssystemen funktionieren. Das andere Ereignis, das für uns interessant sein könnte, ist das Entfernen der Named Pipe, da wir nichts zu lesen hätten.

Daher wäre meine Lösung sein:

package main 

import (
    "flag" 
    "io" 
    "log" 
    "os" 

    "github.com/rjeczalik/notify" 
) 

const (
    MAX_CONCURRENT_WRITERS = 5 
) 

var (
    pipePath string 
    filePath string 
) 

func init() { 
    flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from") 
    flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file") 
    log.SetOutput(os.Stderr) 
} 

func main() { 
    flag.Parse() 

    var p, f *os.File 
    var err error 
    var e notify.EventInfo 

    // The usual stuff: checking wether the named pipe exists etc 
    if p, err = os.Open(pipePath); os.IsNotExist(err) { 
     log.Fatalf("Named pipe '%s' does not exist", pipePath) 
    } else if os.IsPermission(err) { 
     log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err) 
    } else if err != nil { 
     log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err) 
    } 
    // Yep, there and readable. Close the file handle on exit 
    defer p.Close() 

    // Do the same for the output file 
    if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) { 
     log.Fatalf("File '%s' does not exist", filePath) 
    } else if os.IsPermission(err) { 
     log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err) 
    } else if err != nil { 
     log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err) 
    } 
    // Again, close the filehandle on exit 
    defer f.Close() 

    // Here is where it happens. We create a buffered channel for events which might happen 
    // on the file. The reason why we make it buffered to the number of expected concurrent writers 
    // is that if all writers would (theoretically) write at once or at least pretty close 
    // to each other, it might happen that we loose event. This is due to the underlying implementations 
    // not because of go. 
    c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS) 

    // Here we tell notify to watch out named pipe for events, Write and Remove events 
    // specifically. We watch for remove events, too, as this removes the file handle we 
    // read from, making reads impossible 
    notify.Watch(pipePath, c, notify.Write|notify.Remove) 

    // We start an infinite loop... 
    for { 
     // ...waiting for an event to be passed. 
     e = <-c 

     switch e.Event() { 

     case notify.Write: 
      // If it a a write event, we copy the content of the named pipe to 
      // our output file and wait for the next event to happen. 
      // Note that this is idempotent: Even if we have huge writes by multiple 
      // writers on the named pipe, the first call to Copy will copy the contents. 
      // The time to copy that data may well be longer than it takes to generate the events. 
      // However, subsequent calls may copy nothing, but that does not do any harm. 
      io.Copy(f, p) 

     case notify.Remove: 
      // Some user or process has removed the named pipe, 
      // so we have nothing left to read from. 
      // We should inform the user and quit. 
      log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath) 
     } 
    } 
} 
+0

Um nur von oben zu wiederholen, ist der Puffer der Pipe genau das, ein Puffer, und er akzeptiert Daten unabhängig vom Status des Leseprozesses. Es gibt keine Garantie dafür, dass der Puffer im Falle eines Fehlschlags überhaupt gelesen wurde, und sich selbst zu sagen, ist nur das Schicksal verlockend. Sie müssen akzeptieren, dass es immer die Möglichkeit eines Datenverlusts gibt, wenn keine Bestätigung vorliegt. Dies macht jedoch eine reaktionsfähigere Implementierung (auf Kosten zusätzlicher Komplexität). – JimB

+0

@JimB Off, Sie können Daten verlieren, es ist jedoch weniger wahrscheinlich, und diese Lösung ist weniger anfällig für die Autoren zu blockieren. –

+0

Ich dachte auch an einen Benachrichtigungsmechanismus, um überladene Daten zu vermeiden. –