2015-06-16 17 views
5

I Eingangsdaten mit dem folgenden Format verwendet:Concurrent Job Execution in Spark-

0 
1 
2 
3 
4 
5 
… 
14 

Input Location: hdfs://localhost:9000/Input/datasource 

Ich habe den folgenden Code-Schnipsel speichern RDD als Textdatei mit mehreren Threads verwendet:

package org.apache.spark.examples; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.Comparator; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import org.apache.avro.ipc.specific.Person; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

import scala.Tuple2; 

class RunnableDemo implements Runnable 
{ 

    private Thread t; 
    private String threadName; 
    private String path; 
    private JavaRDD<String> javaRDD; 
// private JavaSparkContext javaSparkContext; 

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path) 
    { 
     this.threadName=threadName; 
     this.javaRDD=javaRDD; 
     this.path=path; 
//  this.javaSparkContext=javaSparkContext; 
    } 


    @Override 
    public void run() { 
     System.out.println("Running " + threadName);  
     try { 
      this.javaRDD.saveAsTextFile(path); 
//   System.out.println(this.javaRDD.count()); 
      Thread.sleep(50); 
      } catch (InterruptedException e) { 
       System.out.println("Thread " + threadName + " interrupted."); 
       } 
     System.out.println("Thread " + threadName + " exiting."); 
//  this.javaSparkContext.stop(); 
    } 

    public void start() 
     { 
      System.out.println("Starting " + threadName); 
      if (t == null) 
      { 
      t = new Thread (this, threadName); 
      t.start(); 
      } 
     } 

} 

public class SparkJavaTest { 



    public static void main(String[] args) { 

     //Spark Configurations: 

     SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest"); 

     JavaSparkContext ctx=new JavaSparkContext(sparkConf); 

     SQLContext sqlContext = new SQLContext(ctx);   

     JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource"); 




     List<StructField> fields= new ArrayList<StructField>(); 
     fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true)); 

     JavaRDD<Row> rowRDD =dataCollection.map(
       new Function<String, Row>() { 
        @Override 
        public Row call(String record) throws Exception { 
         String[] fields = record.split("\u0001");      
         return RowFactory.create(Integer.parseInt(fields[0].trim())); 
        }     
       });  

     StructType schema = DataTypes.createStructType(fields); 

     DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);   
     dataFrame.registerTempTable("data"); 

     long recordsCount=dataFrame.count();   
     long splitRecordsCount=5; 
     long splitCount =recordsCount/splitRecordsCount; 
     List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>(); 

     for(int i=0;i<splitCount;i++) 
     { 
      long start = i*splitRecordsCount; 
      long end = (i+1)*splitRecordsCount;   
      DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);   
      list1.add(temp.toJavaRDD()); 
     }  

     long length =list1.size(); 

     int split=0; 

     for (int i = 0; i < length; i++) { 

      JavaRDD rdd1 =list1.get(i); 

      JavaPairRDD rdd3=rdd1.cartesian(rdd1); 

      JavaPairRDD<Row,Row> rdd4=rdd3.filter(
        new Function<Tuple2<Row,Row>,Boolean>() 
        { 
         public Boolean call(Tuple2<Row,Row> s) 
         { 
          Row line1=s._1; 
          Row line2=s._2; 

          long app1 = Integer.parseInt(line1.get(0).toString()); 

          long app2 = Integer.parseInt(line2.get(0).toString()); 


          if(app1<app2) 
          { 
           return true; 
          } 
          return false; 
         } 
        }); 

      JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() { 
       @Override 
       public String call(Tuple2<Row, Row> s) 
         throws Exception { 

        Row data1=s._1; 
        Row data2=s._2; 

        int x =Integer.parseInt(data1.get(0).toString()); 
        int y =Integer.parseInt(data2.get(0).toString()); 

        String result =x +","+ y+","+(x+y); 
        return result; 
       } 
      }); 

      RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); 

      R.start(); 
      split++;    
      R.start(); 

      int index =i; 

      while(index<length) 
      { 
       JavaRDD rdd2 =list1.get(index); 
       rdd3=rdd1.cartesian(rdd2); 

       rdd4=rdd3.filter(
         new Function<Tuple2<Row,Row>,Boolean>() 
         { 
          public Boolean call(Tuple2<Row,Row> s) 
          { 
           Row line1=s._1; 
           Row line2=s._2; 

           long app1 = Integer.parseInt(line1.get(0).toString()); 

           long app2 = Integer.parseInt(line2.get(0).toString()); 


           if(app1<app2) 
           { 
            return true; 
           } 
           return false; 
          } 
         });   

       test=rdd4.map(new Function<Tuple2<Row,Row>, String>() { 
        @Override 
        public String call(Tuple2<Row, Row> s) 
          throws Exception { 

         Row data1=s._1; 
         Row data2=s._2; 

         int x =Integer.parseInt(data1.get(0).toString()); 
         int y =Integer.parseInt(data2.get(0).toString()); 

         String result =x +","+ y+","+(x+y); 
         return result; 
        } 
       });   

       R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); 

       R.start(); 
       split++;    
       index++;     
      } 
     } 
    } 

} 

In dieser Fall habe ich die folgende Ausnahme

konfrontiert

enter image description here

Ich habe die Lösung in dem folgenden Link zur Verfügung gestellt versuche

How to run concurrent jobs(actions) in Apache Spark using single spark context

Aber noch, ich kann dieses Problem nicht beheben.

Könnten Sie mir bitte helfen, dies zu lösen?

Antwort

0

Könnten Sie versuchen

SparkConf sparkConf=new SparkConf() 
.setAppName("SparkJavaTest") 
.set("spark.driver.allowMultipleContexts", "true"); 

Ich bin auch neue Spark mit der Programmierung aber das hat mir geholfen, wenn sie in den Scheduler mehr Spark-Jobs ausgeführt werden. Aus meiner Sicht müssen wir nicht über die Threads oder M-R-Logik/Ausführungsplan nachdenken, wie Spark es für Sie tut.

+0

Hallo Chetandalal, schon habe ich mit dieser Eigenschaft versucht .nicht für mich arbeiten – Raja

4

Zunächst versuchen Sie, alle Arbeiten am Treiberknoten mit mehreren Threads auszuführen. Dies ist nicht wirklich im Sinne von Spark, da jede Arbeitseinheit in Ihrem Fall unabhängig von den anderen ist und auf verschiedenen Maschinen ausgeführt werden kann. Sie haben hier ein Spielzeugbeispiel, aber das wird mit großen Datenmengen sehr wichtig.

Ein besserer Ansatz wäre es, so etwas wie mapPartitions zu verwenden, um den Schlüsselbereich an jeden Worker zu senden und sie die entsprechenden SQL-Abfragen ausführen zu lassen und dann die Ergebnisse mit einem Thread pro Worker zu speichern. Dies wird den Code sauberer und einfacher zu verstehen machen (sobald man sich daran gewöhnt hat, wie RDDs funktionieren). Sie müssen natürlich die level of parallelism und die Anzahl der Partitionen (über here) für Ihre Eingabedaten geeignet festlegen.

Das unmittelbare Problem mit Ihrem Code ist, dass der Haupt-Thread andere Threads startet, aber nicht auf sie zu Ende wartet. Normalerweise bewirkt dies, dass die erzeugten Threads zusammen mit dem übergeordneten Thread enden (siehe javadoc). Beachten Sie, wie in der answer zu der verknüpften Frage die Hauptfunktion eine get() auf die erzeugten Futures vor der Rückkehr.