2016-12-14 1 views
0

Meine CSV-Datei anzuwenden:Wie Map-Funktion auf in Funken java RDD Operationen

YEAR,UTILITY_ID,UTILITY_NAME,OWNERSHIP,STATE_CODE,AMR_METERING_RESIDENTIAL,AMR_METERING_COMMERCIAL,AMR_METERING_INDUSTRIAL,AMR_METERING_TRANS,AMR_METERING_TOTAL,AMI_METERING_RESIDENTIAL,AMI_METERING_COMMERCIAL,AMI_METERING_INDUSTRIAL,AMI_METERING_TRANS,AMI_METERING_TOTAL,ENERGY_SERVED_RESIDENTIAL,ENERGY_SERVED_COMMERCIAL,ENERGY_SERVED_INDUSTRIAL,ENERGY_SERVED_TRANS,ENERGY_SERVED_TOTAL 
2011,34,City of Abbeville - (SC),M,SC,880,14,,,894,,,,,,,,,, 
2011,84,A & N Electric Coop,C,MD,135,25,,,160,,,,,,,,,, 
2011,84,A & N Electric Coop,C,VA,31893,2107,0,,34000,,,,,,,,,, 
2011,97,Adams Electric Coop,C,IL,8334,190,,,8524,,,,,0,,,,,0 
2011,108,Adams-Columbia Electric Coop,C,WI,33524,1788,709,,36021,,,,,,,,,, 
2011,118,Adams Rural Electric Coop, Inc,C,OH,7457,20,,,7477,,,,,,,,,, 
2011,122,Village of Arcade,M,NY,3560,498,100,,4158,,,,,,,,,, 
2011,155,Agralite Electric Coop,C,MN,4383,227,315,,4925,,,,,,,,,, 

Hier unten den Spark-Code, um die CSV-Datei zu lesen:

import org.apache.spark.api.java.JavaSparkContext; 

public class RddCsv 
{ 
    public static void main(String[] args) 
    { 
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("file:///home/kumar/Desktop/Eletricaldata/file8_2011.csv");//read csv file 
    System.out.println(allRows.take(5)); 
    } 
} 

Ich bin Learner sparkJava, Auswählen von Perticuler-Feldwert aus diesem CsvDataset und Ausführen von Aggregationsoperationen und Verwenden von Transformationen und Aktionen für das Dataset. und wie perticular Feldwert

+0

Mögliches Duplikat von [So wird CSV- oder JSON-Datei mit Apache Spark analysiert] (http://stackoverflow.com/questions/25362942/how-to-parsing-csv-or-json-file-with-apache-spark) – Jobin

Antwort

0
public static void main(String[] args) 
{ 
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("file:///home/abhishek/Desktop/file8_2011.csv"); 
    System.out.println(allRows.take(5)); 
    List<String> headers= Arrays.asList(allRows.take(1).get(0).split(",")); 
    String field="YEAR"; 
    //Skip Header 
    JavaRDD<String>dataWithoutHeaders=allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field)); 
    //Take one field as integer 
    JavaRDD<Integer> years=dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)])); 
    //Aggregate operation getTotal aggregate() arguments are initial value for a partition,aggregating function for a partition 
    //and aggregating function for results from different partition 
    int total=years.aggregate(0,RddCsv::sum,RddCsv::sum); 
    for (Integer i:years.collect()){ 
     System.out.println("year :: "+i); 
    } 
    System.out.println(total); 
} 

private static int sum(int a,int b){ 
    return a+b; 
} 

Dies ist eine grundlegende program.You wählen sollte Funken der Java-APIs für detaillierte Informationen lesen.

+0

funktioniert nicht, compitime Fehler kommen diese Zeile .aggregate (0, RddCsv :: Summe, RddCsv :: Summe); – kumar

+0

Ich habe es gerade jetzt ausgeführt. –

+0

Ausgabe: 16/12/14 16:26:31 INFO TaskSetManager: Beendete Aufgabe 0.0 in Stufe 3.0 (TID 3) in 25 ms auf localhost (1/1) 16/12/14 16:26:31 INFO TaskSchedulerImpl: entfernt taskset 3.0, deren Aufgaben alle abgeschlossen haben, vom Pool 16/12/14 16.26.31 INFO DAGScheduler: Job 3 beendet: sammeln bei RDDCsv.java:32, nahm 0,049729 s Jahr :: 2011 Jahr :: 2011 Jahr :: 2011 Jahr :: 2011 Jahr :: 2011 Jahr :: 2011 Jahr :: 2011 Jahr :: 2011 16/12/14 16:26:31 INFO SparkContext: Aufrufen Anschlag () von shutdown hook –

Verwandte Themen