2016-06-30 10 views
4

Ich versuche, etwas Code von Spark 1.6 zu Spark 2.0 mit neuen Materialien aus Spark 2.0 zu portieren. Zuerst möchte ich den csv-Reader von Spark 2.0 verwenden. BTW, ich benutze pyspark.Spark 2.0 lesen CSV Anzahl der Partitionen (PySpark)

Mit der "alten" textFile Funktion kann ich die minimale Anzahl von Partitionen einstellen. Ex:

file= sc.textFile('/home/xpto/text.csv', minPartitions=10) 
header = file.first() #extract header 
data = file.filter(lambda x:x !=header) #csv without header 
... 

Jetzt, mit Spark-2.0 Ich kann die csv direkt lesen:

df = spark.read.csv('/home/xpto/text.csv', header=True) 
... 

Aber ich finde nicht einen Weg, die minPartitions einzustellen.

Ich brauche dies, um die Leistung meines Codes zu testen.

Thx, Fred

Antwort

4

Wenn die Anzahl der Zeilen kleiner als die Anzahl der Partitionen, die Sie in aufzuspalten versuchen, funken ignoriert grundsätzlich das Partitionierungs. Koaleszieren (enge Transformation) wird immer verwendet, um die Anzahl der Partitionen zu reduzieren, nicht um sie zu erhöhen, zum Erhöhen können Sie rdd.repartition(300) verwenden. Außerdem habe ich festgestellt, dass wenn Sie coalesce() verwenden. die Leistung von coalesce(100,shuffle=True) ist weit besser als coalesce(100). Probieren Sie es in Ihrem Code aus und danken Sie mir später. Ein update: auf einer rdd um die Anzahl der Partitionen zu erhöhen oder sie zu reduzieren und die Daten gleichmäßig zu mischen, können Sie coalesce verwenden (Anzahl der Partitionen, shuffle = true). Eine Sache, die zu beachten ist, ist, dass dies einen Datenmix aller Knoten auslöst.

1

ich es herausgefunden. Der DataFrame (und RDD) hat eine Methode namens "coalesce". Wo die Anzahl der Partitionen eingestellt werden kann.

Ex:

>>> df = spark.read.csv('/home/xpto/text.csv', header=True).coalesce(2) 
>>> df.rdd.getNumPartitions() 
2 

In meinem Fall splited Funken meine Datei in 153 Partitionen. Ich bin in der Lage, die Anzahl der Partitionen auf 10 zu setzen, aber wenn ich versuche, auf 300 zu setzen, ignoriert es und verwendet die 153 wieder (ich weiß nicht warum).

REF: https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce

+1

Gemäß der Dokumentation 'coalesce()' kann nur die Anzahl der Partitionen verringert werden. 'coalesce()' erhöht nicht die Anzahl der Partitionen. Dafür müssen Sie 'repartition()' verwenden und die Kosten für eine Datenmischung um Funkenarbeiter bezahlen. – DavidF

Verwandte Themen