2017-02-22 5 views
0

Aktuellen Gerätenachricht an eine IoTHub auf einer Azure-Instanz Ich schicke und dann alle Nachrichten werden an einen EventHub zur Verarbeitung gesendet.Senden/Empfangen von Batch-Nachrichten zu Azure-Protokoll-Gateway

Mein Ziel ist es, eine Azure Protocol Cloud Gateway zu verwenden, um als Vermittler zu dienen, um zu empfangene Nachrichten zu empfangen und sie dann auszupacken, bevor Sie sie zur Verarbeitung senden. Indem die Nachrichten stapelweise gespeichert werden, kann ich die Menge der zu übertragenden Daten reduzieren, was die Datennutzungskosten senkt. Sobald sich die Daten in der Cloud befinden, können sie nicht komprimiert und normal verarbeitet werden.

Nach einigen Recherchen und Spielen mit dem Gateway auf meinem lokalen Rechner und mit einigen der in die Lösung integrierten Komponententests habe ich gesehen, wie die Nachrichten an den Gateway/IoTHub gesendet werden.

 ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString); 

     Stopwatch sw = Stopwatch.StartNew(); 

     await this.CleanupDeviceQueueAsync(hubConnectionStringBuilder.HostName, device); 

     var clientScenarios = new ClientScenarios(hubConnectionStringBuilder.HostName, this.deviceId, this.deviceSas); 

     var group = new MultithreadEventLoopGroup(); 
     string targetHost = this.tlsCertificate.GetNameInfo(X509NameType.DnsName, false); 

     var readHandler1 = new ReadListeningHandler(CommunicationTimeout); 
     Bootstrap bootstrap = new Bootstrap() 
      .Group(group) 
      .Channel<TcpSocketChannel>() 
      .Option(ChannelOption.TcpNodelay, true) 
      .Handler(this.ComposeClientChannelInitializer(targetHost, readHandler1)); 
     IChannel clientChannel = await bootstrap.ConnectAsync(this.ServerAddress, protocolGatewayPort); 
     this.ScheduleCleanup(() => clientChannel.CloseAsync()); 

     Task testWorkTask = Task.Run(async() => 
     { //Where the messaging actually starts and sends 
      Tuple<EventData, string>[] ehMessages = await CollectEventHubMessagesAsync(receivers, 2); //Async task for recieving messages back from the IoTHub 
      Tuple<EventData, string> qos0Event = Assert.Single(ehMessages.Where(x => TelemetryQoS0Content.Equals(x.Item2, StringComparison.Ordinal))); 
      Tuple<EventData, string> qos1Event = Assert.Single(ehMessages.Where(x => TelemetryQoS1Content.Equals(x.Item2, StringComparison.Ordinal))); 

      string qosPropertyName = ConfigurationManager.AppSettings["QoSPropertyName"]; 

      var qos0Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS0Content)); 
      qos0Notification.Properties[qosPropertyName] = "0"; 
      qos0Notification.Properties["subTopic"] = "tips"; 
      await serviceClient.SendAsync(this.deviceId, qos0Notification); 

      var qos1Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS1Content)); 
      qos1Notification.Properties["subTopic"] = "firmware-update"; 
      await serviceClient.SendAsync(this.deviceId, qos1Notification); 

      var qos2Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content)); 
      qos2Notification.Properties[qosPropertyName] = "2"; 
      qos2Notification.Properties["subTopic"] = "critical-alert"; 
      await serviceClient.SendAsync(this.deviceId, qos2Notification); 

      var qos2Notification2 = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content2)); 
      qos2Notification2.Properties[qosPropertyName] = "2"; 
      await serviceClient.SendAsync(this.deviceId, qos2Notification2); 
     }); 

So ist das "ServiceClient" sendet 4 Nachrichten in diesem Gerät zu testen: qos0Notification, qos1Notification, qos2Notification, qos2Notification2 und verwendet eine SendAsync Methode, um die Informationen zu senden.

Die SendAsync Methode ist Teil des Basiscode für die Anwendung und ist ein ideales Ziel, Verfahren nimmt auch eine DeviceId und ein Nachrichtenobjekt nicht verfügbar. Nachricht hat 3 Überladungen für das Objekt: Base, einen Byte-Stream oder ein Byte-Array.

Sobald das Gateway initialisiert sie es Nachrichten mit dieser Methode empfängt:

