2015-09-16 5 views
6

Ich versuche herauszufinden, wie man mit dem neuen DataFrameWriter Daten in eine JDBC-Datenbank zurückschreibt. Ich kann keine Dokumentation dafür finden, obwohl es scheint, dass es möglich sein sollte, den Quellcode anzuschauen.Speichern/Exportieren von transformiertem DataFrame zurück zu JDBC/MySQL

Ein triviales Beispiel dafür, was ich wie folgt aussieht versuchen:

sqlContext.read.format("jdbc").options(Map(
    "url" -> "jdbc:mysql://localhost/foo", "dbtable" -> "foo.bar") 
).select("some_column", "another_column") 
.write.format("jdbc").options(Map(
    "url" -> "jdbc:mysql://localhost/foo", "dbtable" -> "foo.bar2") 
).save("foo.bar2") 

Dies funktioniert nicht - ich mit diesem Fehler am Ende:

java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select. 
    at scala.sys.package$.error(package.scala:27) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:200) 

Ich bin mir nicht sicher, ob Ich mache etwas falsch (warum löst es zum Beispiel auf DefaultSource anstatt auf JDBCRDD auf?) Oder wenn das Schreiben in eine bestehende MySQL-Datenbank einfach nicht möglich ist, indem man Sparks DataFrames API benutzt.

+1

Es ist nicht die Ursache des Problems, aber Ihr Beispiel fehlt Ladebefehl. – zero323

Antwort

6

aktualisieren

Aktuelle Spark-Version (2.0 oder höher) unterstützt das Erstellen von Tabellen auf Schreib.

Die ursprüngliche Antwort

Es ist möglich, eine vorhandene Tabelle zu schreiben, aber es sieht aus wie in diesem Moment (Spark-1.5.0) Erstellen Tabelle JDBC-Datenquelle noch * nicht unterstützt werden. Sie können SPARK-7646 als Referenz überprüfen.

Wenn Tabelle bereits vorhanden Sie einfach DataFrameWriter.jdbc Methode verwenden können:

val prop: java.util.Properties = ??? 
df.write.jdbc("jdbc:mysql://localhost/foo", "foo.bar2", prop) 

* Interessant PySpark ist, scheint das Erstellen von Tabellen mit jdbc Methode zu unterstützen.

+2

Danke, das hat mit dem Hinzufügen eines Speichermodus perfekt funktioniert; h. 'df.write.mode (SaveMode.Overwrite) .jdbc (" jdbc: mysql: // localhost/foo "," foo.bar2 ", prop)' –

Verwandte Themen