2017-01-23 1 views
1

Zu Beginn ist erwähnenswert, dass Serialisierung und Deserialisierung von Bond Nachrichten innerhalb einer einzelnen F # -Lösung funktioniert. Ich habe jedoch Probleme, das Senden und/oder Empfangen der Nachricht über ZeroMQ richtig zu handhaben.Laufzeit "EndOfStreamException" Fehler in MS Bond Serialisierung über ZeroMQ

Es liegt ein Laufzeitfehler auf der Teilnehmerseite des folgenden Programms vor. Eine .bond-Datei wird mit dem Bond-Compiler definiert und kompiliert. Dann wird eine DLL von C# erstellt, um von F # aufgerufen zu werden. Ich habe dann zwei F # -Programme. Eine, die serialisierte Daten über einen TCP-Socket und einen anderen, der ein Abonnent ist, veröffentlicht. Wenn die Nachricht auf der Unterstation empfangen wird, ist die Zeile, die versucht, die Rohdaten zu unmarshalen, diejenige, die den Laufzeitfehler verursacht. Kann jemand den Grund dafür sehen?

[EDIT] Per Fyodor Kommentar, habe ich eine Änderung auf der Publisher-Seite, die den Fehler auf der Teilnehmerseite ändert. Der Fehler hat wahrscheinlich etwas damit zu tun, wie ich die Informationen packe und entpacke.

Dies ist die .bond Datei

namespace Examples 

struct Record 
{ 
    0: map<string, double> payload; 
} 

Hier ist der Verlag:

// publisher 

open System 
open Bond 
open Bond.Protocols 
open Bond.IO.Safe 
open ZeroMQ 

let ctx = new ZContext() 
let publisher = new ZSocket(ctx, ZSocketType.PUB) 
publisher.Bind("tcp://*:5556") 

let src = new Examples.Record() 
src.payload.Add("a", 1.) 
src.payload.Add("b", 2.) 

let output = new OutputBuffer() 
let writer = new CompactBinaryWriter<OutputBuffer>(output) 

while true do 
    Marshal.To(writer, src) 
    //let input = new InputBuffer(output.Data) 
    //let byteArr = input.ReadBytes(int(input.Length - 1L)) 
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array) 
    publisher.Send(updateFrame) 

Hier wird der Teilnehmer:

// subscriber 

open Bond 
open Bond.Protocols 
open Bond.IO.Safe 
open System 
open System.Text 
open ZeroMQ 

let ctx = new ZContext() 
let subscriber = new ZSocket(ctx, ZSocketType.SUB) 
subscriber.Connect("tcp://127.0.0.1:5556") 
subscriber.SubscribeAll() 

let output = new OutputBuffer()  
while true do  
    let received = subscriber.ReceiveFrame() 
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString()) 
    let arrSeg = ArraySegment<byte>(byteArr) 
    let input = new InputBuffer(arrSeg) 
    let dst = Unmarshal<Examples.Record>.From(input) 
    for KeyValue(k, v) in dst.payload do 
     printfn "%A %A" k v 
+3

Ich sehe, Sie erstellen ein 'ZFrame' aus' byteArr.ToString() '. Das tut nicht, was du denkst. Versuchen Sie, das Ergebnis von 'byteArr.ToString()' auszudrucken, um zu sehen, was Sie tatsächlich senden. –

Antwort

4

Auf der Empfangsseite, wenn Sie versuchen, Wenn Sie das Marshalled Bond Compact Binary als ASCII-Zeichenfolge decodieren, verlieren Sie einen Teil der Nutzlast. Beim Marshaling einer Struktur wie Record zu Compact Binary sind die ersten vier Bytes der Payload 0x43 0x42 0x10 0x00. Beim Lesen einer Zeichenfolge aus einem ZFrame signalisiert the first embedded NUL (0x00) das Ende der Zeichenfolge, unabhängig von der Größe des Rahmens. Also, die Leseseite sieht nur 0x43 0x42 0x10 anstelle der gesamten Nutzlast (29 Bytes beim Testen).

Da Compact Binary ist ein binäre Protokoll, werden Sie den ZFrame Konstruktor verwenden möchten, die einen Puffer auf der Publisher-Seite nimmt:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count) 

Auf der Teilnehmerseite, möchten Sie einfach lesen Sie den Puffer:

let byteArr = received.Read() 

auch auf der Publisher-Seite, sind Sie ständig Daten in den gleichen Output akkumulieren. Sie werden output.Position-0 bevor Sie marshall Ihren nächsten Datensatz zurücksetzen möchten den Puffer wiederverwenden, anstatt es zu wachsen:

while true do 
    Marshal.To(writer, src) 
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array) 
    publisher.Send(updateFrame) 
    output.Position <- 0 

Eine andere Sache zu beachten: Der Standardpuffer für einen OutputBuffer zugeordnet ist 65KiB. Überlegen Sie, ob Sie diese Größe reduzieren möchten, sobald Sie wissen, wie groß Ihre Nutzlast sein wird.

Hinweis: Ich debuggte dies in einer C# -Anwendung, die ähnliche Semantik hatte. Hier ist, was ich verwendet habe:

namespace so_q_zmq 
{ 
    using System; 
    using System.Collections.Generic; 
    using System.Text; 
    using System.Threading.Tasks; 
    using Bond; 
    using Bond.IO.Safe; 
    using Bond.Protocols; 
    using ZeroMQ; 

    [Schema] 
    class Record 
    { 
     [Id(0)] 
     public Dictionary<string, double> payload = new Dictionary<string, double>(); 
    } 

    class Program 
    { 
     static void Main(string[] args) 
     { 
      var pTask = Task.Run(() => 
      { 
       try 
       { 
        Publisher(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("Publisher failed: {0}", ex); 
       } 
      }); 

      var sTask = Task.Run(() => 
      { 
       try 
       { 
        Subscriber(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("Subscriber failed: {0}", ex); 
       } 
      }); 

      Task.WaitAll(pTask, sTask); 
      Console.WriteLine("Done"); 
      Console.ReadLine(); 
     } 

     static void Publisher() 
     { 
      var ctx = new ZContext(); 
      var publisher = new ZSocket(ctx, ZSocketType.PUB); 
      publisher.Bind("tcp://127.0.0.1:12345"); 

      var src = new Record(); 
      src.payload.Add("a", 1.0); 
      src.payload.Add("b", 2.0); 

      var output = new OutputBuffer(); 
      var writer = new CompactBinaryWriter<OutputBuffer>(output); 

      for (;;) 
      { 
       Marshal.To(writer, src); 
       // INCORRECT: 
       // var str = Encoding.ASCII.GetString(output.Data.Array); 
       // var updateFrame = new ZFrame(str); 
       var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count); 
       publisher.Send(updateFrame); 
       output.Position = 0; 
      } 
     } 

     static void Subscriber() 
     { 
      var ctx = new ZContext(); 
      var subscriber = new ZSocket(ctx, ZSocketType.SUB); 
      subscriber.Connect("tcp://127.0.0.1:12345"); 
      subscriber.SubscribeAll(); 

      for (;;) 
      { 
       var received = subscriber.ReceiveFrame(); 
       // INCORRECT 
       // var str = received.ReadString(); 
       // var byteArr = Encoding.ASCII.GetBytes(str); 
       var byteArr = received.Read(); 
       var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly 
       var input = new InputBuffer(arrSeg); 
       var dst = Unmarshal<Record>.From(input); 
       foreach (var kvp in dst.payload) 
       { 
        Console.WriteLine("{0} {1}", kvp.Key, kvp.Value); 
       } 
      } 
     } 
    } 
}