2010-12-03 8 views
10

Ich experimentiere mit ZeroMQ und versuchen, etwas arbeiten zu bekommen. Mein erster Gedanke war, einen REP/REQ mit dem inproc-Transport einzurichten, um zu sehen, ob ich Nachrichten zwischen zwei Threads senden könnte. Der Großteil des folgenden Codes stammt aus den clclmq-Beispielen, scheint aber nicht zu funktionieren.Verwenden von ZeroMQ mit C# mit Inproc-Transport

Sowohl der Server als auch der Client sind an den Transport gebunden, aber wenn der Client versucht, eine Send zu tun, blockiert es und sitzt nur dort. Ich habe keine ZeroMQ-Erfahrung, daher bin ich mir nicht sicher, wo ich zuerst hinschauen soll. Jede Hilfe wäre sehr willkommen. Hier ist der säumige (offensive) Code:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     [Test] 
     public void Should() 
     { 
      var clientThread = new Thread(StartClient); 
      clientThread.Start(); 

      var serverThread = new Thread(StartServer); 
      serverThread.Start(); 

      clientThread.Join(); 
      serverThread.Join(); 

      Console.WriteLine("Done with life"); 
     } 

     private void StartServer() 
     { 


      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 
      } 

      Console.WriteLine("Done with server"); 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
         1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
         latency.ToString("f2")); 
       } 
      } 

      Console.WriteLine("Done with client"); 
     } 

    } 
} 

Edit:

Ich habe arbeiten diese mit Hilfe der unten Antwort, aber es musste ich auch eine Bind zu einem Connect ändern, was Sinn macht, wenn Sie denken darüber nach, da ein Server an einen lokalen Transport und ein Client an einen Remotetransport gebunden ist. Hier ist der aktualisierte Code:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     private static Context ctx; 

     [Test] 
     public void Should() 
     { 
      using (ctx = new Context(1)) 
      { 
       var clientThread = new Thread(StartClient); 
       clientThread.Start(); 

       var serverThread = new Thread(StartServer); 
       serverThread.Start(); 

       clientThread.Join(); 
       serverThread.Join(); 

       Console.WriteLine("Done with life"); 
      } 
     } 

     private void StartServer() 
     { 
      try 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 

       Console.WriteLine("Done with server"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      try 
      { 
       // Initialise 0MQ infrastructure 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Connect(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
            1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
             latency.ToString("f2")); 
       } 

       Console.WriteLine("Done with client"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

    } 
} 

Antwort

14

Ich glaube, beide Threads müssen den gleichen Kontext verwenden. Der Zeromq-Guide empfiehlt, nicht mehr als einen Kontext in einem Prozess zu verwenden. Erstellen Sie einen Kontext, teilen Sie diesen Kontext zwischen den beiden Threads. Das sollte funktionieren.

Von http://zguide.zeromq.org/chapter:all

Sie MÜSSEN ein ‚Kontext‘ Objekt für Ihren Prozess erstellen, und übergeben Sie das zu alle Threads. Der Kontext erfasst den Status von ØMQ. Um eine Verbindung über die Inproc: Transport zu erstellen, müssen Server- und Client-Thread das gleiche Kontextobjekt teilen.

+0

Das war sehr nützlich, danke! – jonnii

2

Nur ein Ende kann binden, das andere muss verbinden, Sie können mehrere Verbindungen haben.