2016-09-01 3 views
0

Ich arbeite an einem Projekt mit Spark und Scala und ich bin neu für beide, aber mit viel Hilfe von Stackoverflow Ich habe die gesamte Datenverarbeitung und gespeichert die verarbeiteten Daten in MySQL. Jetzt habe ich endlich ein Problem und ich verstehe nicht, wie ich damit umgehen soll. Beim ersten Mal, als ich die Daten verarbeitet habe, habe ich den Datenrahmen mit dieser Methode gespeichert und die erste Zeitentabelle ist leer.Aktualisieren von Daten in der Datenbank in Spark mit Scala

Lassen Sie uns sagen, dass meine verarbeiteten Daten in der Datenbank wie folgt aussehen.

 id  name  eid  number_of_visitis last_visit_date 
     1  John  C110  12     2016-01-13 00:00:00 
     2  Root  C111  24     2016-04-27 00:00:00 
     3  Michel  C112  8     2016-07-123 00:00:00 
     4  Jonny  C113  45     2016-06-10 00:00:00 

Jetzt Person mit dem Namen ‚Root‘ mit eid Besuch Büro 2 mal ‚C111‘ auf ‚2016.08.30 00.00.00‘ jetzt nach dieser neuen Datenverarbeitung ich nur diese Person Datensatz aktualisieren müssen Datenbank. Wie ich das machen werde. Jetzt sollte die aktualisierte Tabelle so aussehen.

 id  name  eid  number_of_visitis last_visit_date 
     1  John  C110  12     2016-01-13 00:00:00 
     2  Root  C111  26     2016-08-30 00:00:00 
     3  Michel  C112  8     2016-07-123 00:00:00 
     4  Jonny  C113  45     2016-06-10 00:00:00 

ich Millionen von Daten in dieser Tabelle habe, und wenn ich die volle Tabelle in Funkendatenrahmen laden und den gewünschten Datensatz aktualisieren, dann wird es mehr Zeit in Anspruch nehmen und auch macht es keinen Sinn machen, weil, warum ich die volle Tabelle laden wenn ich nur eine Zeile aktualisieren möchte. Ich habe diesen Code ausprobiert, aber er hat die neue Zeile zur Tabelle hinzugefügt, anstatt die Zeile zu aktualisieren.

 df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 

Gibt es eine Möglichkeit, das in Funken zu tun?

Ich habe dies im Internet gesehen kann ich das für Update tun.

val numParallelInserts = 10 
val batchSize = 1000 

new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) => 
    val db = connect() 

    val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)" 
    val stmt = db.prepareStatement(sql) 

    iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) => 
    batch foreach { session => 
     stmt.setString(1, session.id) 
     stmt.setString(2, TimestampFormat.print(session.ts)) 
     stmt.addBatch() 
    } 
    stmt.executeBatch() 
    db.commit(); 
    logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements") 
    } 

    db.close(); 
+0

haben Sie versucht, mit "überschreiben" Modus? – dsr301

+1

Überschreiben Sie die Tabelle mit nicht exakten Datentypen neu und löschen Sie alle älteren Daten und fügen Sie nur neue verarbeitete Daten ein. –

Antwort

0

Sie können versuchen, mit sql das zu tun. Speichern Sie die aktualisierten (und sogar neuen) Daten in einer neuen temporären Tabelle und führen Sie dann die temporäre Tabelle in die Haupttabelle zusammen.

Eine Möglichkeit, das zu tun ist -

  1. aktualisieren alle Datensätze in der Haupttabelle die temporäre Tabelle mit

    update main_table set visits = main_table.visits + temp_table.visits from temp_table where main_table.eid = temp_table.eid;

  2. Löschen Sie alle doppelte Datensätze aus temporären Tabelle (die Blätter nur neue Datensätze in der temporären Tabelle)

    delete from temp_table where main_table.eid = temp_table.eid;

  3. Legen Sie alle Datensätze aus temporären Tabelle in die Haupttabelle

    insert into main_table select * from temp_table;

  4. Tropfen die temporäre Tabelle

    drop table temp_table;

+0

dies ist auf db Ebene und habe mehr Schritte ich möchte es auf kürzestmögliche Weise tun. Ich habe Millionen von Daten, so dass das Kopieren, Entfernen und Einfügen Zeit braucht. Ich aktualisiere die Frage bitte schau dir das an, magst du verstehen, was ich machen will. –

+0

Ich verwende dies auf einer Rotverschiebung Datenbank mit buchstäblich Milliarden von Zeilen. – Kakaji

Verwandte Themen