Ich habe eine Spark-Anwendung, die iterativ über 5 Millionen Elemente ausgeführt wird. Die Anwendung benötigt 2 Stunden für den gesamten Datenbestand. Aber ich muss die Anwendung auf dem gesamten Datensatz von über 50 Millionen Elementen ausführen.
Der Code läuft erfolgreich, aber die meisten meiner Programme laufen auf dem Treiber und die Executoren spielen eine minimale Rolle beim Ausführen der Anwendung. Daher ist die Rechenzeit für diese iterative Anwendung sehr groß.
Die Anwendung findet verbundene Komponenten, indem sie ein Diagramm aus dem n-Tripel-Dataset erstellt.
Das Problem ist, dass der Executor keine Tasks erhält und die erste For-Schleife läuft, bis alle 5 Millionen Elemente fertig sind und dieser Teil ungefähr 90% Zeit benötigt. Daher muss ich diesen Teil hauptsächlich optimieren.
Änderungen vorschlagen, um die Arbeit vom Treiber zum Executor zu übertragen, wodurch dieser Code skalierbar gemacht wird, um die Rechenzeit signifikant zu reduzieren.
Reduzieren Rechenzeit in Spark-Anwendung
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
object Wisdom {
val componentLists = HashMap[VertexId, ListBuffer[VertexId]]()
val prefLabelMap = HashMap[VertexId, String]()
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val tripleEndingPattern = """\s*\.\s*$""".r
val languageTagPattern = "@[\\w-]+".r
var edgeArray = Array(Edge(0L,0L,"http://dummy/URI"))
var literalPropsTriplesArray = new Array[(Long,Long,String)](0)
var vertexArray = new Array[(Long,String)](0)
val source = sc.textFile("hdfs://ec2-54-172-85-190.compute-1.amazonaws.com:54310/akshat/datas.nt")
val lines = source.toArray
var vertexURIMap = new HashMap[String, Long];
var triple = new Array[String](3)
var nextVertexNum = 0L
for (i <- 0 until lines.length) {
lines(i) = tripleEndingPattern.replaceFirstIn(lines(i)," ")
triple = lines(i).mkString.split(">\\s+")
val tripleSubject = triple(0).substring(1)
val triplePredicate = triple(1).substring(1)
if (!(vertexURIMap.contains(tripleSubject))) {
vertexURIMap(tripleSubject) = nextVertexNum
nextVertexNum += 1
}
if (!(vertexURIMap.contains(triplePredicate))) {
vertexURIMap(triplePredicate) = nextVertexNum
nextVertexNum += 1
}
val subjectVertexNumber = vertexURIMap(tripleSubject)
val predicateVertexNumber = vertexURIMap(triplePredicate)
if (triple(2)(0) == '<') {
val tripleObject = triple(2).substring(1)
if (!(vertexURIMap.contains(tripleObject))) {
vertexURIMap(tripleObject) = nextVertexNum
nextVertexNum += 1
}
val objectVertexNumber = vertexURIMap(tripleObject)
edgeArray = edgeArray :+
Edge(subjectVertexNumber,objectVertexNumber,triplePredicate)
}
else {
literalPropsTriplesArray = literalPropsTriplesArray :+
(subjectVertexNumber,predicateVertexNumber,triple(2))
}
}
for ((k, v) <- vertexURIMap) vertexArray = vertexArray :+ (v, k)
for (i <- 0 until literalPropsTriplesArray.length) {
if (literalPropsTriplesArray(i)._2 ==
vertexURIMap("http://www.w3.org/2000/01/rdf-schema#label")) {
val prefLabel =
languageTagPattern.replaceFirstIn(literalPropsTriplesArray(i)._3,"")
prefLabelMap(literalPropsTriplesArray(i)._1) = prefLabel;
}
}
val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[(String)]] =
sc.parallelize(edgeArray.slice(1,edgeArray.length))
val literalPropsTriplesRDD: RDD[(Long,Long,String)] =
sc.parallelize(literalPropsTriplesArray)
val graph: Graph[String, String] = Graph(vertexRDD, edgeRDD)
val skosRelatedSubgraph =
graph.subgraph(t => t.attr ==
"http://purl.org/dc/terms/subject")
val ccGraph = skosRelatedSubgraph.connectedComponents()
ccGraph.vertices.saveAsTextFile("hdfs://ec2-54-172-85-190.compute-1.amazonaws.com/akshat/outp")
sc.stop
}
}
Es wird toll sein, wenn Sie einige Kommentare setzen oder zumindest eine Zusammenfassung sagen, was ist Ihr Code zu tun. Es wird die Dinge für andere leichter machen –
Analysieren, welcher Teil der langsamste ist und darauf hinweisen und dann kann jemand helfen – Sohaib
Ich stimme @PreetiKhurana zu. Auch dieser Code verwendet nicht korrekt Funken Sie lesen eine Textdatei mit Funkenkontext, um es anschließend zu sammeln. Ich stimme zu, diese Frage zu schließen, weil sie zu weit gefasst ist, um zu beantworten, und unklar ist, was Sie fragen usw. – eliasah