2017-04-15 2 views
0

Bitte werfen Sie einen Blick auf diesen Code unten. Ich erhalte einen Fehler für den folgenden Code, wenn ich den Wert für die Anzahl der Partitionen übergebe.Partitionierung unvollständig angegebenen Fehler in meiner Spark-Anwendung

 def loadDataFromPostgress(sqlContext: SQLContext, tableName: String, 
     columnName: String, dbURL: String, userName: String, pwd: String, 
     partitions: String): DataFrame = { 
     println("the no of partitions are : "+partitions) 
     var dataDF = sqlContext.read.format("jdbc").options(
     scala.collection.Map("url" -> dbURL, 
          "dbtable" -> tableName, 
         "driver" -> "org.postgresql.Driver", 
        "user" -> userName, 
       "password" -> pwd, 
        "partitionColumn" -> columnName, 
       "numPartitions" -> "1000")).load() 
       return dataDF 
         } 

Fehler:

   java.lang.RuntimeException: Partitioning incompletely specified 
       App > at scala.sys.package$.error(package.scala:27) 
       App > at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:38) 
       App > at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315) 
       App > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149) 
    App > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122) 
       App > at Test$.loadDataFromGreenPlum(script.scala:28) 
       App > at Test$.loadDataFrame(script.scala:15) 
       App > at Test$.main(script.scala:59) 
       App > at Test.main(script.scala) 
       App > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
        Method) 
       App > at 

Antwort

3

Sie können Code unten überprüfen, wie genau Sie verwenden können.

def loadDataFromPostgress(sqlContext: SQLContext, tableName: String, 
          columnName: String, dbURL: String, userName: String, 
          pwd: String, partitions: String): DataFrame = { 
    println("the no of partitions are : " + partitions) 
    var dataDF = sqlContext.read.format("jdbc").options(
     scala.collection.Map("url" -> dbURL, 
     "dbtable" -> "(select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp) as t", 
     "driver" -> "org.postgresql.Driver", 
     "user" -> userName, 
     "password" -> pwd, 
     "partitionColumn" -> hash_code, 
     "lowerBound" -> 0, 
     "upperBound" -> 10 
    "numPartitions" -> "10" 
    )).load() 
    return dataDF 
    } 

Der obige Code erstellt 10 Aufgaben mit 10 Abfragen, wie unten gezeigt. Davor Job

offset = (upperBound-lowerBound)/numPartitions

Hier offset = (10-0)/10 = 1

select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp where hash_code between 0 between 1 
select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp where hash_code between 1 between 2 
. 
. 
select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp where hash_code between 9 between 10 

Diese schaffen 10 Partitionen und

empid endet mit 0 gehen werden immer herausfinden, wird eine Partition als mod (empid, 10) gleich 0

empid endet mit 1 wird eine Partition gehen als mod (empid, 10) immer gleich 1

so werden alle Angestelltenzeilen in 10 Partitionen gespuckt.

Sie müssen die Werte partitionSpalte, obererBund, untererBund, numPartitionen entsprechend Ihren Anforderungen ändern.

Ich hoffe, meine Antwort hilft Ihnen.

0

Partitioning erfordert:

  • Partitionierungsspalte (integer).
  • Anzahl der Spalten
  • niedriger für die

Die letzten beiden fehlen,

  • obere Schranke für die Spalte Säule gebunden, und das ist, warum Sie den Fehler.