0

Hey Ich versuche, einige strraming Metriken Cloudwatch über StreamingListenerFunken StreamingListener Cloudwatch Integration

so etwas zu Ende:

class MyStreamingListener() 
extends StreamingListener{ 

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted):Unit={ 
val cloudWatch = new AmazonCloudWatchClient(new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)) 
    cloudWatch.setEndpoint("monitoring.eu-west-1.amazonaws.com") 
    val putMetricDataRequest = new PutMetricDataRequest() 
    putMetricDataRequest.setNamespace("my-name-space") 
    val metricDatum = new MetricDatum().withMetricName("test") 
    metricDatum.setValue(batchCompleted.batchInfo.numRecords) 
    metricDatum.setUnit(StandardUnit.fromValue("Milliseconds")) 
    putMetricDataRequest.getMetricData.add(metricDatum) 
    cloudWatch.putMetricData(putMetricDataRequest) 
} 
} 

und es dann in saprkStreaming verwenden:

val streamingContext: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(2)) 
    streamingContext.addStreamingListener(new LoadIndexStreamingListener) 

    val dstream = KinesisUtils.createStream(
     streamingContext, "this-is-just-a-test", "my-stream", "kinesis.eu-west-1.amazonaws.com", 
     "eu-west-1", InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) 
     .map(byteArray => new String(byteArray)) 
    dstream.print() 
    streamingContext.start() 
    streamingContext.awaitTermination() 

Als ich einen Test mit Spark-Shell auf meinem Cluster (EMR) durchführte, funktionierte es Ok und Metriken wurden an CloudWacth

gesendet

aber wenn ich mit sbt clean assembly meinen Code in ein Gefäß gepackt und es mit Funken läuft vorlegen, bekam ich folgende Fehlermeldung:

java.lang.NoSuchMethodError: com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.putMetricData(Lcom/amazonaws/services/cloudwatch/model/PutMetricDataRequest;)Lcom/amazonaws/services/cloudwatch/model/PutMetricDataResult; 

dies ist der Funke -submit Befehl, den ich versuchte:

spark-submit --class com.me.sparkTest.App --master local[4] --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,com.amazonaws:amazon-kinesis-client:1.7.2 clowdwatch-spark-test-assembly-1.0.jar 

Irgendeine Idee, die verursacht, dass es fehlschlägt, wenn es Funken-Einreichen verwendet?

+0

Problem mit Abhängigkeiten Version ist erstellen, mit der folgenden Abhängigkeit sein können Sie andere Version in Ihrer Assembly jar – Kaushal

+0

ich es verwenden markiert, wie in der Build bereitgestellt .sbt also benutzt es das Paket, das in der Spark-Submit angegeben ist .... –

+0

Aber Sie kompilieren Ihr Glas damit, also müssen Sie die gleiche Version verwenden. – Kaushal

Antwort

0

Der Fehler wurde früher verursacht, da die kompilierte Klasse für AmazonCloudWatchClient.putMetricData eine andere Signatur als die im EMR-Cluster verfügbaren Laufzeitbibliotheken hatte.

Die Lösung dieses Problems ein uber jar

<dependency> 
    <groupId>com.amazonaws</groupId> 
    <artifactId>aws-java-sdk</artifactId> 
    <version>1.10.75.1</version> 
</dependency> 
Verwandte Themen