2016-05-16 8 views
2

Meine Spark-Anwendung schlägt fehl, wenn sie auf zahlreiche CSV-Dateien (~ 1000 @ 63MB) von S3 zugreifen und sie in eine Spark RDD pipen muss. Der eigentliche Prozess des Aufteilens der CSV scheint zu funktionieren, aber ein zusätzlicher Funktionsaufruf zu S3NativeFileSystem scheint einen Fehler zu verursachen und den Job zum Absturz zu bringen.Ruft der S3NativeFileSystem-Aufruf meine Pyspark-Anwendung auf AWS EMR ab 4.6.0

Um zu beginnen, ist die folgend meine PySpark Anwendung:

from pyspark import SparkContext 
sc = SparkContext("local", "Simple App") 
from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
import time 

startTime = float(time.time()) 

dataPath = 's3://PATHTODIRECTORY/' 
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "MYKEY") 
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "MYSECRETKEY") 

def buildSchemaDF(tableName, columnList): 
    currentRDD = sc.textFile(dataPath + tableName).map(lambda line: line.split("|")) 
    currentDF = currentRDD.toDF(columnList) 
    return currentDF 

loadStartTime = float(time.time()) 
lineitemDF = buildSchemaDF('lineitem*', ['l_orderkey','l_partkey','l_suppkey','l_linenumber','l_quantity','l_extendedprice','l_discount','l_tax','l_returnflag','l_linestatus','l_shipdate','l_commitdate','l_receiptdate','l_shipinstruct','l_shipmode','l_comment']) 
lineitemDF.registerTempTable("lineitem") 
loadTimeElapsed = float(time.time()) - loadStartTime 

queryStartTime = float(time.time()) 

qstr = """ 
    SELECT 
     lineitem.l_returnflag, 
     lineitem.l_linestatus, 
     sum(l_quantity) as sum_qty, 
     sum(l_extendedprice) as sum_base_price, 
     sum(l_discount) as sum_disc, 
     sum(l_tax) as sum_tax, 
     avg(l_quantity) as avg_qty, 
     avg(l_extendedprice) as avg_price, 
     avg(l_discount) as avg_disc, 
     count(l_orderkey) as count_order 
    FROM 
     lineitem 
    WHERE 
     l_shipdate <= '19981001' 
    GROUP BY 
     l_returnflag, 
     l_linestatus 
    ORDER BY 
     l_returnflag, 
     l_linestatus 
    """ 
tpch1DF = sqlContext.sql(qstr) 

queryTimeElapsed = float(time.time()) - queryStartTime 
totalTimeElapsed = float(time.time()) - startTime 

tpch1DF.show() 

queryResults = [qstr, loadTimeElapsed, queryTimeElapsed, totalTimeElapsed] 
distData = sc.parallelize(queryResults) 
distData.saveAsTextFile(dataPath + 'queryResults.csv') 

print 'Load Time: ' + str(loadTimeElapsed) 
print 'Query Time: ' + str(queryTimeElapsed) 
print 'Total Time: ' + str(totalTimeElapsed) 

es Schritt I durch Spinnen auf einen Funken EMR Cluster mit dem folgenden AWS-CLI-Befehl starten Schritt zu wagen (Wagenrücklauf zur besseren Lesbarkeit hinzugefügt):

aws emr create-cluster --name "Big TPCH Spark cluster2" --release-label emr-4.6.0 
--applications Name=Spark --ec2-attributes KeyName=blazing-test-aws 
--log-uri s3://aws-logs-132950491118-us-west-2/elasticmapreduce/j-1WZ39GFS3IX49/ 
--instance-type m3.2xlarge --instance-count 6 --use-default-roles 

Nachdem der EMR-Cluster beendet Provisioning kopiere ich über meine Pyspark Anwendung auf dem Master-Knoten auf '/home/hadoop/pysparkApp.py' dann. Wenn es kopiert wurde, kann ich den Schritt für das Senden von Funken hinzufügen.

aws emr add-steps --cluster-id j-1DQJ8BDL1394N --steps 
Type=spark,Name=SparkTPCHTests,Args=[--deploy-mode,cluster,- 
conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,5,--executor 
cores,5,--executor memory,20g,/home/hadoop/tpchSpark.py] 
,ActionOnFailure=CONTINUE 