public override void ChannelRead(IChannelHandlerContext context, object message) 
    { 
     var packet = message as Packet; 
     if (packet == null) 
     { 
      CommonEventSource.Log.Warning($"Unexpected message (only `{typeof(Packet).FullName}` descendants are supported): {message}", this.ChannelId); 
      return; 
     } 

     this.lastClientActivityTime = DateTime.UtcNow; // notice last client activity - used in handling disconnects on keep-alive timeout 

     if (this.IsInState(StateFlags.Connected) || packet.PacketType == PacketType.CONNECT) 
     { 
      this.ProcessMessage(context, packet); 
     } 
     else 
     { 
      if (this.IsInState(StateFlags.ProcessingConnect)) 
      { 
       Queue<Packet> queue = this.connectPendingQueue ?? (this.connectPendingQueue = new Queue<Packet>(4)); 
       queue.Enqueue(packet); 
      } 
      else 
      { 
       // we did not start processing CONNECT yet which means we haven't received it yet but the packet of different type has arrived. 
       ShutdownOnError(context, string.Empty, new InvalidOperationException($"First packet in the session must be CONNECT. Observed: {packet}, channel id: {this.ChannelId}, identity: {this.identity}")); 
      } 
     } 
    } 

ich irgendwelche batched Nachrichten das Gefühl, das wäre der beste Ort, auszupacken. Sobald wir eine Liste von Nachrichten haben, senden wir sie an ProcessMessage, um festzustellen, um welche Art von Nachricht es sich handelt und wie sie behandelt wird.

Es scheint, als ob es nicht viele Informationen dafür gibt, da es sehr neu ist.

+0

Was ist Ihre Frage genau? – CSharpRocks

Antwort

0

Nach einigen Recherchen und mit dem Gateway spielt auf meinem lokalen Rechner und in die Lösung gebaut einige der Unit-Tests verwenden, habe ich gesehen, wie die Nachrichten an das Gateway/IoTHub gesendet werden.

Hier sendet ServiceClient Nachrichten an Geräte nicht an IoTHub. Diese Art von Nachrichten sind Cloud-To-Device messages. Sie können verwenden, um Device-To-Cloud-Nachrichten zu senden.

Mein Ziel ist es, ein Azure-Protokoll Cloud Gateway zu verwenden, als Vermittler zu fungieren batched Nachrichten zu empfangen und sich dann auspacken, bevor sie weg zur Verarbeitung zu senden. Durch die Zusammenfassung der Nachrichten kann ich die Menge der zu übertragenden Daten reduzieren, wodurch die Kosten für die Datennutzung um gesenkt werden.

Von Sie können benutzerdefinierten Prozess injizieren, um die Daten zu reduzieren, die an IoTHub übergeben werden. Nun, Sie können benutzerdefinierte Channel-Handler hinzufügen.

Protocol gateway uses pipeline-based message processing using the following handlers: TLS encryption/decryption, MQTT codec (encoder and decoder), and MqttIotHubAdapter. One way for customizing the gateway behavior is by injecting additional handlers in this pipeline. By adding a custom handler between the MQTT codec and the MqttIotHubAdapter you can inspect, modify, discard, delay, or split the passing messages.

Weitere Informationen und Beispielcode können Sie "Microsoft Azure IoT Protocol Gateway Developer Guide" verweisen.

+0

Die Verwendung des 'DeviceClient' gibt also die Option für' SendEventBatchAsync'.Sendet diese Methode alle Nachrichten im Stapel zusammen oder als separate Nachrichten? Ich habe meinen eigenen Komponententest mit 'DeviceClient' erstellt und einen Stapel von Nachrichten gesendet, aber ich kann derzeit nicht feststellen, ob die Nachrichten auf der Gateway-Seite als einzelner Stapel oder als einzelne Nachrichten empfangen werden. Der 'Geräte-Explorer' zeigt die Daten vom Gerät an, die den IoTHub betreten und alle haben den Zeitstempel, aber ich konnte die Nachrichten nicht abfangen, wenn sie in das Gateway gelangen. –

+0

Ja, die Methode ['SendEventBatchAsync'] (https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.devices.client.deviceclient#Microsoft_Azure_Devices_Client_DeviceClient_SendEventBatchAsync_System_Collections_Generic_IEnumerable_Microsoft_Azure_Devices_Client_Message__) sendet Nachrichten im Batch. Azure iot sdk ist Open Source und Sie finden die Implementierung [hier] (https://github.com/Azure/azure-iot-sdk-csharp/blob/master/device/Microsoft.Azure.Devices.Client/DeviceClient. cs/# L664). –

+0

Wenn ich 'SendEventBatchAsync' verwende, wird es direkt zum Protocol Gateway oder zum IotHub gesendet? Ich habe mithilfe von 'DeviceClient' einen Komponententest mit dieser Methode erstellt und die Nachrichten im Geräte-Explorer angezeigt, aber der IoTHub zeigte nur eine Nachricht an, die im Azure-Portal empfangen wurde. Wenn Sie vom Gateway empfangen werden, wissen Sie, wo die Nachrichten empfangen werden? Die 'ReadChannel' Methode im' MqttAdapter' scheint die Nachrichten nicht zu empfangen. –