2016-12-09 2 views
0

Ich versuche zu verstehen, die Protokollausgabe von bestimmten einfachen Programm generiert. Brauchen Sie Hilfe, um die einzelnen Schritte zu verstehen oder einen Verweis auf eine solche Writ-up wäre in Ordnung.Ich versuche zu verstehen, gegebenes Protokoll von Spark-Programm generiert

Befehl sc.parallelize (Array (("a", 1), ("B", 1), ("a", 1), ("a", 1), ("B" , 1), ("b", 1), ("b", 1), ("b", 1)), 3) .map (a => a) .reduceByKey (_ + _) .collect()

Ausgang:

16/12/08 23:41:57 INFO spark.SparkContext: Starting job: collect at <console>:28 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Registering RDD 1 (map at <console>:28) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:28) with 3 output partitions 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (collect at <console>:28) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at map at <console>:28), which has no missing parents 
16/12/08 23:41:57 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.6 KB, free 2.6 KB) 
16/12/08 23:41:57 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1588.0 B, free 4.2 KB) 
16/12/08 23:41:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.17.0.6:31122 (size: 1588.0 B, free: 511.5 MB) 
16/12/08 23:41:57 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at map at <console>:28) 
16/12/08 23:41:57 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks 
16/12/08 23:41:57 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 34b943b3f6ea, partition 0,PROCESS_LOCAL, 2183 bytes) 
16/12/08 23:41:57 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 34b943b3f6ea, partition 1,PROCESS_LOCAL, 2199 bytes) 
16/12/08 23:41:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 34b943b3f6ea:28772 (size: 1588.0 B, free: 511.5 MB) 
16/12/08 23:41:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 34b943b3f6ea:39570 (size: 1588.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 34b943b3f6ea, partition 2,PROCESS_LOCAL, 2200 bytes) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 740 ms on 34b943b3f6ea (1/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 778 ms on 34b943b3f6ea (2/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 66 ms on 34b943b3f6ea (3/3) 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (map at <console>:28) finished in 0.792 s 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: looking for newly runnable stages 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: running: Set() 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1) 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: failed: Set() 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[2] at reduceByKey at <console>:28), which has no missing parents 
16/12/08 23:41:58 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/08 23:41:58 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.6 KB, free 6.7 KB) 
16/12/08 23:41:58 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1589.0 B, free 8.3 KB) 
16/12/08 23:41:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.17.0.6:31122 (size: 1589.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (ShuffledRDD[2] at reduceByKey at <console>:28) 
16/12/08 23:41:58 INFO cluster.YarnScheduler: Adding task set 1.0 with 3 tasks 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 34b943b3f6ea, partition 1,NODE_LOCAL, 1894 bytes) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, 34b943b3f6ea, partition 2,NODE_LOCAL, 1894 bytes) 
16/12/08 23:41:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 34b943b3f6ea:39570 (size: 1589.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 34b943b3f6ea:28772 (size: 1589.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 34b943b3f6ea:60986 
16/12/08 23:41:58 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 163 bytes 
16/12/08 23:41:58 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 34b943b3f6ea:60984 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5, 34b943b3f6ea, partition 0,PROCESS_LOCAL, 1894 bytes) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 331 ms on 34b943b3f6ea (1/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 351 ms on 34b943b3f6ea (2/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 5) in 29 ms on 34b943b3f6ea (3/3) 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: ResultStage 1 (collect at <console>:28) finished in 0.359 s 
16/12/08 23:41:58 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:28, took 1.381102 s 
res14: Array[(String, Int)] = Array((a,3), (b,5)) 

Antwort

0
  1. Wie Sie sehen können, wird die Verarbeitung gestartet mit collect() - Dies ist die verzögerte Initialisierung, der Funke verwendet. Obwohl Sie map und reduceByKey hatten, wurde der Prozess bei collect gestartet. Wie Karte und reduceByKey sind Transformationen

  2. Sie 3 Partitionen sehen können und jeweils eine Aufgabe, die - da Sie RDD mit 3 Partitionen initialisiert

  3. Ein weiterer Punkt ist, wie jede Karte und reduceByKey Datenlokalität behandelt. Alle drei Aufgaben in map haben PROCESS_LOCAL. Der reduceByKey benötigt eine Datenmischung und Sie haben möglicherweise PROCESS_LOCAL und NODE_LOCAL.

Verwandte Themen