PySpark aggregation using groupBy is very slow compared to Scala
I ported a piece of Scala code that does a simple aggregation to Python:
from time import time
from utils import notHeader, parse, pprint
from pyspark import SparkContext
start = time()
src = "linkage"
sc = SparkContext("spark://aiur.local:7077", "linkage - Python")
rawRdd = sc.textFile(src)
noheader = rawRdd.filter(notHeader)
parsed = noheader.map(parse)
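# groupBy shuffles every parsed record across the cluster to build the per-key groups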
grouped = parsed.groupBy(lambda md: md.matched)
res = grouped.mapValues(lambda vals: len(vals)).collect()
for x in res: pprint(x)
diff = time() - start
mins, secs = diff/60, diff % 60
print "Analysis took {} mins and {} secs".format(int(mins), int(secs))
sc.stop()
utils.py:
from collections import namedtuple

def isHeader(line):
    return line.find("id_1") >= 0

def notHeader(line):
    return not isHeader(line)

def pprint(s):
    print(s)

MatchedData = namedtuple("MatchedData", "id_1 id_2 scores matched")

def parse(line):
    pieces = line.split(",")
    return MatchedData(pieces[0], pieces[1], pieces[2:11], pieces[11])
And the Scala version:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkTest {
  def main(args: Array[String]): Unit = {
    val start: Long = System.currentTimeMillis / 1000
    val filePath = "linkage"
    val conf = new SparkConf()
      .setAppName("linkage - Scala")
      .setMaster("spark://aiur.local:7077")
    val sc = new SparkContext(conf)
    val rawblocks = sc.textFile(filePath)
    val noheader = rawblocks.filter(x => !isHeader(x))
    val parsed = noheader.map(line => parse(line))
    val grouped = parsed.groupBy(md => md.matched)
    grouped.mapValues(x => x.size).collect().foreach(println)
    val diff = System.currentTimeMillis / 1000 - start
    val (mins, secs) = (diff / 60, diff % 60)
    printf("Analysis took %d mins and %d secs%n", mins, secs)
    sc.stop()
  }

  def isHeader(line: String): Boolean = {
    line.contains("id_1")
  }

  def toDouble(s: String): Double = {
    if ("?".equals(s)) Double.NaN else s.toDouble
  }

  case class MatchData(id1: Int, id2: Int,
                       scores: Array[Double], matched: Boolean)

  def parse(line: String) = {
    val pieces = line.split(",")
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces(11).toBoolean
    MatchData(id1, id2, scores, matched)
  }
}
The Scala version completes in 26 seconds, but the Python version took ~6 minutes. The logs show a very large difference in how long the respective collect() calls take to finish.
Python:
17/01/25 16:22:10 INFO DAGScheduler: ResultStage 1 (collect at /Users/esamson/Hackspace/Spark/run_py/dcg.py:12) finished in 234.860 s
17/01/25 16:22:10 INFO DAGScheduler: Job 0 finished: collect at /Users/esamson/Hackspace/Spark/run_py/dcg.py:12, took 346.675760 s
Scala:
17/01/25 16:26:23 INFO DAGScheduler: ResultStage 1 (collect at Spark.scala:17) finished in 9.619 s
17/01/25 16:26:23 INFO DAGScheduler: Job 0 finished: collect at Spark.scala:17, took 22.022075 s
groupBy seems to be the only call of significance. Is there any way to improve the performance of the Python code?
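One change that should help while staying on the RDD API: for a count-only aggregation the groups never need to be materialized, so the groupBy can be replaced with a per-key sum via reduceByKey, which combines counts map-side and shuffles only tiny partial sums instead of every record. A minimal sketch against the same parsed RDD as above:

from operator import add

# map each record to (matched, 1) and sum the ones per key;
# only the partial counts cross the network, not the records themselves
counts = parsed.map(lambda md: (md.matched, 1)).reduceByKey(add)
for x in counts.collect():
    pprint(x)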
Any reason not to use the DataFrame API and the CSV read format? Performance will most likely improve to Scala levels, if not beyond. – maasg
@maasg Will definitely try that – kerrigangster
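For reference, a minimal sketch of what maasg suggests, assuming Spark 2.x and that the file's header names the match column is_match (an assumption; adjust to the actual header):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("spark://aiur.local:7077") \
    .appName("linkage - DataFrame") \
    .getOrCreate()

# header=True drops the header row; the grouping and counting run
# entirely in the JVM, so no per-record Python serialization is involved
df = spark.read.csv("linkage", header=True, inferSchema=True)
df.groupBy("is_match").count().show()  # "is_match" is an assumed column name
spark.stop()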