Ich arbeite an einem Projekt mit Spark und Scala und ich bin neu für beide, aber mit viel Hilfe von Stackoverflow Ich habe die gesamte Datenverarbeitung und gespeichert die verarbeiteten Daten in MySQL. Jetzt habe ich endlich ein Problem und ich verstehe nicht, wie ich damit umgehen soll. Beim ersten Mal, als ich die Daten verarbeitet habe, habe ich den Datenrahmen mit dieser Methode gespeichert und die erste Zeitentabelle ist leer.Aktualisieren von Daten in der Datenbank in Spark mit Scala
Lassen Sie uns sagen, dass meine verarbeiteten Daten in der Datenbank wie folgt aussehen.
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 24 2016-04-27 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
Jetzt Person mit dem Namen ‚Root‘ mit eid Besuch Büro 2 mal ‚C111‘ auf ‚2016.08.30 00.00.00‘ jetzt nach dieser neuen Datenverarbeitung ich nur diese Person Datensatz aktualisieren müssen Datenbank. Wie ich das machen werde. Jetzt sollte die aktualisierte Tabelle so aussehen.
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 26 2016-08-30 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
ich Millionen von Daten in dieser Tabelle habe, und wenn ich die volle Tabelle in Funkendatenrahmen laden und den gewünschten Datensatz aktualisieren, dann wird es mehr Zeit in Anspruch nehmen und auch macht es keinen Sinn machen, weil, warum ich die volle Tabelle laden wenn ich nur eine Zeile aktualisieren möchte. Ich habe diesen Code ausprobiert, aber er hat die neue Zeile zur Tabelle hinzugefügt, anstatt die Zeile zu aktualisieren.
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
Gibt es eine Möglichkeit, das in Funken zu tun?
Ich habe dies im Internet gesehen kann ich das für Update tun.
val numParallelInserts = 10
val batchSize = 1000
new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
val db = connect()
val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
val stmt = db.prepareStatement(sql)
iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
batch foreach { session =>
stmt.setString(1, session.id)
stmt.setString(2, TimestampFormat.print(session.ts))
stmt.addBatch()
}
stmt.executeBatch()
db.commit();
logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
}
db.close();
haben Sie versucht, mit "überschreiben" Modus? – dsr301
Überschreiben Sie die Tabelle mit nicht exakten Datentypen neu und löschen Sie alle älteren Daten und fügen Sie nur neue verarbeitete Daten ein. –