Hey Ich versuche, einige strraming Metriken Cloudwatch über StreamingListenerFunken StreamingListener Cloudwatch Integration
so etwas zu Ende:
class MyStreamingListener()
extends StreamingListener{
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted):Unit={
val cloudWatch = new AmazonCloudWatchClient(new BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
cloudWatch.setEndpoint("monitoring.eu-west-1.amazonaws.com")
val putMetricDataRequest = new PutMetricDataRequest()
putMetricDataRequest.setNamespace("my-name-space")
val metricDatum = new MetricDatum().withMetricName("test")
metricDatum.setValue(batchCompleted.batchInfo.numRecords)
metricDatum.setUnit(StandardUnit.fromValue("Milliseconds"))
putMetricDataRequest.getMetricData.add(metricDatum)
cloudWatch.putMetricData(putMetricDataRequest)
}
}
und es dann in saprkStreaming verwenden:
val streamingContext: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
streamingContext.addStreamingListener(new LoadIndexStreamingListener)
val dstream = KinesisUtils.createStream(
streamingContext, "this-is-just-a-test", "my-stream", "kinesis.eu-west-1.amazonaws.com",
"eu-west-1", InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
.map(byteArray => new String(byteArray))
dstream.print()
streamingContext.start()
streamingContext.awaitTermination()
Als ich einen Test mit Spark-Shell auf meinem Cluster (EMR) durchführte, funktionierte es Ok und Metriken wurden an CloudWacth
gesendetaber wenn ich mit sbt clean assembly
meinen Code in ein Gefäß gepackt und es mit Funken läuft vorlegen, bekam ich folgende Fehlermeldung:
java.lang.NoSuchMethodError: com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.putMetricData(Lcom/amazonaws/services/cloudwatch/model/PutMetricDataRequest;)Lcom/amazonaws/services/cloudwatch/model/PutMetricDataResult;
dies ist der Funke -submit Befehl, den ich versuchte:
spark-submit --class com.me.sparkTest.App --master local[4] --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,com.amazonaws:amazon-kinesis-client:1.7.2 clowdwatch-spark-test-assembly-1.0.jar
Irgendeine Idee, die verursacht, dass es fehlschlägt, wenn es Funken-Einreichen verwendet?
Problem mit Abhängigkeiten Version ist erstellen, mit der folgenden Abhängigkeit sein können Sie andere Version in Ihrer Assembly jar – Kaushal
ich es verwenden markiert, wie in der Build bereitgestellt .sbt also benutzt es das Paket, das in der Spark-Submit angegeben ist .... –
Aber Sie kompilieren Ihr Glas damit, also müssen Sie die gleiche Version verwenden. – Kaushal