Aktuellen Gerätenachricht an eine IoTHub auf einer Azure-Instanz Ich schicke und dann alle Nachrichten werden an einen EventHub zur Verarbeitung gesendet.Senden/Empfangen von Batch-Nachrichten zu Azure-Protokoll-Gateway
Mein Ziel ist es, eine Azure Protocol Cloud Gateway zu verwenden, um als Vermittler zu dienen, um zu empfangene Nachrichten zu empfangen und sie dann auszupacken, bevor Sie sie zur Verarbeitung senden. Indem die Nachrichten stapelweise gespeichert werden, kann ich die Menge der zu übertragenden Daten reduzieren, was die Datennutzungskosten senkt. Sobald sich die Daten in der Cloud befinden, können sie nicht komprimiert und normal verarbeitet werden.
Nach einigen Recherchen und Spielen mit dem Gateway auf meinem lokalen Rechner und mit einigen der in die Lösung integrierten Komponententests habe ich gesehen, wie die Nachrichten an den Gateway/IoTHub gesendet werden.
ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
Stopwatch sw = Stopwatch.StartNew();
await this.CleanupDeviceQueueAsync(hubConnectionStringBuilder.HostName, device);
var clientScenarios = new ClientScenarios(hubConnectionStringBuilder.HostName, this.deviceId, this.deviceSas);
var group = new MultithreadEventLoopGroup();
string targetHost = this.tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
var readHandler1 = new ReadListeningHandler(CommunicationTimeout);
Bootstrap bootstrap = new Bootstrap()
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(this.ComposeClientChannelInitializer(targetHost, readHandler1));
IChannel clientChannel = await bootstrap.ConnectAsync(this.ServerAddress, protocolGatewayPort);
this.ScheduleCleanup(() => clientChannel.CloseAsync());
Task testWorkTask = Task.Run(async() =>
{ //Where the messaging actually starts and sends
Tuple<EventData, string>[] ehMessages = await CollectEventHubMessagesAsync(receivers, 2); //Async task for recieving messages back from the IoTHub
Tuple<EventData, string> qos0Event = Assert.Single(ehMessages.Where(x => TelemetryQoS0Content.Equals(x.Item2, StringComparison.Ordinal)));
Tuple<EventData, string> qos1Event = Assert.Single(ehMessages.Where(x => TelemetryQoS1Content.Equals(x.Item2, StringComparison.Ordinal)));
string qosPropertyName = ConfigurationManager.AppSettings["QoSPropertyName"];
var qos0Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS0Content));
qos0Notification.Properties[qosPropertyName] = "0";
qos0Notification.Properties["subTopic"] = "tips";
await serviceClient.SendAsync(this.deviceId, qos0Notification);
var qos1Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS1Content));
qos1Notification.Properties["subTopic"] = "firmware-update";
await serviceClient.SendAsync(this.deviceId, qos1Notification);
var qos2Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content));
qos2Notification.Properties[qosPropertyName] = "2";
qos2Notification.Properties["subTopic"] = "critical-alert";
await serviceClient.SendAsync(this.deviceId, qos2Notification);
var qos2Notification2 = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content2));
qos2Notification2.Properties[qosPropertyName] = "2";
await serviceClient.SendAsync(this.deviceId, qos2Notification2);
});
So ist das "ServiceClient" sendet 4 Nachrichten in diesem Gerät zu testen: qos0Notification, qos1Notification, qos2Notification, qos2Notification2 und verwendet eine SendAsync Methode, um die Informationen zu senden.
Die SendAsync Methode ist Teil des Basiscode für die Anwendung und ist ein ideales Ziel, Verfahren nimmt auch eine DeviceId und ein Nachrichtenobjekt nicht verfügbar. Nachricht hat 3 Überladungen für das Objekt: Base, einen Byte-Stream oder ein Byte-Array.
Sobald das Gateway initialisiert sie es Nachrichten mit dieser Methode empfängt:
public override void ChannelRead(IChannelHandlerContext context, object message)
{
var packet = message as Packet;
if (packet == null)
{
CommonEventSource.Log.Warning($"Unexpected message (only `{typeof(Packet).FullName}` descendants are supported): {message}", this.ChannelId);
return;
}
this.lastClientActivityTime = DateTime.UtcNow; // notice last client activity - used in handling disconnects on keep-alive timeout
if (this.IsInState(StateFlags.Connected) || packet.PacketType == PacketType.CONNECT)
{
this.ProcessMessage(context, packet);
}
else
{
if (this.IsInState(StateFlags.ProcessingConnect))
{
Queue<Packet> queue = this.connectPendingQueue ?? (this.connectPendingQueue = new Queue<Packet>(4));
queue.Enqueue(packet);
}
else
{
// we did not start processing CONNECT yet which means we haven't received it yet but the packet of different type has arrived.
ShutdownOnError(context, string.Empty, new InvalidOperationException($"First packet in the session must be CONNECT. Observed: {packet}, channel id: {this.ChannelId}, identity: {this.identity}"));
}
}
}
ich irgendwelche batched Nachrichten das Gefühl, das wäre der beste Ort, auszupacken. Sobald wir eine Liste von Nachrichten haben, senden wir sie an ProcessMessage, um festzustellen, um welche Art von Nachricht es sich handelt und wie sie behandelt wird.
Es scheint, als ob es nicht viele Informationen dafür gibt, da es sehr neu ist.
Was ist Ihre Frage genau? – CSharpRocks