Nun, wenn ich laufe diesen Schritt nur über ein paar der oben genannten CSV-Dateien die endgültigen Ergebnisse generiert werden, aber das Skript wird behaupten, noch nicht bestanden.

Ich denke, es ist mit einem zusätzlichen Aufruf von S3NativeFileSystem verbunden, aber ich bin mir nicht sicher. Dies sind die Yarn Log-Nachrichten, die ich bekomme, die mich zu dieser Schlussfolgerung führen. Der erste Anruf wird gut zu funktionieren:

16/05/15 23:18:00 INFO HadoopRDD: Input split: s3://data-set-builder/splitLineItem2/lineitemad:0+64901757 
16/05/15 23:18:00 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[ED8011CE4E1F6F18], ServiceEndpoint=[https://data-set-builder.s3-us-west-2.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[77.956], HttpRequestTime=[77.183], HttpClientReceiveResponseTime=[20.028], RequestSigningTime=[0.229], CredentialsRequestTime=[0.003], ResponseProcessingTime=[0.128], HttpClientSendRequestTime=[0.35], 

Während die zweite richtig auszuführen scheint nicht, was zu „Teilergebnisse“ (206 Fehler):

16/05/15 23:18:00 INFO S3NativeFileSystem: Opening 's3://data-set-builder/splitLineItem2/lineitemad' for reading 
16/05/15 23:18:00 INFO latency: StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[10BDDE61AE13AFBE], ServiceEndpoint=[https://data-set-builder.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, Client Execute Time=[296.86], HttpRequestTime=[295.801], HttpClientReceiveResponseTime=[293.667], RequestSigningTime=[0.204], CredentialsRequestTime=[0.002], ResponseProcessingTime=[0.34], HttpClientSendRequestTime=[0.337], 
16/05/15 23:18:02 INFO ApplicationMaster: Waiting for spark context initialization ... 

Ich bin verloren als warum es sogar den zweiten Aufruf von S3NativeFileSystem macht, wenn der erste scheinbar effektiv geantwortet hat und sogar die Datei geteilt hat. Ist das etwas, das ein Produkt meiner EMR-Konfiguration ist? Ich weiß, dass S3Native Probleme mit der Dateibegrenzung hat und dass ein direkter S3-Anruf optimal ist, was ich versucht habe, aber dieser Anruf scheint da zu sein, egal was ich mache. Bitte helfen Sie!

Auch, um ein paar andere Fehlermeldungen in meinem Garn-Log hinzuzufügen, falls sie relevant sind.

1)

16/05/15 23:19:22 ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application. 
16/05/15 23:19:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Timed out waiting for SparkContext.) 

2)

16/05/15 23:19:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 (No such file or directory) 
     at java.io.FileOutputStream.open(Native Method) 
     at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
     at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:162) 
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:226) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
16/05/15 23:19:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 
16/05/15 23:19:22 WARN TaskMemoryManager: leak 32.3 MB memory from [email protected] 
16/05/15 23:19:22 ERROR Executor: Managed memory leak detected; size = 33816576 bytes, TID = 14 
16/05/15 23:19:22 ERROR Executor: Exception in task 13.0 in stage 1.0 (TID 14) 
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/3a/temp_shuffle_b9001fca-bba9-400d-9bc4-c23c002e0aa9 (No such file or directory) 
     at java.io.FileOutputStream.open(Native Method) 
     at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
     at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88) 
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Antwort

3

Rangfolge für die Funken Konfigurationen ist:

SparkContext (Code/Anwendung)> Funken einreichen> Funken-defaults.

conf

So paar Dinge hier zeigen -

  1. Verwenden GARN Cluster als deploy Modus und Master in den Funken einreichen Befehl -

    spark-submit --deploy-mode cluster --master yarn ...

    ODER

    spark-submit --master yarn-cluster ...

  2. Entfernen Sie in Ihrem Code die Zeichenfolge "local" aus der Zeile sc = SparkContext("local", "Simple App"). Verwenden Sie conf = SparkConf().setAppName(appName) sc = SparkContext(conf=conf), um den Spark-Kontext zu initialisieren.

Ref - http://spark.apache.org/docs/latest/programming-guide.html

Verwandte Themen