2016-07-20 9 views
0

Ich habe diesen Code unten:Elastic Search Ungültige Muster Gegeben

def main(args: Array[String]) { 
val sparkConf = new SparkConf().setAppName("Spark-Hbase").setMaster("local[2]") 
     .set("es.index.auto.create", "true") 
     .set("es.resource", "test") 
     .set("es.nodes", "127.0.0.1") 
     .set("es.output.json", "true") 

/* More code */ 

DStream.map { 
     _._2 
}.foreachRDD { (rdd: RDD[String]) => 
    EsSpark.saveJsonToEs(rdd, "127.0.0.1") 
} 

ich immer einen Fehler für die elastic search es.nodes Eigenschaft immer sagen:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: invalid pattern given 127.0.0.1/ 
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:50) 
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.compile(AbstractIndexExtractor.java:51) 
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:398) 
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    ... 3 more 

ich etwas falsch von 127.0.0.1 setzen tue ? Ich habe versucht, den Port als 127.0.0.1:9200 setzen, aber es hat immer noch nicht funktioniert. Hat jemand irgendwelche Hinweise? Vielen Dank.

Antwort

1

Egal, ich habe es herausgefunden. Diese Methode saveJsonToEs:

EsSpark.saveJsonToEs(rdd, "127.0.0.1") 

Dauert die IP-Adresse der elastischen Such Cluster, sondern der Index in Elastic Suche auf die String RDD zu speichern.

So soll es so etwas wie diese:

EsSpark.saveJsonToEs(rdd, "test/sampleApp") /* Where `test` is the `index` and `sampleApp` is the `type` */ 

Der zweite Parameter hat Requires the format <index>/<type> in der Art sein. Index und Typ können alles sein, was für Ihre bestimmte Anwendung sinnvoll ist. Beachten Sie, dass der Index zuvor nicht in der elastischen Suche vorhanden sein muss. Sie können die Eigenschaft auf Ihrem Spark Conf Objekt verwenden: set("es.index.auto.create", "true"), um es automatisch zu erstellen. Wie in meinen Eigenschaften oben gezeigt.

Zum Vergleich: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

In dem obigen Link Befehl + f und die Suche nach Writing existing JSON to Elasticsearch.

Verwandte Themen