2015-02-05 9 views
8

Ich bin sehr neu in Apache Spark. Ich möchte mich eigentlich auf die grundlegende Spark-API-Spezifikation konzentrieren und einige Programme mit Spark-API verstehen und schreiben. Ich habe ein Java-Programm mit Apache Spark geschrieben, um das Joins-Konzept zu implementieren.Apache Spark Joins Beispiel mit Java

Wenn ich Left Outer Join - leftOuterJoin() oder Right Outer Join - rightOuterJoin() verwende, geben beide Methoden ein JavaPairRDD zurück, das einen speziellen Typ Google Options enthält. Aber ich weiß nicht, wie man die ursprünglichen Werte vom optionalen Typ extrahiert.

Egal, ich würde gerne wissen, kann ich die gleichen Join-Methoden verwenden, die die Daten in meinem eigenen Format zurückgeben. Ich habe keinen Weg gefunden, das zu tun. Das heißt, wenn ich Apache Spark verwende, kann ich den Code nicht in meinem eigenen Stil anpassen, da sie bereits alle vordefinierten Dinge gegeben haben.

finden Sie den Code unten

my 2 sample input datasets 

customers_data.txt: 
4000001,Kristina,Chung,55,Pilot 
4000002,Paige,Chen,74,Teacher 
4000003,Sherri,Melton,34,Firefighter 

and 

trasaction_data.txt 
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit 
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit 
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash 
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit 
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit 

Hier ist mein Java-Code

**SparkJoins.java:** 

public class SparkJoins { 

    @SuppressWarnings("serial") 
    public static void main(String[] args) throws FileNotFoundException { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local")); 
     JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt"); 
     JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] customerSplit = s.split(","); 
       return new Tuple2<String, String>(customerSplit[0], customerSplit[1]); 
      } 
     }).distinct(); 

     JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt"); 
     JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] transactionSplit = s.split(","); 
       return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]); 
      } 
     }); 

     //Default Join operation (Inner join) 
     JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs); 
     System.out.println("Joins function Output: "+joinsOutput.collect()); 

     //Left Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect()); 

     //Right Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect()); 

     sc.close(); 
    } 
} 

Und hier die Ausgabe, die ich erhalte

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))] 

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])] 

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])] 

ich dieses Programm leite auf Windows-Plattform

Bitte geben Sie die obige Ausgabe beobachten und mir helfen, die Werte aus Optional Typ

Vielen Dank im Voraus bei der Extraktion

+0

Warum nicht Scala verwenden statt? – maasg

+0

Hallo @maasg, ich bin im Grunde ein Java-Entwickler .. Ich weiß wirklich nicht, Scala .. Aber ich denke, Apache Spark ist am besten geeignet für Scala Programmierung dann Java. –

+0

@ShekarPatel können Sie bitte aktualisieren Sie Ihren Code mit, wie Sie das optionale entfernt .. das wird für andere hilfreich sein. – Shankar

Antwort

8

Wenn Sie linke äußere trete und rechte äußere Verknüpfung, können Sie NULL-Werte haben. Recht!

So Funke gibt optionales Objekt zurück. Nachdem Sie dieses Ergebnis erhalten haben, können Sie dieses Ergebnis Ihrem eigenen Format zuordnen.

Sie können IsPresent() -Methode von Optional verwenden, um Ihre Daten zuzuordnen. Hier

ist das Beispiel:

JavaPairRDD<String,String> firstRDD = .... 
JavaPairRDD<String,String> secondRDD =.... 
// join both rdd using left outerjoin 
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD); 


// mapping of join result 
JavaPairRDD<String, String> mappedRDD = rddWithJoin 
      .mapToPair(tuple -> { 
       if (tuple._2()._2().isPresent()) { 
        //do your operation and return 
        return new Tuple2<String, String>(tuple._1(), tuple._2()._1()); 
       } else { 
        return new Tuple2<String, String>(tuple._1(), "not present"); 
       } 
      }); 
+0

Danke Kumpel .. Es funktioniert gut .. –

+0

@ sms_1190 wie das Ergebnis zu unserem eigenen Format zuordnen? Ich habe auch das selbe Problem. – Shankar

+0

@Shankar: Ich habe das Beispiel in der obigen Antwort hinzugefügt. mappedRDD ist dein eigenes Format. –