2014-11-18 4 views
7

ich ein einfaches Programm haben:scalaz-Stream-Eingabeprozesse Merging scheint zu "warten" auf stdin

import scalaz._ 
import stream._ 

object Play extends App { 
    val in1 = io.linesR("C:/tmp/as.txt") 
    val in2 = io.linesR("C:/tmp/bs.txt") 

    val p = (in1 merge in2) to io.stdOutLines 
    p.run.run 
} 

Die Datei as.txt enthält fünf a s und die Datei s enthalten. Ich sehe diese Art der Ausgabe:

a 
b 
b 
a 
a 
b 
a 
a 
a 

Allerdings, wenn ich die Erklärung von in2 wie folgt ändern:

val in2 = io.stdInLines 

Dann bekomme ich, was ich denke unerwartetes Verhalten. Gemäß der Dokumentation 1 sollte das Programm Daten nicht-deterministisch aus jedem Datenstrom ziehen, je nachdem, welcher Datenstrom schneller ist. Das sollte bedeuten, dass ich eine Reihe von a s sofort auf der Konsole gedruckt sehe, aber das passiert gar nicht.

In der Tat passiert nichts, bis ich ENTER drücke. Es ist ziemlich klar, dass das Verhalten sehr ähnlich ist, was ich erwarten würde, wenn ich zufällig einen Stream auswähle, um das nächste Element zu erhalten, und wenn der Stream blockiert, blockiert der gemischte Prozess ebenfalls (auch wenn der andere Stream Daten enthält)).

Was ist los?

1 - na ja, OK, gibt es sehr wenig Dokumentation, aber Dan Spiewak sagte sehr deutlich in his talk, dass es greifen, wer der erste war, Daten liefern

Antwort

6

Das Problem ist bei der Umsetzung der stdInLines. Es blockiert, es nie Task.fork s ein Thread.

Versuchen Sie, die implentation von stdInLines zu diesem Wechsel:

def stdInLines: Process[Task,String] = 
    Process.repeatEval(Task.apply { 
    Option(scala.Console.readLine()) 
    .getOrElse(throw Cause.Terminated(Cause.End)) 
}) 

Die ursprüngliche io.stdInLines ist die readLine() im selben Thread ausgeführt wird, so wartet er immer dort, bis Sie etwas eingeben.