2

Ich führe einen Spark-Job in einem Google-Cluster aus und versuche, während eines RDD-Map-Prozesses einige Protokollierungsinformationen zu erfassen. Schnell Beispiel:Abrufen von Protokollausgaben von Spark-Mitarbeitern in Google Cloud

object LoggerWrapper extends Serializable{ 
    @transient lazy val logger=Logger.getLogger("myLogger") 
} 
object Processing{ 
... 
    rdd.map(x=>{ 
     LoggerWrapper.logger.info("processing:"+x) 
     foo(x) 
    }) 
    ... 
    sparkContext.stop 
} 

Ich bin nach der Methodik beschrieben here mit den Richtungen kombiniert in the Spark webpage gefunden. Die resultierende log4j.properties wird am Ende angezeigt. Die Datei wird mit dem Flag --files des Befehls gcloud hochgeladen (siehe unten). Ich habe auch die Datei "yarn-site.xml" so aktualisiert, dass die Eigenschaft yarn.log-aggregation-enable auf true gesetzt ist.

Meine erste Frage ist, dass, wenn ich vom Master-Knoten yarn logs -application <applicationID> laufen immer bekomme ich die Fehlermeldung "Log-Aggregation wurde nicht abgeschlossen oder ist nicht aktiviert." Gibt es noch etwas anderes, was getan werden muss, um die Nachrichten zu sammeln?

Die zweite Frage ist, ob es möglich ist, die Protokollmeldungen aller Worker in der Konsolenausgabe zu erhalten, während der Prozess läuft. Wenn der Spark-Job beispielsweise ein Streaming-Job ist, möchte ich die Nachrichten erhalten, während der Job ausgeführt wird.

log4j.properties:

log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender 
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout 
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n 
log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.RollingAppender.File=${spark.yarn.app.container.log.dir}/spark.log 
log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd 
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout 
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n 

log4j.appender.RollingAppenderU=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.RollingAppenderU.File=${spark.yarn.app.container.log.dir}/sparkU.log 
log4j.appender.RollingAppenderU.DatePattern='.'yyyy-MM-dd 
log4j.appender.RollingAppenderU.layout=org.apache.log4j.PatternLayout 
log4j.appender.RollingAppenderU.layout.ConversionPattern=[%p] %d %c %M - %m%n 


# By default, everything goes to console and file 
log4j.rootLogger=INFO, RollingAppender, myConsoleAppender 

# My custom logging goes to another file 
log4j.logger.myLogger=INFO, RollingAppenderU, myConsoleAppender 

# The noisier spark logs go to file only 
log4j.logger.spark.storage=INFO, RollingAppender 
log4j.additivity.spark.storage=false 
log4j.logger.spark.scheduler=INFO, RollingAppender 
log4j.additivity.spark.scheduler=false 
log4j.logger.spark.CacheTracker=INFO, RollingAppender 
log4j.additivity.spark.CacheTracker=false 
log4j.logger.spark.CacheTrackerActor=INFO, RollingAppender 
log4j.additivity.spark.CacheTrackerActor=false 
log4j.logger.spark.MapOutputTrackerActor=INFO, RollingAppender 
log4j.additivity.spark.MapOutputTrackerActor=false 
log4j.logger.spark.MapOutputTracker=INFO, RollingAppender 
log4j.additivty.spark.MapOutputTracker=false 

gcloud Befehl: gcloud dataproc jobs submit spark --cluster myCluster --properties spark.driver.memory=1000m,spark.driver.maxResult=512m,spark.executor.memory=1000m --jars gs://path/to/jar/myJar.jar --files /absolute/path/to/local/file/log4j.properties --class contextual.wikidata.spark.jobs.$1 <application-arguments>

Antwort

3

Wie Sie in der ContainerManagerImpl, die Überprüfung, ob Protokollaggregation aktiviert sehen kann, ist innerhalb NodeManager Code:

protected LogHandler createLogHandler(Configuration conf, Context context, 
    DeletionService deletionService) { 
    if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, 
     YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { 
    return new LogAggregationService(this.dispatcher, context, 
     deletionService, dirsHandler); 
    } else { 
    return new NonAggregatingLogHandler(this.dispatcher, deletionService, 
             dirsHandler, 
             context.getNMStateStore()); 
    } 
} 

Und zusätzlich scheint als Teil der Initialisierung, bei der Erstellung dergetan werdenInstanz zum ersten Mal; Das bedeutet, dass der Konfigurationswert für alle Worker-Knoten bereitgestellt werden muss und vor dem Start/Neustart von nodemanagers in der Konfiguration enthalten sein muss.

In Dataproc, anstatt manuell die yarn-site.xml Datei selbst zu modifizieren, müssen Sie nur das viel einfacher --properties Flag verwenden, wenn der Cluster, und der Config-Schlüssel erzeugt wird richtig in allen Knoten vor Daemons fertig starten:

gcloud dataproc clusters create my-cluster \ 
    --properties yarn:yarn.log-aggregation-enable=true 

auch, müssen Sie sicherstellen, dass Ihr yarn logs Befehl als der Benutzer ausführen, die die Jobs oder GARN lief versuchen, innerhalb der falschen $USER Verzeichnis im Log-Aggregation Verzeichnis suchen:

sudo yarn logs -applicationId <applicationId> 
+0

Vielen Dank für die Antwort! Ist es möglich, benutzerdefinierte Protokolle in einem Streaming-Job zu lesen, während die Stream-Verarbeitung noch läuft? – orestis

+1

Jetzt ist dies wahrscheinlich am besten über die [YARN UI] (https://cloud.google.com/dataproc/concepts/cluster-web-interfaces) möglich und navigieren Sie direkt zu den Containerprotokollen, indem Sie auf Ihren Link klicken Anwendung ausführen und Links zu Containerprotokollen auf der linken Seite finden. Es gibt Pläne, die Stackdriver-Integration von Dataproc in Zukunft weiter zu verbessern, wo Containerprotokolle über die Cloud Logging-Benutzeroberfläche zugänglich wären, aber noch kein konkretes Datum. –

Verwandte Themen