0

Ich habe eine Spark-Anwendung, die iterativ über 5 Millionen Elemente ausgeführt wird. Die Anwendung benötigt 2 Stunden für den gesamten Datenbestand. Aber ich muss die Anwendung auf dem gesamten Datensatz von über 50 Millionen Elementen ausführen.
Der Code läuft erfolgreich, aber die meisten meiner Programme laufen auf dem Treiber und die Executoren spielen eine minimale Rolle beim Ausführen der Anwendung. Daher ist die Rechenzeit für diese iterative Anwendung sehr groß.
Die Anwendung findet verbundene Komponenten, indem sie ein Diagramm aus dem n-Tripel-Dataset erstellt.
Das Problem ist, dass der Executor keine Tasks erhält und die erste For-Schleife läuft, bis alle 5 Millionen Elemente fertig sind und dieser Teil ungefähr 90% Zeit benötigt. Daher muss ich diesen Teil hauptsächlich optimieren.
Änderungen vorschlagen, um die Arbeit vom Treiber zum Executor zu übertragen, wodurch dieser Code skalierbar gemacht wird, um die Rechenzeit signifikant zu reduzieren.
Reduzieren Rechenzeit in Spark-Anwendung

import scala.io.Source 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext._ 
import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 
import scala.collection.mutable.ListBuffer 
import scala.collection.mutable.HashMap 
import scala.collection.mutable.ArrayBuffer 

object Wisdom { 

val componentLists = HashMap[VertexId, ListBuffer[VertexId]]() 
val prefLabelMap = HashMap[VertexId, String]() 

def main(args: Array[String]) { 

val conf = new SparkConf() 
val sc = new SparkContext(conf) 

val tripleEndingPattern = """\s*\.\s*$""".r  
val languageTagPattern = "@[\\w-]+".r  

var edgeArray = Array(Edge(0L,0L,"http://dummy/URI")) 
var literalPropsTriplesArray = new Array[(Long,Long,String)](0) 
var vertexArray = new Array[(Long,String)](0) 

val source = sc.textFile("hdfs://ec2-54-172-85-190.compute-1.amazonaws.com:54310/akshat/datas.nt") 
val lines = source.toArray 

var vertexURIMap = new HashMap[String, Long]; 

var triple = new Array[String](3) 
var nextVertexNum = 0L 
for (i <- 0 until lines.length) { 

    lines(i) = tripleEndingPattern.replaceFirstIn(lines(i)," ") 
    triple = lines(i).mkString.split(">\\s+")  
    val tripleSubject = triple(0).substring(1) 
    val triplePredicate = triple(1).substring(1) 
    if (!(vertexURIMap.contains(tripleSubject))) { 
     vertexURIMap(tripleSubject) = nextVertexNum 
     nextVertexNum += 1 
    } 
    if (!(vertexURIMap.contains(triplePredicate))) { 
     vertexURIMap(triplePredicate) = nextVertexNum 
     nextVertexNum += 1 
    } 
    val subjectVertexNumber = vertexURIMap(tripleSubject) 
    val predicateVertexNumber = vertexURIMap(triplePredicate) 

    if (triple(2)(0) == '<') { 
     val tripleObject = triple(2).substring(1) 
     if (!(vertexURIMap.contains(tripleObject))) { 
      vertexURIMap(tripleObject) = nextVertexNum 
      nextVertexNum += 1 
     } 
     val objectVertexNumber = vertexURIMap(tripleObject) 
     edgeArray = edgeArray :+ 
      Edge(subjectVertexNumber,objectVertexNumber,triplePredicate) 
    } 
    else { 
     literalPropsTriplesArray = literalPropsTriplesArray :+ 
      (subjectVertexNumber,predicateVertexNumber,triple(2)) 
    } 
} 

for ((k, v) <- vertexURIMap) vertexArray = vertexArray :+ (v, k) 

for (i <- 0 until literalPropsTriplesArray.length) { 
    if (literalPropsTriplesArray(i)._2 == 
     vertexURIMap("http://www.w3.org/2000/01/rdf-schema#label")) { 

     val prefLabel = 
      languageTagPattern.replaceFirstIn(literalPropsTriplesArray(i)._3,"") 
     prefLabelMap(literalPropsTriplesArray(i)._1) = prefLabel; 
    } 
} 

val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray) 

val edgeRDD: RDD[Edge[(String)]] = 
    sc.parallelize(edgeArray.slice(1,edgeArray.length)) 

val literalPropsTriplesRDD: RDD[(Long,Long,String)] = 
    sc.parallelize(literalPropsTriplesArray) 

val graph: Graph[String, String] = Graph(vertexRDD, edgeRDD) 

val skosRelatedSubgraph = 
    graph.subgraph(t => t.attr == 
        "http://purl.org/dc/terms/subject") 

val ccGraph = skosRelatedSubgraph.connectedComponents() 

ccGraph.vertices.saveAsTextFile("hdfs://ec2-54-172-85-190.compute-1.amazonaws.com/akshat/outp") 

sc.stop 
} 
} 
+3

Es wird toll sein, wenn Sie einige Kommentare setzen oder zumindest eine Zusammenfassung sagen, was ist Ihr Code zu tun. Es wird die Dinge für andere leichter machen –

+0

Analysieren, welcher Teil der langsamste ist und darauf hinweisen und dann kann jemand helfen – Sohaib

+0

Ich stimme @PreetiKhurana zu. Auch dieser Code verwendet nicht korrekt Funken Sie lesen eine Textdatei mit Funkenkontext, um es anschließend zu sammeln. Ich stimme zu, diese Frage zu schließen, weil sie zu weit gefasst ist, um zu beantworten, und unklar ist, was Sie fragen usw. – eliasah

Antwort

0

Sie for-Schleife, wo Sie Karte mit einer RDD (Quelle in Ihrem Code) verwenden. Dann kommen die Ausführenden ins Bild und die Aufgaben werden geteilt. Spark ist nicht dafür vorgesehen, eine for-Schleife zu verwenden, um über die Zeilen der Eingabedatei zu iterieren. Bitte bringen Sie Ihre Grundlagen richtig. Glücklich lernen