2017-01-25 3 views
0

portierte ich einen Scala-Code, die einfache Aggregation in Python tut:Pyspark Aggregation groupBy verwendet, ist sehr langsam im Vergleich zu Scala

from time import time 
from utils import notHeader, parse, pprint 
from pyspark import SparkContext 

start = time() 
src = "linkage" 
sc = SparkContext("spark://aiur.local:7077", "linkage - Python") 
rawRdd = sc.textFile(src) 
noheader = rawRdd.filter(notHeader) 
parsed = noheader.map(parse) 
grouped = parsed.groupBy(lambda md: md.matched) 
res = grouped.mapValues(lambda vals: len(vals)).collect() 
for x in res: pprint(x) 
diff = time() - start 
mins, secs = diff/60, diff % 60 
print "Analysis took {} mins and {} secs".format(int(mins), int(secs)) 
sc.stop() 

utils.py:

from collections import namedtuple 

def isHeader(line): 
    return line.find("id_1") >= 0 

def notHeader(line): 
    return not isHeader(line) 

def pprint(s): 
    print s 

MatchedData = namedtuple("MatchedData", "id_1 id_2 scores matched") 

def parse(line): 
    pieces = line.split(",") 
    return MatchedData(pieces[0], pieces[1], pieces[2:11], pieces[11]) 

Und die Scala Version:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

object SparkTest { 
    def main(args: Array[String]): Unit = { 
     val start: Long = System.currentTimeMillis/1000 
     val filePath = "linkage" 
     val conf = new SparkConf() 
      .setAppName("linkage - Scala") 
      .setMaster("spark://aiur.local:7077") 
     val sc = new SparkContext(conf) 
     val rawblocks = sc.textFile(filePath) 
     val noheader = rawblocks.filter(x => !isHeader(x)) 
     val parsed = noheader.map(line => parse(line)) 
     val grouped = parsed.groupBy(md => md.matched) 
     grouped.mapValues(x => x.size).collect().foreach(println) 
     val diff = System.currentTimeMillis/1000 - start 
     val (mins, secs) = (diff/60, diff % 60) 
     val pf = printf("Analysis took %d mins and %d secs", mins, secs) 
     println(pf) 
     sc.stop() 
    } 

    def isHeader(line: String): Boolean = { 
     line.contains("id_1") 
    } 

    def toDouble(s: String): Double = { 
     if ("?".equals(s)) Double.NaN else s.toDouble 
    } 

    case class MatchData(id1: Int, id2: Int, 
     scores: Array[Double], matched: Boolean) 

    def parse(line: String) = { 
     val pieces = line.split(",") 
     val id1 = pieces(0).toInt 
     val id2 = pieces(1).toInt 
     val scores = pieces.slice(2, 11).map(toDouble) 
     val matched = pieces(11).toBoolean 
     MatchData(id1, id2, scores, matched) 
    } 
} 

Die Scala-Version wird in 26 Sekunden abgeschlossen, aber die Python-Version dauerte ~ 6 Minuten. Die Protokolle zeigen einen sehr großen Unterschied bei der Vervollständigung der jeweiligen collect() - Aufrufe.

Python:

17/01/25 16:22:10 INFO DAGScheduler: ResultStage 1 (collect at /Users/esamson/Hackspace/Spark/run_py/dcg.py:12) finished in 234.860 s 
17/01/25 16:22:10 INFO DAGScheduler: Job 0 finished: collect at /Users/esamson/Hackspace/Spark/run_py/dcg.py:12, took 346.675760 s 

Scala:

17/01/25 16:26:23 INFO DAGScheduler: ResultStage 1 (collect at Spark.scala:17) finished in 9.619 s 
17/01/25 16:26:23 INFO DAGScheduler: Job 0 finished: collect at Spark.scala:17, took 22.022075 s 

'groupBy' scheint der einzige Anruf von Bedeutung zu sein. Gibt es eine Möglichkeit, die Leistung des Python-Codes zu verbessern?

+3

Gründe, die Dataframe-API und das CSV-Leseformat nicht zu verwenden? Die Leistung wird sich wahrscheinlich auf das Scala-Niveau verbessern, wenn nicht sogar besser. – maasg

+0

@ maasg Würde das definitiv versuchen – kerrigangster

Antwort

1

Sie verwenden RDDs, und wenn Sie Transformationen für sie durchführen (z. B. groupby, map), müssen Sie Funktionen an sie übergeben. Wenn Sie diese Funktionen in scala übergeben, werden die Funktionen einfach ausgeführt. Wenn Sie dasselbe in Python machen, muss Spark diese Funktionen serialisieren, eine Python-VM auf jedem Executor öffnen und dann, wenn die Funktion ausgeführt werden muss, die Scala-Daten in Python konvertieren, an die Python-VM übergeben und dann übergeben und Konvertiere die Ergebnisse.

Alle diese Konvertierungen erfordern viel Arbeit und daher ist RDD in pyspark im Allgemeinen viel langsamer als scala.

Eine Möglichkeit, dies zu umgehen, ist die Verwendung von Dataframe-Logik, die die Verwendung bereits erstellter Funktionen (in pyspark.sql.functions) ermöglicht, die im Hintergrund Scala-Funktionen verwenden. Das würde ungefähr so ​​aussehen (für spark 2.0):

from pyspark import SparkSession 
from pyspark.sql.functions import size 

src = "linkage" 
spark = SparkSession.builder.master(""spark://aiur.local:7077"").appName(""linkage - Python"").getOrCreate() 
df = spark.read.option("header", "true").csv(src) 
res = df.groupby("md").agg(size(df.vals)).collect() 
... 

Dies setzt natürlich voraus, dass matched und vals die Spaltennamen sind.

Verwandte Themen