Ich bin neu bei SPARK und ich freue mich auf Spark Java API. Ich habe eine DateiWie berechnet man das gesamte Gehalt in Spark Java API
1201, John, 2500
1202, Alex, 2800
1203, amith, 3900
1204, javed, 2300
1205, Saminga, 23000
Jetzt muss ich das Gesamtgehalt berechnen und in einer Datei speichern. Da ich für MR/spark Java API sehr neu bin, konnte ich es nicht herausfinden. Bitte kann mir jemand dabei helfen.
Beispielcode:
import java.util.Arrays;
import java.util.Comparator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class SalarySum {
public static void main(String[] args)
{
final int k=0;
if(args.length<1)
{
System.out.println("Please provide input files for processing");
System.exit(0);
}
else
{
String inputFile=args[0];
String outputFile=args[1];
SparkConf config=new SparkConf().setAppName("Total Salary Example");
JavaSparkContext spartContext=new JavaSparkContext(config);
JavaRDD<String> inputReader=spartContext.textFile(inputFile);
JavaRDD<String> map=inputReader.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception
{
System.out.println("Flat Map Data: "+t);
return Arrays.asList(t);
}
});
JavaPairRDD<Integer, Iterable<String>> group=map.groupBy(new Function<String, Integer>() {
@Override
public Integer call(String s2) throws Exception
{
String data=s2.split(",")[2].trim();
int value=Integer.parseInt(data);
System.out.println("Tuple: "+s2 +" : "+data);
return value;
}
});
JavaPairRDD<Integer, Integer> totalSaleData = group.flatMapValues(new Function<Iterable<String>, Iterable<Integer>>() {
@Override
public Iterable<Integer> call(Iterable<String> v1)
throws Exception
{
int count=0;
for(String str:v1)
{
String data=str.split(",")[2].trim();
int value=Integer.parseInt(data);
System.out.println("Iterating Values : "+str);
System.out.println("Count: "+count);
count =count+value;
}
return Arrays.asList(count);
}
});
totalSaleData.saveAsTextFile(outputFile);
}
}
}
Ihre Eingabedatei ist txt oder csv? Möchten Sie RDD und nicht den Dataframe verwenden? Wie wird die erwartete Ausgabe in die Datei geschrieben? – abaghel
Meine Eingabedatei ist eine Textdatei, ich brauche RDD und out muss die Summe des kompletten Gehalts sein. – Navyah