Ich habe mehrere Themen mit Paho mqtt Client abonniert. Beim Empfang der Nachrichten vom Broker möchte ich die Nachrichten in der MySQL-Datenbank speichern. Ich möchte die Nachrichten zusammen vor dem Einfügen in DB sammeln. Ich habe den Schwellenwert sagen 1000 Nachrichten. Erst wenn der Schwellenwert erreicht ist, müssen die Nachrichten gleichzeitig in die DB eingefügt werden. Ich überprüfe die row_count nach cursor.execute(). Aber es zeigt die Zählung als 1. Also ist die Masseneinfügung nicht passiert. hier ist die Code-Schnipsel meiner ProbePython: MQTT Broker Nachrichten Bulk einfügen in MySQL-Datenbank
//main.py
#mysql database class
db = MySQLDBClass()
#mqtt client class where subscription,connection to broker,some callbacks
mqttclient = MyMQTTClient()
mqttclient.on_message = db.onMessage
mqttclient.loop_forever()
//MySQLDBClass.py
def __init__(self):
self.insertcounter = 0
self.insertStatement = ''
self.bulkpayload = ''
self.maxInsert = 1000
def onMessage(self, client, userdata, msg):
if msg.topic.startswith("topic1/"):
self.bulkpayload += "(" + msg.payload.decode("utf-8") + "," + datetime + "),"
elif msg.topic.startswith("topic2/"):
self.insertStatement += "INSERT INTO mydatabase.table1 VALUES (" + msg.payload.decode("utf-8") + "," + datetime + ");"
elif msg.topic.startswith("topic3/")
self.insertStatement += "INSERT INTO mydatabase.table2 VALUES (" +msg.payload.decode("utf-8") + "," + datetime + ");"
elif msg.topic.startswith("messages"):
self.insertStatement += "INSERT INTO mydatabase.table3 VALUES ('" + msg.topic + "'," + msg.payload.decode("utf-8") + "," + datetime + ");"
else:
return # do not store in DB
self.insertcounter += 1
if (self.insertcounter > self.maxInsert):
if (self.bulkpayload != ''):
self.insertStatement += "INSERT INTO mydatabase.table4 VALUES" + self.bulkpayload + ";"
self.bulkpayload = ''
cursor.execute(self.insertStatement)
cursor.commit()
print (cursor.rowcount) #prints always count as one , expecting bulk count
self.insertcounter = 0
self.insertStatement = ''