2016-12-09 2 views
-1

Ich versuche, den Beispielbetrieb unter Dataramezu betreiben, wobei Klausel.Ausnahmebedingung während der Verwendung von Dataframe where-Klausel

Hier ist mein Beispieltabellendaten:

address  district 
hyderabad  001 
delhi   002 
mumbai   003 

Jetzt muss ich Adresse auszuwerten, max (Kreis) von Datenrahmen verwenden.

Das Ergebnis wird sein, wie:

mumbai 003

Abhilfe:

dies der Code ist, was ich .. bisher versucht,

SparkConf conf = new SparkConf(); 
     conf.set("spark.app.name", "max"); 
     conf.set("spark.master", "local"); 
     conf.set("spark.ui.port", "7077"); 

     SparkContext ctx=new SparkContext(conf);  
     SQLContext sqlContext = new SQLContext(ctx); 
     DataFrame df = sqlContext.read() 
      .format("com.databricks.spark.csv") 
      .option("inferSchema", "true") 
      .option("header", "true") 
      .load("/Users/hadoop/Downloads/SacramentocrimeJanuary2006.csv"); 
     //df.registerTempTable("consumer"); 
     //Row[] result = df.orderBy("cdatetime").select("cdatetime","address").collect(); 
     //DataFrame a = df.select("address","district").agg(functions.count("district"),functions.col("address")).orderBy("address"); 
     DataFrame b =df.select("address","district").where("district=max(district)"); 
     b.show(); 
     } 

Hier ist mein Ausnahme:

Cannot evaluate expression: (max(input[1, IntegerType]),mode=Complete,isDistinct=false) 
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233) 
    at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.genCode(interfaces.scala:73) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:419) 
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:401) 
    at org.apache.spark.sql.catalyst.expressions.EqualTo.genCode(predicates.scala:379) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:42) 
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:33) 
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635) 
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:632) 
    at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:242) 
    at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:71) 
    at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:70) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/12/09 10:50:57 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) 
+0

Sie können die, wo nicht („district = max (Kreis)“) Dieser Ausdruck erforderlich, um eine Spalte Sting und es kann der Wert nicht den Weg weisen Sie auszuführen versuchen .Wenn Sie gehen das offizielle Dokument .. peopleDs.filter ($ "age"> 15) peopleDs.where ($ "age"> 15), peopleDs.where ("Alter> 15"). Es kann den Wert nicht zuweisen. –

Antwort

0

sollten Sie verwenden „Aggregation“ und „Join“ Ihr problem.like zu lösen diese:

data.agg(max($"district").as("maxd")).as("d1").join(data.as("d2"), $"d1.maxd" === $"d2.district").select($"address",$"district").show() 

die Daten Ihrer DataFrame.It hilfreich sein wird für Sie

0

Sie sortieren auf Funktion nutzen zu können Datenrahmen & bestellen Sie als absteigend. Dann verwenden Sie einfach die Kopffunktion, um die gewünschte Ausgabe zu erhalten.

Hier ist Codebeispiel.

import org.apache.spark.sql.functions._ 
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/user/userapp/sample.csv"); 
val a = df.sort(desc("district")).head 

hier ist ein Ausgang.

enter image description here

Verwandte Themen