2016-02-25 11 views
11

Ich versuche, einige Daten auf MySql mit Spark SQL DataFrames und JDBC-Verbindung einzufügen und zu aktualisieren.SPARK SQL - Update MySql Tabelle mit DataFrames und JDBC

Es ist mir gelungen, neue Daten mit dem SaveMode.Append einzufügen. Gibt es eine Möglichkeit, die bereits in MySql Table vorhandenen Daten von Spark SQL zu aktualisieren?

Mein Code einzufügen ist:

myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)

Wenn ich es löscht die vollständige Tabelle SaveMode.Overwrite ändern und einen neuen erstellt, ich bin auf der Suche nach so etwas wie der „ON DUPLICATE KEY UPDATE "Verfügbar in MySql

Antwort

14

Es ist nicht möglich. Was jetzt (Funke 1.6.0/2.2.0 SNAPSHOT) Funken DataFrameWriter Modi nur vier Schreibvorgänge unterstützt:

  • SaveMode.Overwrite: überschreiben die vorhandenen Daten.
  • SaveMode.Append: fügen Sie die Daten an.
  • SaveMode.Ignore: ignorieren Sie die Operation (d. H. No-Op).
  • SaveMode.ErrorIfExists: Standardoption, eine Ausnahme zur Laufzeit auslösen.

Sie können beispielsweise manuell einfügen mapPartitions mit (da Sie eine UPSERT Operation wollen, sollten idempotent und als solche leicht zu implementieren sein), um temporäre Tabelle und führen Upsert manuell oder verwenden Sie Trigger schreiben.

Im Allgemeinen ist das Erzielen eines Upsert-Verhaltens für Batch-Operationen und die Beibehaltung einer anständigen Leistung alles andere als trivial. Sie müssen daran denken, dass im Allgemeinen mehrere gleichzeitige Transaktionen vorhanden sind (eine pro Partition). Sie müssen also sicherstellen, dass keine Schreibkonflikte auftreten (in der Regel durch anwendungsspezifische Partitionierung) oder geeignete Wiederherstellungsverfahren bereitstellen. In der Praxis kann es besser sein, Schreibvorgänge in einer temporären Tabelle auszuführen und stapelweise zu verarbeiten und den Upsert-Teil direkt in der Datenbank aufzulösen.

0

zero323 Antwort ist richtig, ich wollte nur hinzufügen, dass Sie JayDeBeApi Paket, dies zu umgehen verwenden: https://pypi.python.org/pypi/JayDeBeApi/

auf Daten in Ihrer MySQL-Tabelle aktualisieren. Es könnte eine tiefhängende Frucht sein, da Sie bereits den mysql jdbc-Treiber installiert haben.

Das JayDeBeApi-Modul ermöglicht die Verbindung von Python-Code zu Datenbanken mit Java JDBC. Es bietet eine Python DB-API v2.0 für diese Datenbank.

Wir verwenden Anaconda-Distribution von Python, und JayDeBeApi Python-Paket kommt Standard.

Siehe Beispiele in diesem Link oben.

0

Ein schade, dass es in Spark keinen SaveMode.Upsert Modus für so recht häufige Fälle wie Upsert gibt.

Zero322 ist im Allgemeinen richtig, aber ich denke, es sollte möglich sein (mit Kompromissen in der Leistung), eine solche Funktion zu ersetzen.

Ich wollte auch einige Java-Code für diesen Fall bereitstellen. Natürlich ist es nicht so leistungsfähig wie der eingebaute Funken, aber es sollte eine gute Basis für Ihre Anforderungen sein. Ändern Sie es einfach an Ihre Bedürfnisse:

myDF.repartition(20); //one connection per partition, see below 

myDF.foreachPartition((Iterator<Row> t) -> { 
      Connection conn = DriverManager.getConnection(
        Constants.DB_JDBC_CONN, 
        Constants.DB_JDBC_USER, 
        Constants.DB_JDBC_PASS); 

      conn.setAutoCommit(true); 
      Statement statement = conn.createStatement(); 

      final int batchSize = 100000; 
      int i = 0; 
      while (t.hasNext()) { 
       Row row = t.next(); 
       try { 
        // better than REPLACE INTO, less cycles 
        statement.addBatch(("INSERT INTO mytable " + "VALUES (" 
          + "'" + row.getAs("_id") + "', 
          + "'" + row.getStruct(1).get(0) + "' 
          + "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';")); 
        //conn.commit(); 

        if (++i % batchSize == 0) { 
         statement.executeBatch(); 
        } 
       } catch (SQLIntegrityConstraintViolationException e) { 
        //should not occur, nevertheless 
        //conn.commit(); 
       } catch (SQLException e) { 
        e.printStackTrace(); 
       } finally { 
        //conn.commit(); 
        statement.executeBatch(); 
       } 
      } 
      int[] ret = statement.executeBatch(); 

      System.out.println("Ret val: " + Arrays.toString(ret)); 
      System.out.println("Update count: " + statement.getUpdateCount()); 
      conn.commit(); 

      statement.close(); 
      conn.close();