2017-12-21 7 views
1

Innerhalb dieses Codes haben wir zwei Dateien: athletes.csv, die Namen enthält, und twitter.test, die die Tweet-Nachricht enthält. Wir möchten für jede einzelne Zeile im twitter.test einen Namen finden, der mit dem Namen in athleten.csv übereinstimmt. Wir haben eine Kartenfunktion verwendet, um den Namen von athleters.csv zu speichern und den gesamten Namen auf alle Zeilen des Tests zu übertragen Datei.Looping durch Karte Spark Scala

object twitterAthlete { 

    def loadAthleteNames() : Map[String, String] = { 

    // Handle character encoding issues: 
    implicit val codec = Codec("UTF-8") 
    codec.onMalformedInput(CodingErrorAction.REPLACE) 
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE) 

    // Create a Map of Ints to Strings, and populate it from u.item. 
    var athleteInfo:Map[String, String] = Map() 
    //var movieNames:Map[Int, String] = Map() 
    val lines = Source.fromFile("../athletes.csv").getLines() 
    for (line <- lines) { 
     var fields = line.split(',') 
     if (fields.length > 1) { 
     athleteInfo += (fields(1) -> fields(7)) 
     } 
    } 

    return athleteInfo 
    } 

    def parseLine(line:String): (String)= { 
    var athleteInfo = loadAthleteNames() 
    var hello = new String 
    for((k,v) <- athleteInfo){ 
     if(line.toString().contains(k)){ 
     hello = k 
     } 
    } 
    return (hello) 
    } 


    def main(args: Array[String]){ 
    Logger.getLogger("org").setLevel(Level.ERROR) 

    val sc = new SparkContext("local[*]", "twitterAthlete") 

    val lines = sc.textFile("../twitter.test") 
    var athleteInfo = loadAthleteNames() 

    val splitting = lines.map(x => x.split(";")).map(x => if(x.length == 4 && x(2).length <= 140)x(2)) 

    var hello = new String() 
    val container = splitting.map(x => for((key,value) <- athleteInfo)if(x.toString().contains(key)){key}).cache 


    container.collect().foreach(println) 

    // val mapping = container.map(x => (x,1)).reduceByKey(_+_) 
    //mapping.collect().foreach(println) 
    } 
} 

die erste Datei wie folgt aussehen:

id,name,nationality,sex,height........ 
001,Michael,USA,male,1.96 ... 
002,Json,GBR,male,1.76 .... 
003,Martin,female,1.73 . ... 

die zweite Datei aussehen mag:

time, id , tweet ..... 
12:00, 03043, some message that contain some athletes names , ..... 
02:00, 03023, some message that contain some athletes names , ..... 

einige so denkt ...

aber ich habe leeres Ergebnis nach Wenn Sie diesen Code ausführen, werden alle Vorschläge sehr geschätzt

i

Ergebnis bekam leer ist:

().... 
()... 
()... 

aber das Ergebnis, dass ich so etwas wie erwartet:

(name,1) 
(other name,1) 
+0

Können Sie eine Probe der beiden Dateien und Ihre erwartete Ausgabe veröffentlichen? – philantrovert

+0

Just edited question pls look thanks – amprie286

+0

Können Sie versuchen, 'yield key' anstatt nur' key' mit Ihrer for-Schleife zu verwenden? – philantrovert

Antwort

1

Sie müssen yield verwenden Wert zurückgeben zu Ihrem map

val container = splitting.map(x => for((key,value) <- athleteInfo ; if(x.toString().contains(key))) yield (key, 1)).cache 
+1

danke für die Antwort es funktioniert – amprie286

1

Ich denke, Sie sollten nur mit einfachsten Option beginnen zuerst ...

I würde DataFrames verwenden, so dass Sie das integrierte CSV-Parsing nutzen und Catalyst, Tungsten, etc. nutzen können.

Dann können Sie das integrierte To verwenden Kenizer, um die Tweets in Wörter zu teilen, zu explodieren und einen einfachen Join zu machen. Abhängig davon, wie groß/klein die Daten mit Athletennamen sind, erhalten Sie einen optimierten Broadcast-Join und vermeiden ein Shuffle.

import org.apache.spark.sql.functions._ 
import org.apache.spark.ml.feature.Tokenizer 

val tweets = spark.read.format("csv").load(...) 
val athletes = spark.read.format("csv").load(...) 

val tokenizer = new Tokenizer() 
tokenizer.setInputCol("tweet") 
tokenizer.setOutputCol("words") 

val tokenized = tokenizer.transform(tweets) 

val exploded = tokenized.withColumn("word", explode('words)) 

val withAthlete = exploded.join(athletes, 'word === 'name) 

withAthlete.select(exploded("id"), 'name).show() 
+0

Danke für Ihre Antwort wird dies versuchen – amprie286

+0

Ich denke, Sie werden feststellen, dass wenn Sie die DF-Lösung mit Ihrer handcodierten Lösung zu vergleichen performanter und skalierbarer, insbesondere wenn das Datenvolumen zunimmt. Bitte upvote, wenn Sie es nützlich finden. – Silvio