2015-05-20 9 views
5

Ich versuche Kanäle/STM zu verwenden, um Message Passing in Haskell zu implementieren. Vielleicht ist das eine schreckliche Idee, und es gibt eine bessere Möglichkeit, Nachrichtenübermittlung in Haskell zu implementieren/zu verwenden. Wenn das der Fall ist, lass es mich wissen; Meine Suche hat jedoch einige grundlegende Fragen zum gleichzeitigen Haskell aufgeworfen.Haskell, Kanäle, STM, Threaded, Message Passing

Ich habe große Dinge über STM gehört, und insbesondere die Implementierung in Haskell. Da es das Lesen und Schreiben unterstützt und einige Sicherheitsvorteile bietet, dachte ich mir, dass man dort anfangen würde. Das bringt meine größte Frage: hat

msg <- atomically $ readTChan chan 

wo chan ist ein Tchan Int, verursachen eine Wartezeit, die für den Kanal wartet auf einen Wert haben?

Betrachten Sie das folgende Programm:

p chan = do 
    atomically $ writeTChan chan 1 
    atomically $ writeTChan chan 2 

q chan = do 
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan 
    -- for testing purposes 
    putStrLn $ show msg1 
    putStrLn $ show msg2 

main = do 
    chan <- atomically $ newTChan 
    p chan 
    q chan 

das Kompilieren mit ghc --make Threaded, und dann das Programm ausführen, und in der Tat erhalten Sie 1, gefolgt von 2 gedruckt zu trösten. Nehmen wir jetzt an, wir machen

main = do 
    chan <- atomically $ newTChan 
    forkIO $ p chan 
    forkIO $ q chan 

stattdessen. Wenn wir nun - threaded verwenden, wird entweder nichts, 1 oder 1 gefolgt von 2 zum Terminal gedruckt; Wenn Sie jedoch nicht mit einem Thread kompilieren, wird immer 1 gefolgt von 2 gedruckt. Frage 2: Was ist der Unterschied zwischen-Threaded und nicht? Ich stelle mir vor, dass sie nicht wirklich als gleichlaufende Dinge laufen, und sie werden nur nacheinander ausgeführt. Dies steht im Einklang mit dem Folgenden.

Nun, in meinem Denken, wenn ich p und q gleichzeitig ausgeführt hätte; Das heißt, ich hätte sie gequatscht, sie sollten in der entgegengesetzten Reihenfolge laufen können. Jetzt

main = do 
    chan <- atomically newTChan 
    forkIO $ q chan 
    forkIO $ p chan 

Gesetzt, wenn ich dies ohne Threaded kompilieren, bekomme ich nie etwas gedruckt zu trösten. Wenn ich mit Thread kompiliere, mache ich manchmal. Es ist jedoch sehr selten, dass 1 von 2 gefolgt wird - normalerweise nur 1 oder nichts. Ich habe dies auch mit Control.Concurrent.Chan versucht und konsistente Ergebnisse erhalten.

Zweite große Frage: Wie spielen Kanäle und Gabel zusammen, und was ist in dem obigen Programm los?

Auf jeden Fall scheint es, dass ich die Nachrichtenübergabe mit STM nicht naiv simulieren kann. Vielleicht ist Cloud Haskell eine Option, die diese Probleme löst - ich weiß es wirklich nicht. Irgendwelche Informationen darüber, wie man Nachrichtenübergabe knapp werden lässt, um zu serialisieren ~~> in den Socket schreiben ~~> aus dem Socket lesen ~~> deserialize würde sehr geschätzt werden.

+0

Re wechseln: „Was ist der Unterschied zwischen Threaded und nicht“, Du [ meine Darstellung zu Haskells Threading-Modell] (http://dmwit.com/gtk2hs). Ignoriere die gtk-spezifischen Bits. –

Antwort

8

Nein Ihre Idee ist richtig - das kindof ist, was TChan s sind für - Sie verfehlten nur einen kleinen Punkt von forkIO:

Das Problem ist, dass Ihr Haupt-Thread wird nicht für die Beendigung des mit erstellten Threads warten forkIO (see here for reference)

so, wenn ich die Hinweis verwenden in der Referenz angegeben:

import Control.Concurrent 
import Control.Concurrent.STM 

p :: Num a => TChan a -> IO() 
p chan = do 
    atomically $ writeTChan chan 1 
    atomically $ writeTChan chan 2 

q chan = do 
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan 
    -- for testing purposes 
    putStrLn $ show msg1 
    putStrLn $ show msg2 

main :: IO() 
main = do 
    children <- newMVar [] 
    chan <- atomically $ newTChan 
    _ <- forkChild children $ p chan 
    _ <- forkChild children $ q chan 
    waitForChildren children 
    return() 

waitForChildren :: MVar [MVar()] -> IO() 
waitForChildren children = do 
    cs <- takeMVar children 
    case cs of 
    [] -> return() 
    m:ms -> do 
     putMVar children ms 
     takeMVar m 
     waitForChildren children 

forkChild :: MVar [MVar()] -> IO() -> IO ThreadId 
forkChild children io = do 
    mvar <- newEmptyMVar 
    childs <- takeMVar children 
    putMVar children (mvar:childs) 
    forkFinally io (\_ -> putMVar mvar()) 

es Arbeit s wie erwartet:

d:/Temp $ ghc --make -threaded tchan.hs 
[1 of 1] Compiling Main    (tchan.hs, tchan.o) 
Linking tchan.exe ... 
d:/Temp $ ./tchan.exe 
1 
2 
d:/Temp $ 

und natürlich wird es auch weiterhin arbeiten, wenn Sie die Anrufe p und q zu

+1

Gibt es ein Modul/eine Bibliothek, die diese 'forkChild' /' waitForChildren' Sache vereinfachen können? – Bergi