0

Ich lese mehrere HTML-Dateien in einen Datenrahmen in Spark. Ich Elemente der HTML-Spalten in dem Datenrahmen mit einem benutzerdefinierten UDFHinzufügen von Bereichsvariablen pro Zeileniteration in Apache Spark

val dataset = spark 
    .sparkContext 
    .wholeTextFiles(inputPath) 
    .toDF("filepath", "filecontent") 
    .withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent)) 
    .withColumn("biz_website", parseDocValue(".biz-website a")('filecontent)) 

    ... 

    def parseDocValue(cssSelectorQuery: String) = 
    udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text()) 

Umwandlung, die perfekt funktioniert, aber jeder withColumn Anruf wird in der Parsen der HTML-Zeichenfolge führen, was überflüssig ist.

Gibt es eine Möglichkeit (ohne Lookup-Tabellen oder so), die ich erzeugen kann 1 analysiert Dokument (Jsoup.parse(html)) auf der Grundlage der „filecontent“ Spalte pro Zeile und dass für alle withColumn Anrufe in dem Datenrahmen zur Verfügung stellen?

Oder sollte ich nicht einmal versuchen, DataFrames zu verwenden und nur RDDs zu verwenden?

+0

Können Sie mit Beispieltextfolgen aktualisieren? –

+0

Ich habe Probleme mit der Nicht-Parallelisierung von 'wholeTextFiles' im Wesentlichen (z. B. 2 Executoren auf 64-Core-Cluster, bevor ich sogar neu partitionieren kann), also werde ich wahrscheinlich das Ganze neu schreiben. Ich werde die Vorschläge aktualisieren und betrachten, wenn ich dieses Problem anpackt. Sorry für die Unannehmlichkeiten –

+0

hast du es gelöst oder etwas anderes? –

Antwort

0

So ist die endgültige Antwort war in der Tat ganz einfach:

Einfach über die Zeilen mappen und dort die Objekte erstellen

def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = { 
    val domObject = document.select(cssSelectorQuery) 

    val domValue = attr match { 
     case Some(a) => domObject.attr(a) 
     case None => domObject.text() 
    } 

    domValue match { 
     case x if x == null || x.isEmpty => None 
     case y => Some(y) 
    } 
    } 

val dataset = spark 
     .sparkContext 
     .wholeTextFiles(inputPath, minPartitions = 265) 
     .map { 
     case (filepath, filecontent) => { 
      implicit val document = Jsoup.parse(filecontent) 

      val customDataJson = docJson(filecontent, customJsonRegex) 


      DataEntry(
      biz_name = docValue(".biz-page-title"), 
      biz_website = docValue(".biz-website a"), 
      url = docValue("meta[property=og:url]", attr = Some("content")), 
      ... 
      filename = Some(fileName(filepath)), 
      fileTimestamp = Some(fileTimestamp(filepath)) 
     ) 
     } 
     } 
     .toDS() 
0

würde ich es wahrscheinlich wie folgt umschreiben, die Analyse und die Auswahl in einem Rutsch zu tun und sie in eine temporäre Spalte setzen:

val dataset = spark 
    .sparkContext 
    .wholeTextFiles(inputPath) 
    .withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent)) 
    .withColumn("biz_name", col("temp")(0)) 
    .withColumn("biz_website", col("temp")(1)) 
    .drop("temp") 

def parseDocValue(cssSelectorQueries: Array[String]) = 
udf((html: String) => { 
    val j = Jsoup.parse(html) 
    cssSelectorQueries.map(query => j.select(query).text())})