2016-11-09 1 views
-1

Ich bin neu bei Scala und Spark und versuche, auf einigen Beispielen aufzubauen, die ich gefunden habe. Im Wesentlichen möchte ich innerhalb eines DataFrames eine Funktion aufrufen, die über die Google-API den Bundesstaat zu einer Postleitzahl ermittelt. Der Code funktioniert einzeln, aber nicht im Zusammenspiel. Hier ist das Stück Code, das nicht funktioniert: Spark DF: Schema for type Unit is not supported (Schema für den Typ `Unit` wird nicht unterstützt)

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654) 
    at org.apache.spark.sql.functions$.udf(functions.scala:2837) 
    at MovieRatings$.getstate(MovieRatings.scala:51) 
    at MovieRatings$$anonfun$4.apply(MovieRatings.scala:48) 
    at MovieRatings$$anonfun$4.apply(MovieRatings.scala:47)... 
Line 51 starts with def getstate = udf {(zipcode:String)... 
... 

Code:..

// Fragment of the question's method (its header is not shown): builds a DataFrame of distinct
// zip codes and tries to replace the `state` column with the result of the `getstate` UDF.
userDF.createOrReplaceTempView("Users")
    // SQL statements can be run by using the sql methods provided by Spark
    // `state` starts as a copy of `zipcode` so it can be swapped out column-wise below.
    val zipcodesDF = spark.sql("SELECT distinct zipcode, zipcode as state FROM Users")
// zipcodesDF.map(zipcodes => "zipcode: " + zipcodes.getAs[String]("zipcode") + getstate(zipcodes.getAs[String]("zipcode"))).show()
    val colNames = zipcodesDF.columns
val cols = colNames.map(cName => zipcodesDF.col(cName))
val theColumn = zipcodesDF("state")
// Apply the UDF only to the `state` column (located by comparing Column string forms) and pass
// every other column through unchanged. Per the stack trace, this lambda is where `getstate`
// (and therefore `udf`) is evaluated and the "Schema for type Unit" exception is raised.
val mappedCols = cols.map(c =>
    if (c.toString() == theColumn.toString()) getstate(c).as("transformed") else c)
    // NOTE(review): Dataset.show() returns Unit, so `newDF` does not hold the selected DataFrame.
    val newDF = zipcodesDF.select(mappedCols:_*).show()
    }
/** Spark UDF that looks up a state short name for a zip code via the Google geocode endpoint.
  *
  * The lambda's final expression must be the String result. The previous version ended with a
  * `val` definition, so the lambda returned `Unit` and `udf` failed with
  * "Schema for type Unit is not supported".
  *
  * @return a UDF of `String => String`; "N/A" when the response has fewer than four
  *         `short_name` entries
  */
def getstate = udf { (zipcode: String) =>
  val url = "http://maps.googleapis.com/maps/api/geocode/json?address=" + zipcode
  // Read the whole response and always release the underlying connection.
  val source = scala.io.Source.fromURL(url)
  val result = try source.mkString finally source.close()
  val address = parse(result)
  // Extract the plain string of every "short_name" field. Matching JString avoids returning
  // the JSON wrapper's toString (e.g. "JString(CA)").
  val shortnames = for {
    JObject(address_components) <- address
    JField("short_name", JString(short_name)) <- address_components
  } yield short_name
  // Index 3 is the original positional heuristic for the state entry. Fall back instead of
  // throwing when the response has fewer components (e.g. no results).
  shortnames.lift(3).getOrElse("N/A")
}
+0

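Ihre UDF gibt nichts zurück, weil Sie den Teil `return state.toString()` auskommentiert haben. Die letzte Anweisung der Funktion ist nun eine `val`-Definition, daher hat sie den Rückgabetyp `Unit`, für den Spark kein Schema ableiten kann. – cheseaux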

Antwort

0

Danke für die Antworten – ich glaube, ich habe es herausgefunden. Hier ist der Code, der funktioniert. Zu beachten ist eine Einschränkung der Google-API: Für einige gültige Postleitzahlen liefert sie keine Angaben zum Bundesstaat. Für mich ist das kein Problem.

  /** Loads users from `users.csv` and registers them as the `Users` temp view, then registers a
    * `Geo` temp view holding each distinct zip code with its `state` column resolved through the
    * `getstate` UDF.
    *
    * @param spark session used to read the input file and run the SQL queries
    */
  private def loaduserdata(spark: SparkSession): Unit = {
    import spark.implicits._
    // Create an RDD of User objects from a "::"-delimited text file, convert it to a DataFrame.
    // NOTE(review): a line with fewer than 5 fields, or a non-numeric field 0 or 2, throws when
    // the rows are evaluated; confirm the input is always well-formed.
    val userDF = spark.sparkContext
      .textFile("examples/src/main/resources/users.csv")
      .map(_.split("::"))
      .map(attributes =>
        users(attributes(0).trim.toInt, attributes(1), attributes(2).trim.toInt, attributes(3), attributes(4)))
      .toDF()
    // Register the DataFrame as a temporary view
    userDF.createOrReplaceTempView("Users")
    // `state` starts as the first 5 characters of the zip code and is replaced by the UDF below.
    val zipcodesDF = spark.sql("SELECT distinct zipcode, substr(zipcode,1,5) as state FROM Users ORDER BY zipcode desc")
    // withColumn replaces the existing `state` column in place (same name and position). This
    // avoids locating it by comparing Column.toString across every column.
    val geoDF = zipcodesDF.withColumn("state", getstate(zipcodesDF("state")))
    geoDF.createOrReplaceTempView("Geo")
  }
     /** Spark UDF mapping a zip code to a state short name via the Google geocode endpoint.
       *
       * Selects the `short_name` of the address component whose `types` array is exactly
       * ["administrative_area_level_1", "political"]. Returns "N/A" when no such component
       * exists.
       */
     val getstate = udf { (zipcode: String) =>
       val url = "http://maps.googleapis.com/maps/api/geocode/json?address=" + zipcode
       // Read the whole response and always release the underlying connection.
       val source = scala.io.Source.fromURL(url)
       val result = try source.mkString finally source.close()
       val address = parse(result)
       // Structural equality replaces comparing the List's toString with a literal string.
       val stateLevelTypes = List(JString("administrative_area_level_1"), JString("political"))
       val statenm = for {
         JObject(statename) <- address
         JField("types", JArray(types)) <- statename
         if types == stateLevelTypes
         JField("short_name", JString(short_name)) <- statename
       } yield short_name
       // This must be the lambda's last expression. Ending on a `val` definition (as before)
       // makes the UDF return Unit, which Spark rejects.
       statenm.headOption.getOrElse("N/A")
     }
+0

Wenn Sie interessiert sind, finden Sie diese Daten auf Tableau Public: https://public.tableau.com/views/MovieRatings_7/Geo?:embed=y&:display_count=yes –

Verwandte Themen