2017-11-22 4 views
0

Ich habe mehrere Themen mit Paho mqtt Client abonniert. Beim Empfang der Nachrichten vom Broker möchte ich die Nachrichten in der MySQL-Datenbank speichern. Ich möchte die Nachrichten zusammen vor dem Einfügen in DB sammeln. Ich habe den Schwellenwert sagen 1000 Nachrichten. Erst wenn der Schwellenwert erreicht ist, müssen die Nachrichten gleichzeitig in die DB eingefügt werden. Ich überprüfe die row_count nach cursor.execute(). Aber es zeigt die Zählung als 1. Also ist die Masseneinfügung nicht passiert. hier ist die Code-Schnipsel meiner ProbePython: MQTT Broker Nachrichten Bulk einfügen in MySQL-Datenbank

//main.py 

#mysql database class 
db = MySQLDBClass() 

#mqtt client class where subscription,connection to broker,some callbacks 
mqttclient = MyMQTTClient() 
mqttclient.on_message = db.onMessage 
mqttclient.loop_forever() 

//MySQLDBClass.py 

def __init__(self): 

     self.insertcounter = 0 
     self.insertStatement = '' 
     self.bulkpayload = '' 
     self.maxInsert = 1000 

    def onMessage(self, client, userdata, msg): 

     if msg.topic.startswith("topic1/"): 
      self.bulkpayload += "(" + msg.payload.decode("utf-8") + "," + datetime + ")," 
     elif msg.topic.startswith("topic2/"): 
      self.insertStatement += "INSERT INTO mydatabase.table1 VALUES (" + msg.payload.decode("utf-8") + "," + datetime + ");" 
     elif msg.topic.startswith("topic3/") 
      self.insertStatement += "INSERT INTO mydatabase.table2 VALUES (" +msg.payload.decode("utf-8") + "," + datetime + ");" 
     elif msg.topic.startswith("messages"): 
      self.insertStatement += "INSERT INTO mydatabase.table3 VALUES ('" + msg.topic + "'," + msg.payload.decode("utf-8") + "," + datetime + ");" 
     else: 
      return # do not store in DB 

     self.insertcounter += 1 

     if (self.insertcounter > self.maxInsert): 
      if (self.bulkpayload != ''): 
       self.insertStatement += "INSERT INTO mydatabase.table4 VALUES" + self.bulkpayload + ";"  
       self.bulkpayload = '' 

      cursor.execute(self.insertStatement) 
      cursor.commit() 
      print (cursor.rowcount) #prints always count as one , expecting bulk count 
      self.insertcounter = 0 
      self.insertStatement = '' 

Antwort