Ich habe ein Problem beim Erstellen eines DataFrame in einer Scala-App, die ich schreibe.Spark 1.6.1: Erstellen von DataFrame aus RDD [Array [Fehler]]
Problem, das ich habe ist, dass die Kompilierung von Scala mit einem Fehler beendet, dass ToDF nicht ein Teil von RDD ist. Ich habe Antworten gesehen, die vorschlagen, die Groß- und Kleinschreibung von Groß- und Kleinschreibung nach der Deklaration von sqlContext zu übernehmen, aber selbst das hat nicht funktioniert.
Dies ist, was ich derzeit haben:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object ErrorParser {
case class Error(time: String, status: String, statusType: String, host: String, message: String)
def splitError(line: String) : Array[String] = {
var array:Array[String] = new Array[String](5)
...
return array
}
def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = {
var filteredErrors = ArrayBuffer[Array[String]]()
...
return filteredErrors.toArray
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ErrorParserAPI")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
var errors = logs.filter(line => line.contains("ERROR"))
val errors1 = errors.map(line => splitError(line))
val filteredErrors = filterErrors(errors1.collect)
val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
val filteredRDD = sc.parallelize(dfErrors)
var errorDF = filteredRDD.toDF()
errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")
}
}
ich da wie dies in funken Shell Dinge stapfte ich arbeiten.
Ich habe auch ein paar Antworten zu sehen, die RDD zu einer Instanz von RDD [Row] schlägt vor, ändern und dann
sc.createDataFrame(rdd, scheme)
verwenden, aber ich kann meinen Kopf nicht umschlingen, wie ich gehen würde um das zu tun.
Jede Hilfe wäre sehr willkommen!
Das ist meine .sbt Datei:
name := "ErrorParserAPI"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.10" % "1.6.1",
"org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)
EDIT: ein Tippfehler
Haben Sie versucht, die Fehlerdefinition Ihrer Fallklasse aus dem Objekt 'ErrorParser' zu entfernen? Ungeachtet des Problems wäre es eine gute Idee, da die innere Klasse den äußeren Geltungsbereich erfasst, was zu Problemen mit der Serialisierung führen kann. Versuchen Sie auch, Code wie diesen zu vermeiden 'filterErrors (errors1.collect)' - es ist besser, 'filterErrors' in Form eines Prädikats zu schreiben und es wie' errors'.filter (isError) ' –
zu verwenden, danke für das Heads-up auf der filterErrors implementation, wird versuchen, Fallklasse zu verschieben und zu sehen, wie es geht – Xranna
traurig Fallklasse außerhalb des Objekts verschoben wurde das Problem nicht gelöst @VitaliyKotlyarenko – Xranna