2016-05-11 9 views
8

Ich übermittle meinen Code an einen Funke-Standalone-Cluster. Submit-Befehl ist wie folgt:Ist das ein Fehler von Spark-Stream oder Speicherleck?

nohup ./bin/spark-submit \ 
--master spark://ES01:7077 \ 
--executor-memory 4G \ 
--num-executors 1 \ 
--total-executor-cores 1 \ 
--conf "spark.storage.memoryFraction=0.2" \ 
./myCode.py 1>a.log 2>b.log & 

Ich gebe den Executor 4G-Speicher im obigen Befehl angeben. Aber benutze den Befehl top, um den Executor-Prozess zu überwachen. Ich merke, dass die Speicherauslastung weiter wächst. Nun ist die Top-Befehlsausgabe unter:

PID USER  PR NI VIRT RES SHR S %CPU %MEM  TIME+ COMMAND                                      
12578 root  20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java  

Mein Gesamtspeicher ist 16G so 37,3% ist bereits größer als die 4 GB I angegeben. Und es wächst immer noch.

Verwenden Sie den Befehl ps, Sie können wissen, es ist der Executor-Prozess.

[[email protected] ~]# ps -awx | grep spark | grep java 
10409 ?  Sl  1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080 
10603 ?  Sl  6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077 
12420 ?  Sl 10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py 
12578 ?  Sl 21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://[email protected]:52660 

Unten finden Sie den Code. Es ist sehr einfach, also glaube ich nicht, dass es ein Speicherleck gibt.

if __name__ == "__main__": 

    dataDirectory = '/stream/raw' 

    sc = SparkContext(appName="Netflow") 
    ssc = StreamingContext(sc, 20) 

    # Read CSV File 
    lines = ssc.textFileStream(dataDirectory) 

    lines.foreachRDD(process) 

    ssc.start() 
    ssc.awaitTermination() 

Der Code für die Prozessfunktion ist unten. Bitte beachten Sie, dass ich hier HiveContext nicht SqlContext verwende. Da SqlContext nicht Fensterfunktion

def getSqlContextInstance(sparkContext): 
    if ('sqlContextSingletonInstance' not in globals()): 
     globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext) 
    return globals()['sqlContextSingletonInstance'] 

def process(time, rdd): 

    if rdd.isEmpty(): 
     return sc.emptyRDD() 

    sqlContext = getSqlContextInstance(rdd.context) 

    # Convert CSV File to Dataframe 
    parts = rdd.map(lambda l: l.split(",")) 
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11]))) 
    dataframe = sqlContext.createDataFrame(rowRdd) 

    # Get the top 2 interface of each router 
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits')) 
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc()) 
    rank = func.dense_rank().over(windowSpec) 
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2") 

    ret.show() 
    dataframe.show() 

Eigentlich fand ich unten Code unterstützen das Problem verursachen:

# Get the top 2 interface of each router 
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits')) 
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc()) 
    rank = func.dense_rank().over(windowSpec) 
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2") 
    ret.show() 

Denn wenn ich diese 5 Zeile entfernen. Der Code kann die ganze Nacht ohne Speichererweiterung laufen. Wenn Sie sie jedoch hinzufügen, wird die Speicherauslastung von Executor auf eine sehr hohe Zahl anwachsen.

Grundsätzlich ist der obige Code nur ein paar Fenster + grous in SparkSQL. Ist das ein Fehler?

+0

In Verbindung stehende http://stackoverflow.com/q/37283624/1560062? – zero323

+0

@ zero323 danke. Aber ich benutze spark1.6.1 –

Antwort

0

Wie ich in Ihren 5 Zeilen sehen kann, vielleicht die groupBy ist das Problem, würden Sie versuchen, mit reduceBy, und sehen, wie es funktioniert.

Siehe here und here.

+0

danke für die Info. Aber ich erwarte zu wissen, ob es ein Fehler ist oder ich verwende ihn nicht richtig. –

+0

@Tristan Es ist nicht die gleiche Gruppe wie auf RDD, siehe http://StackOverflow.com/q/32902982/1560062 – zero323

+0

Ich nehme an, dass diese CSV-Datei auf HDFS gespeichert ist. Was ist seine Größe? Wie viel wächst/verändert sich und in welcher Häufigkeit. Was ich zu verstehen versuche ist, wie viele Daten in jedem Batch-Intervall verarbeitet werden müssen und wie lang ist dieses Intervall (standardmäßig 1 Sekunde)? –

3

Haftungsausschluss: diese Antwort basiert nicht auf Fehlersuche, sondern mehr auf Beobachtungen und die Dokumentation Apache Spark bietet

Ich glaube nicht, dass dies ein Fehler ist zu beginnen!

Wenn wir Ihre Konfigurationen betrachten, können Sie sehen, dass Sie sich hauptsächlich auf das Tuning des Executors konzentrieren, was nicht falsch ist, aber Sie vergessen den Treiberteil der Gleichung.

Mit Blick auf die Funken Cluster Übersicht von Apache Spark documentaion

enter image description here

Wie Sie sehen können, hat jeder Arbeitnehmer einen Testamentsvollstrecker, aber in Ihrem Fall der Arbeiter Knoten der gleiche wie der Fahrer Knoten ist! Was ist ehrlich gesagt der Fall, wenn Sie lokal oder in einem eigenständigen Cluster in einem einzigen Knoten ausgeführt werden.

Weiterhin nimmt der Treiber standardmäßig 1G Speicher, sofern nicht mit spark.driver.memory Flag abgestimmt.Darüber hinaus sollten Sie nicht die Heap-Nutzung von der JVM selbst vergessen, und die Web-UI, die vom Treiber zu AFAIK!

Wenn Sie die Codezeilen löschen, die Sie erwähnt, wird der Code links ohne Aktionen als map Funktion nur eine Transformation ist, daher wird es keine Ausführung sein, und daher Sie sehen gar nicht Speicher Anstieg !

Gleiches gilt für groupBy, da es nur eine Transformation ist, die ausgeführt werden, es sei denn eine Aktion aufgerufen wird, in Ihrem Fall die agg und show weiter unten den Strom!

Versuchen Sie, Ihren Treiberspeicher und die Gesamtzahl der Cores in Spark zu minimieren, die durch spark.cores.max definiert ist, wenn Sie die Anzahl der Kerne in diesem Prozess steuern möchten, dann kaskadieren Sie herunter zu den Executoren. Darüber hinaus würde ich spark.python.profile.dump zu Ihrer Konfigurationsliste hinzufügen, damit Sie ein Profil für die Ausführung Ihres Spark-Jobs sehen können, das Ihnen helfen kann, den Fall besser zu verstehen und Ihren Cluster besser auf Ihre Bedürfnisse abzustimmen.

+0

Hallo, Danke für die Antwort. Aber 1. Ich habe immer noch eine dataframe.show() nach dem Entfernen dieser Zeilen. Es gibt also immer noch eine Aktion. 2. In meinem Fall kann die Stream-Berechnung einige Stunden dauern. Das bedeutet Tausende von Schleifen (das Intervall beträgt 20 Sekunden). Während dieser Zeit wächst die Speicherauslastung von Executor. Ich weiß also nicht, was Ihre vorgeschlagene Lösung ist. Minimiere meinen Treiberspeicher? Warum? –

+0

Ich habe nicht bemerkt, dass die Show nicht Teil des gelöschten Codes ist. Was den Speicher anbelangt, baue ich darauf basierend, es zu minimieren und zu sehen, ob es überläuft! –