2017-11-03 5 views
0

Ich arbeite an einem Echtzeit-Daten-Streaming-Projekt zum Parsen und Speichern der Daten jede N-te Minute. Mein Ziel ist es, die allererste Minute der Daten (als Puffer) wegzuwerfen und alle 4 Minuten Daten vom Server zu speichern. Die Daten werden dann zu anderen Funktionen geclustert, um sie zu gruppieren und zu berechnen (Funktionen, die hier nicht enthalten sind).wie zu speichern und zu analysieren Echtzeit-Streaming-Daten in MQT Paho für jede N-ten Minuten (Python)

Ich habe die Bedingung bei 'on_message' Funktion und Daten parse es innerhalb dieser Funktion. Ich denke nicht, dass meine Strukturierung und Berufung der richtige Weg ist, um mein Ziel zu erreichen. Bitte lassen Sie mich wissen, wenn Sie weitere Details benötigen.

ON_MESSAGE

def on_message(r_c_client, userdata, message): 

    if (message.topic == "scanning"): 

    c = datetime.now().time() 
    current = (c.hour * 60 + c.minute) * 60 + c.second 

    time.sleep(60) #initial delay 

    data = json.loads(message.payload.decode("utf-8")) 
    x = data['host'] 
    y = data['data'] 

    hostList = store(x, y) 

    while (current>=total_Time): 
     #time.sleep(60) #initial delay 


     nodeList = listToDf(hostList) 


     nodeDf= df_reformat(nodeList) 
     print clustering_results_reformat(process_startTime, nodeDf) 

Speicherfunktion

def store(host, data): 





    if host in hostList: 
     hostList[host].append(data) 

    else: 
     hostList[host] = [data] 

    return hostList 

Haupt

global process_startTime 

t = datetime.now().time() 

process_startTime = (t.hour * 60 + t.minute) * 60 + t.second 

total_Time = process_startTime + 300 #4 minutes + 1 minute 

print t , process_startTime 

broker_address = '10.10.0.100' 
c_client = mqtt.Client("trilateration") 
c_client.on_connect = on_connect 


c_client.on_message = on_message 
c_client.on_subscribe = on_subscribe 


c_client.connect(broker_address, 1883) 

c_client.loop_forever() 

Antwort

0

Als erstes sollten Sie nie (Schlaf) in der on_message Funktion blockieren, wird diese Funktion für jeden aufgerufen Nachricht, die empfangen wird, wenn Sie dann schlafen, muss das System diese Zeitspanne von voro warten Weiter zur nächsten Nachricht.

Als nächstes müssen Sie die Startzeit außerhalb der on_message Funktion verfolgen, Sie können dann die aktuelle Zeit mit diesem Wert für jede Nachricht vergleichen und entscheiden, ob Sie sie behalten/verarbeiten wollen oder nicht.

def on_message(r_c_client, userdata, message): 
    global process_startTime 

    if (message.topic == "scanning"): 
    c = datetime.now().time() 
    current = (c.hour * 60 + c.minute) * 60 + c.second 

    if (current<=total_Time and current>=(process_startTime + 60)): 
    data = json.loads(message.payload.decode("utf-8")) 
    x = data['host'] 
    y = data['data'] 

    hostList = store(x, y) 

Der Haupt sollte wie folgt aussehen:

global process_startTime 

t = datetime.now().time() 

process_startTime = (t.hour * 60 + t.minute) * 60 + t.second 
total_Time = process_startTime + 300 #4 minutes + 1 minute 
print t , process_startTime 

broker_address = '10.10.0.100' 
c_client = mqtt.Client("trilateration") 
c_client.on_connect = on_connect 

c_client.on_message = on_message 
c_client.on_subscribe = on_subscribe 
c_client.connect(broker_address, 1883) 

while (True): 
    c_client.loop() 
    c = datetime.now().time() 
    current = (c.hour * 60 + c.minute) * 60 + c.second 
    if (current >= total_Time): 
    nodeList = listToDf(hostList) 
    nodeDf= df_reformat(nodeList) 
    print clustering_results_reformat(process_startTime, nodeDf) 
    time.sleep(1) 
+0

Ich kann nur rufen die Funktion nach 4 Minuten zu bearbeiten, Ihre Bedingung meine Funktion aufrufen, bevor die Daten über 4 Minuten hat passieren. nach 4 Minuten ruft er meine Funktionen an und ruft alle 4 Minuten erneut an. – jayen

+0

Das ist nicht klar aus Ihrer Frage und der Code, den Sie gepostet haben, macht dasselbe. – hardillb

+0

Ich habe die Antwort bearbeitet, es ist nicht 100% richtig, aber es sollte Ihnen genug von einem Punkt in die richtige Richtung geben. – hardillb

Verwandte Themen