2016-12-12 1 views
0

Ich bin neu bei SPARK und ich freue mich auf Spark Java API. Ich habe eine DateiWie berechnet man das gesamte Gehalt in Spark Java API

1201, John, 2500 
1202, Alex, 2800 
1203, amith, 3900 
1204, javed, 2300 
1205, Saminga, 23000 

Jetzt muss ich das Gesamtgehalt berechnen und in einer Datei speichern. Da ich für MR/spark Java API sehr neu bin, konnte ich es nicht herausfinden. Bitte kann mir jemand dabei helfen.

Beispielcode:

import java.util.Arrays; 
import java.util.Comparator; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.DoubleFunction; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 

import scala.Tuple2; 
public class SalarySum { 

    public static void main(String[] args) 
    { 
     final int k=0; 

     if(args.length<1) 
     { 
      System.out.println("Please provide input files for processing"); 
      System.exit(0); 
     } 
     else 
     { 
      String inputFile=args[0]; 
      String outputFile=args[1]; 
      SparkConf config=new SparkConf().setAppName("Total Salary Example"); 
      JavaSparkContext spartContext=new JavaSparkContext(config); 

      JavaRDD<String> inputReader=spartContext.textFile(inputFile); 

      JavaRDD<String> map=inputReader.flatMap(new FlatMapFunction<String, String>() { 
       @Override 
       public Iterable<String> call(String t) throws Exception 
       { 
        System.out.println("Flat Map Data: "+t); 
        return Arrays.asList(t); 
       } 
      }); 

      JavaPairRDD<Integer, Iterable<String>> group=map.groupBy(new Function<String, Integer>() { 

       @Override 
       public Integer call(String s2) throws Exception 
       { 
        String data=s2.split(",")[2].trim(); 
        int value=Integer.parseInt(data); 
        System.out.println("Tuple: "+s2 +" : "+data); 
        return value; 
       } 
      }); 


      JavaPairRDD<Integer, Integer> totalSaleData = group.flatMapValues(new Function<Iterable<String>, Iterable<Integer>>() { 

       @Override 
       public Iterable<Integer> call(Iterable<String> v1) 
         throws Exception 
       { 
        int count=0; 
        for(String str:v1) 
        { 
         String data=str.split(",")[2].trim(); 
         int value=Integer.parseInt(data); 
         System.out.println("Iterating Values : "+str); 
         System.out.println("Count: "+count); 
         count =count+value; 
        } 
        return Arrays.asList(count); 
       } 
      }); 

      totalSaleData.saveAsTextFile(outputFile); 

     } 
    } 

} 
+0

Ihre Eingabedatei ist txt oder csv? Möchten Sie RDD und nicht den Dataframe verwenden? Wie wird die erwartete Ausgabe in die Datei geschrieben? – abaghel

+0

Meine Eingabedatei ist eine Textdatei, ich brauche RDD und out muss die Summe des kompletten Gehalts sein. – Navyah

Antwort

1

Sie können es wie unter Funken 1.6 verwenden.

public class SparkSalarySum { 
public static void main(String[] args) { 
    SparkConf conf = new SparkConf().setAppName("SparkSalarySum").setMaster("local[2]"); 
    JavaSparkContext jsc = new JavaSparkContext(conf); 
    JavaRDD<String> lines = jsc.textFile("c:\\temp\\test.txt"); 
    JavaPairRDD<String, Integer> total = lines.flatMap(line -> Arrays.asList(Integer.parseInt(line.split(",")[2].trim()))) 
      .mapToPair(sal -> new Tuple2<String, Integer>("Total", sal)) 
      .reduceByKey((x, y) -> x + y); 
    total.foreach(data -> { 
     System.out.println(data._1()+"-"+data._2()); 
    }); 
    total.coalesce(1).saveAsTextFile("c:\\temp\\testOut"); 
    jsc.stop(); 
    } 
} 
+0

Hallo, können wir den gleichen Code mit JavaSparkContext API schreiben. Ich benutze Spark 1.6 Version – Navyah

+0

Aktualisiert meine Antwort für Spark 1.6. – abaghel

+0

Aber wie wir das Ergebnis in eine Textdatei mit diesem Code speichern, konnte ich keine Methoden sehen, um ein Ergebnis in einer Datei zu speichern – Navyah

Verwandte Themen