2013-05-16 5 views
5

Bin ich richtig Ich nehme an, dass innerhalb der Grenzen des gleichen Prozesses mit 2 Threads Lesen/Schreiben in eine Named Pipe Reader überhaupt nicht blockiert? Also mit falschen Timings ist es möglich, einige Daten zu verpassen?Benannte Pipes in Java und Multithreading

Und im Falle von mehreren Prozessen - Reader wird warten, bis einige Daten verfügbar sind, und Schriftsteller wird blockiert werden, bis der Leser alle vom Leser gelieferten Daten lesen wird?

Ich plane, Named Pipe zu verwenden, um mehrere (Dutzende, Hunderte) Dateien von externen Prozess zu übergeben und in meiner Java-Anwendung konsumieren. Schreiben von einfachen Komponententests, um einen Thread zum Schreiben in die Pipe zu verwenden, und ein anderer zum Lesen aus der Pipe führte zu sporadischen Testfehlern aufgrund fehlender Datenblöcke.

Ich denke, es ist wegen der Threading und des gleichen Prozesses, so ist mein Test im Allgemeinen nicht korrekt. Ist diese Annahme richtig?

Hier ist eine Art Beispiel, das den Fall veranschaulicht:

import java.io.{FileOutputStream, FileInputStream, File} 
import java.util.concurrent.Executors 
import org.apache.commons.io.IOUtils 
import org.junit.runner.RunWith 
import org.scalatest.FlatSpec 
import org.scalatest.junit.JUnitRunner 

@RunWith(classOf[JUnitRunner]) 
class PipeTest extends FlatSpec { 

    def md5sum(data: Array[Byte]) = { 
    import java.security.MessageDigest 
    MessageDigest.getInstance("MD5").digest(data).map("%02x".format(_)).mkString 
    } 

    "Pipe" should "block here" in { 
    val pipe = new File("/tmp/mypipe") 
    val srcData = new File("/tmp/random.10m") 
    val md5 = "8e0a24d1d47264919f9d47f5223c913e" 
    val executor = Executors.newSingleThreadExecutor() 
    executor.execute(new Runnable { 
     def run() { 
     (1 to 10).foreach { 
      id => 
      val fis = new FileInputStream(pipe) 
      assert(md5 === md5sum(IOUtils.toByteArray(fis))) 
      fis.close() 
     } 
     } 
    }) 
    (1 to 10).foreach { 
     id => 
     val is = new FileInputStream(srcData) 
     val os = new FileOutputStream(pipe) 
     IOUtils.copyLarge(is, os) 
     os.flush() 
     os.close() 
     is.close() 
     Thread.sleep(200) 
    } 
    } 

} 

ohne Thread.sleep (200) der Test aus Gründen

  • Rohrbruch Ausnahme
  • passieren versagt
  • falsche MD5 Summe

mit dieser Verzögerung eingestellt - es funktioniert einfach großartig. Ich verwende eine Datei mit 10 Megabyte zufälligen Daten.

+1

Sie müssen einen Kontext bereitstellen. Wo hast du gelesen, dass Named Pipes niemals blockieren? Das klingt widersinnig, sie sind ... naja ... Pfeifen, keine überlaufenden Eimer. – millimoose

+1

Persönlich, fehlende Daten Chunks klingt wie Ihre Tests falsch sind, könnten Sie irgendwo einen Stream-Puffer Clobbering sein. Verwenden Sie mehrere * Prozesse *, keine Threads zum Testen.Stellen Sie alternativ eine Frage mit Ihrer Testkonfiguration, um zu fragen, warum Datenblöcke fehlen. (Was würde dies eine "XY-Problem" -Frage machen. Sie sollten nach dem Problem fragen, das Sie haben, nicht Ihre Spekulation auf die Ursache oder Workaround.) – millimoose

+0

@millimoose Ich behaupte nicht, dass Named Pipes nie blockieren, aber das ist was Ich sehe in den Testergebnissen. Der "Writer" -Thread kann 3 Dateien in die Pipe schreiben, und "Reader" kann nur einen von ihnen lesen. Das ist verwirrend. – jdevelop

Antwort

5

Dies ist eine sehr einfache Race-Bedingung in Ihrem Code: Sie schreiben Nachrichten mit fester Größe an die Pipe und nehmen an, dass Sie die gleichen Nachrichten zurück lesen können. Sie haben jedoch keine Ahnung, wie viele Daten in der Pipe für einen bestimmten Lesevorgang verfügbar sind.

Wenn Sie Ihre Schreibvorgänge mit der Anzahl der geschriebenen Bytes voranstellen und sicherstellen, dass jeder Lesezugriff nur die Anzahl der Bytes liest, sehen Sie, dass die Pipes genau wie angekündigt funktionieren.

Wenn Sie eine Situation mit mehreren Writern und/oder mehreren Lesern haben, empfehle ich eine tatsächliche Nachrichtenwarteschlange zu verwenden. Eigentlich empfehle ich in jedem Fall eine Nachrichtenwarteschlange zu verwenden, da sie das Problem der Abgrenzung der Nachrichtengrenze löst; Es hat wenig Sinn, dieses spezielle Rad neu zu erfinden.

+0

Ich denke, das ist nicht der Fall, weil ich Pipe vor dem Lese-/Schreibvorgang öffne und danach wieder schließe. Also sollte überhaupt kein Problem mit den Rennbedingungen sein. – jdevelop

+0

Ich gehe davon aus, dass es den zweiten Versuch verhindern sollte, ein Rohr vor dem vollständigen Lesen zu öffnen. – jdevelop

+0

@jdevelop - Das Öffnen und Schließen des Rohres ändert nichts am Inhalt der Rohrleitung. Und es dient sicherlich nicht als Synchronisationspunkt zwischen Ihren Lese- und Schreibvorgängen. Aber selbst wenn es so wäre, hättest du ein Rennen zwischen den Lesern und den Autoren. Wenn Sie der Meinung sind, dass die Dokumentation etwas anderes sagt, aktualisieren Sie bitte Ihre Frage entsprechend. – parsifal

1

Bin ich richtig Ich nehme an, dass innerhalb der Grenzen des gleichen Prozesses mit 2 Threads Lesen/Schreiben zu einem Named Pipe Reader überhaupt nicht blockiert?

Nicht, wenn Sie nicht blockierende E/A verwenden, was nicht der Fall ist.

Also mit falschen Timings ist es möglich, einige Daten zu verpassen?

Nicht, wenn Sie nicht blockierende E/A verwenden, was nicht der Fall ist.

+0

Ich glaube, das ist subtil ungenau. Eine "erschöpfte" Named Pipe würde das Warten auf mehr Daten nicht blockieren, sondern stattdessen EOF zurückgeben, nein? – millimoose

+0

@millimoose Keine Ungenauigkeit, nur ein Missverständnis von Ihrer Seite. Ich habe nicht gesagt, ein Lesen blockiert immer. Ich sagte, der OP habe sich geirrt, um zu sagen, dass er nie blockt. – EJP