0

Beim Verarbeiten eines Avro-Nachrichtenstroms über Kafka und Spark speichere ich die verarbeiteten Daten als Dokumente in einem ElasticSearch-Index. Hier ist der Code (vereinfacht):Speichern von Daten in ElasticSearch in Spark-Task

directKafkaStream.foreachRDD(rdd ->{ 

     rdd.foreach(avroRecord -> { 
      byte[] encodedAvroData = avroRecord._2; 
      MyType t = deserialize(encodedAvroData); 

    // Creating the ElasticSearch Transport client 
    Settings settings = Settings.builder() 
      .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build(); 
    TransportClient client = new PreBuiltTransportClient(settings) 
      .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300)); 

    IndexRequest indexRequest = new IndexRequest("index", "item", id) 
      .source(jsonBuilder() 
        .startObject() 
        .field("name", name) 
        .field("timestamp", new Timestamp(System.currentTimeMillis())) 
        .endObject()); 

    UpdateRequest updateRequest = new UpdateRequest("index", "item", id) 
      .doc(jsonBuilder() 
        .startObject() 
        .field("name", name) 
        .field("timestamp", new Timestamp(System.currentTimeMillis())) 
        .endObject()) 
      .upsert(indexRequest); 

    client.update(updateRequest).get(); 

    client.close(); 

alles wie erwartet funktioniert; Das einzige Problem ist die Leistung: Das Speichern in ES erfordert etwas Zeit, und ich nehme an, dass dies daran liegt, dass ich einen ES Transport-Client für jede RDD öffne/schließe. Spark documentation schlägt vor, dass dieser Ansatz ziemlich korrekt ist: sobald ich verstehe, die einzige mögliche Optimierung ist die Verwendung von rdd.foreachPartition, aber ich habe nur eine Partition, so dass ich nicht sicher bin, dass dies von Vorteil wäre. Irgendeine andere Lösung, um bessere Leistung zu erzielen?

+0

nur aus Neugier - warum Sie nicht Elasticsearch-hadoop verwenden? –

Antwort

0

Weil Sie neue Verbindung erstellen, wenn Sie einen RDD-Datensatz verarbeiten. Also, ich denke, Verwendung foreachPartition wird bessere Leistung unabhängig von nur einer Partition machen, weil es Ihnen hilft, Ihre ES-Verbindungsinstanz außerhalb bringen, wieder in der Schleife.