2016-05-12 14 views
0

Ich habe ein Problem beim Erstellen eines DataFrame in einer Scala-App, die ich schreibe.Spark 1.6.1: Erstellen von DataFrame aus RDD [Array [Fehler]]

Problem, das ich habe ist, dass die Kompilierung von Scala mit einem Fehler beendet, dass ToDF nicht ein Teil von RDD ist. Ich habe Antworten gesehen, die vorschlagen, die Groß- und Kleinschreibung von Groß- und Kleinschreibung nach der Deklaration von sqlContext zu übernehmen, aber selbst das hat nicht funktioniert.

Dies ist, was ich derzeit haben:

import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql._ 

object ErrorParser { 

    case class Error(time: String, status: String, statusType: String, host: String, message: String) 

    def splitError(line: String) : Array[String] = { 

     var array:Array[String] = new Array[String](5) 

     ... 

     return array 

    } 

    def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = { 

     var filteredErrors = ArrayBuffer[Array[String]]() 

     ... 

     return filteredErrors.toArray 
    } 

    def main(args: Array[String]) { 

     val conf = new SparkConf().setAppName("ErrorParserAPI") 
     val sc = new SparkContext(conf) 

     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     import sqlContext.implicits._ 

     var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*") 
     var errors = logs.filter(line => line.contains("ERROR")) 

     val errors1 = errors.map(line => splitError(line)) 
     val filteredErrors = filterErrors(errors1.collect) 

     val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4))) 
     val filteredRDD = sc.parallelize(dfErrors) 
     var errorDF = filteredRDD.toDF() 

     errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult") 

    } 

} 

ich da wie dies in funken Shell Dinge stapfte ich arbeiten.

Ich habe auch ein paar Antworten zu sehen, die RDD zu einer Instanz von RDD [Row] schlägt vor, ändern und dann

sc.createDataFrame(rdd, scheme) 

verwenden, aber ich kann meinen Kopf nicht umschlingen, wie ich gehen würde um das zu tun.

Jede Hilfe wäre sehr willkommen!

Das ist meine .sbt Datei:

name := "ErrorParserAPI" 
version := "1.0" 
scalaVersion := "2.11.7" 
libraryDependencies ++= Seq(
     "org.apache.spark" % "spark-core_2.10" % "1.6.1", 
     "org.apache.spark" % "spark-sql_2.10" % "1.6.1" 
) 

EDIT: ein Tippfehler

+0

Haben Sie versucht, die Fehlerdefinition Ihrer Fallklasse aus dem Objekt 'ErrorParser' zu entfernen? Ungeachtet des Problems wäre es eine gute Idee, da die innere Klasse den äußeren Geltungsbereich erfasst, was zu Problemen mit der Serialisierung führen kann. Versuchen Sie auch, Code wie diesen zu vermeiden 'filterErrors (errors1.collect)' - es ist besser, 'filterErrors' in Form eines Prädikats zu schreiben und es wie' errors'.filter (isError) ' –

+0

zu verwenden, danke für das Heads-up auf der filterErrors implementation, wird versuchen, Fallklasse zu verschieben und zu sehen, wie es geht – Xranna

+0

traurig Fallklasse außerhalb des Objekts verschoben wurde das Problem nicht gelöst @VitaliyKotlyarenko – Xranna

Antwort

0

ich Ihren Code einfach kopiert und in meiner Finsternis eingefügt und seine adaequat ohne Kompilierungsfehler. Wenn Sie Eclipse verwenden, können Sie versuchen, Ihr Projekt zu bereinigen und zu aktualisieren.

import scala.Array.canBuildFrom 
import scala.collection.mutable.ArrayBuffer 
import scala.reflect.runtime.universe 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

object ErrorParser { 


    def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = { 

    var filteredErrors = ArrayBuffer[Array[String]]() 

    return filteredErrors.toArray 
    } 

    def main(args: Array[String]) { 



    val conf = new SparkConf().setAppName("ErrorParserAPI") 
    val sc = new SparkContext(conf) 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*") 
    var errors = logs.filter(line => line.contains("ERROR")) 

    val errors1 = errors.map(line => splitError(line)) 
    val filteredErrors = filterErrors(errors1.collect) 

    val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4))) 
    val filteredRDD = sc.parallelize(dfErrors) 
    var errorDF = filteredRDD.toDF() 
    } 

    case class Error(time: String, status: String, statusType: String, host: String, message: String) 

    def splitError(line: String): Array[String] = { 

    var array: Array[String] = new Array[String](5) 

    return array 

    } 
} 
+0

Erstellen von .jar-Datei in der Eclipse und dann die Bereitstellung es zu Funken gearbeitet. Ich weiß immer noch nicht, was das Kernproblem war, aber ich werde es als eine Antwort für jetzt markieren. – Xranna

Verwandte Themen