Concurrent Job Execution in Spark — Ich habe Eingangsdaten mit dem folgenden Format verwendet:
0
1
2
3
4
5
…
14
Input Location: hdfs://localhost:9000/Input/datasource
Ich habe den folgenden Code-Schnipsel verwendet, um ein RDD mit mehreren Threads als Textdatei zu speichern:
package org.apache.spark.examples;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * Runnable that writes one {@link JavaRDD} to an HDFS path on its own worker
 * thread via {@code saveAsTextFile}, so several Spark jobs can be submitted
 * concurrently from a single SparkContext.
 */
class RunnableDemo implements Runnable
{
// Worker thread backing this task; created lazily in start().
private Thread t;
private final String threadName;
private final String path;
private final JavaRDD<String> javaRDD;

/**
 * @param threadName name used for the worker thread and for log output
 * @param javaRDD    RDD whose contents are written when the thread runs
 * @param path       HDFS output directory passed to {@code saveAsTextFile}
 */
RunnableDemo(String threadName, JavaRDD<String> javaRDD, String path)
{
this.threadName = threadName;
this.javaRDD = javaRDD;
this.path = path;
}

@Override
public void run() {
System.out.println("Running " + threadName);
try {
// Triggers the (lazy) Spark job for this RDD and writes it to HDFS.
this.javaRDD.saveAsTextFile(path);
Thread.sleep(50);
} catch (InterruptedException e) {
// BUG FIX: restore the interrupt flag instead of swallowing it,
// so callers can still observe that this thread was interrupted.
Thread.currentThread().interrupt();
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}

/** Starts the worker thread; subsequent calls are no-ops. */
public void start()
{
System.out.println("Starting " + threadName);
if (t == null)
{
t = new Thread(this, threadName);
t.start();
}
}
}
public class SparkJavaTest {
public static void main(String[] args) {
//Spark Configurations:
SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");
JavaSparkContext ctx=new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(ctx);
JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");
List<StructField> fields= new ArrayList<StructField>();
fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));
JavaRDD<Row> rowRDD =dataCollection.map(
new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
String[] fields = record.split("\u0001");
return RowFactory.create(Integer.parseInt(fields[0].trim()));
}
});
StructType schema = DataTypes.createStructType(fields);
DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);
dataFrame.registerTempTable("data");
long recordsCount=dataFrame.count();
long splitRecordsCount=5;
long splitCount =recordsCount/splitRecordsCount;
List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();
for(int i=0;i<splitCount;i++)
{
long start = i*splitRecordsCount;
long end = (i+1)*splitRecordsCount;
DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);
list1.add(temp.toJavaRDD());
}
long length =list1.size();
int split=0;
for (int i = 0; i < length; i++) {
JavaRDD rdd1 =list1.get(i);
JavaPairRDD rdd3=rdd1.cartesian(rdd1);
JavaPairRDD<Row,Row> rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
R.start();
int index =i;
while(index<length)
{
JavaRDD rdd2 =list1.get(index);
rdd3=rdd1.cartesian(rdd2);
rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
index++;
}
}
}
}
In diesem Fall bin ich mit der folgenden Ausnahme konfrontiert worden.
Ich habe versucht, die im folgenden Link bereitgestellte Lösung anzuwenden:
How to run concurrent jobs(actions) in Apache Spark using single spark context
Aber ich kann dieses Problem trotzdem noch nicht beheben.
Könnten Sie mir bitte helfen, dies zu lösen?
Hallo Chetandalal, ich habe es bereits mit dieser Eigenschaft versucht; bei mir funktioniert das nicht. – Raja