2016-06-30 4 views
0

Also habe ich diesen Befehl rake environment elasticsearch:import:model CLASS='AutoPartsMapper' FORCE=true ausgeführt, um Dokumente in elasticsearch zu indizieren.In meiner Datenbank habe ich 10 000 000 Datensätze =) ... es dauert (glaube ich) eines Tages, dies zu indexieren ... Als Indexierung lief mein Computer ausgeschaltet ... (Ich habe 2 000 000 Dokumente indexiert) Ist es möglich, Dokumente weiter zu indexieren?Wie können Dokumente in elasticsearch (Rails) weiterindexiert werden?

Antwort

0

Es gibt keine solche Funktionalität in elasicsearch-rails afaik, aber Sie könnten eine einfache Aufgabe schreiben, das zu tun.

namespace :es do 
    task :populate, [:start_id] => :environment do |_, args| 
    start_id = args[:start_id].to_i 

    AutoPartsMapper.where('id > ?', start_id).order(:id).find_each do |record| 
     puts "Processing record ##{record.id}" 
     record.__elasticsearch__.index_document 
    end 
    end 
end 

es mit bundle exec rake es:populate[<start_id>] Starten Sie die ID des Datensatzes von dem Passieren der nächsten Charge zu starten.

Beachten Sie, dass dies eine einfache Lösung ist, die viel langsamer als Batch-Indizierung sein wird.

UPDATE

Hier ist eine Batch-Indizierung. Es ist viel schneller und erkennt automatisch den Datensatz, von dem fortgefahren werden soll. Es wird davon ausgegangen, dass zuvor importierte Datensätze in aufsteigender Reihenfolge und ohne Lücken verarbeitet wurden. Ich habe es nicht getestet, aber der meiste Code stammt aus einem Produktionssystem.

namespace :es do 
    task :populate_auto => :environment do |_, args| 
    start_id = get_max_indexed_id 
    AutoPartsMapper.find_in_batches(batch_size: 1000).where('id > ?', start_id).order(:id) do |records| 
     elasticsearch_bulk_index(records) 
    end 
    end 

    def get_max_indexed_id 
    AutoPartsMapper.search(aggs: {max_id: {max: {field: :id }}}, size: 0).response[:aggregations][:max_id][:value].to_i 
    end 

    def elasticsearch_bulk_index(records) 
    return if records.empty? 
    klass = records.first.class 
    klass.__elasticsearch__.client.bulk({ 
     index: klass.__elasticsearch__.index_name, 
     type: klass.__elasticsearch__.document_type, 
     body: elasticsearch_records_to_index(records) 
    }) 
    end 

    def self.elasticsearch_records_to_index(records) 
    records.map do |record| 
     payload = { _id: record.id, data: record.as_indexed_json } 
     { index: payload } 
    end 
    end 
end 
0

Wenn Sie Schienen 4.2+ verwenden, können Sie mit ActiveJob planen und lassen Sie es laufen. Also, zuerst erzeugen sie mit diesem

bin/rails generate job elastic_search_index 

Dies wird Ihnen Klasse und Methode geben ausführen:

class ElasticSearchIndexJob < ApplicationJob 
    def perform 
    # impleement here indexing 
    AutoPartMapper.__elasticsearch__.create_index! force:true 
    AutoPartMapper.__elasticsearch__.import 
    end 
end 

Stellen Sie den sidekiq als aktive Job-Anbieter und von der Konsole initiieren dies mit:

ElasticSearchIndexJob.perform_later 

Dadurch wird der aktive Job festgelegt und beim nächsten freien Job ausgeführt, aber die Konsole wird freigegeben. Sie können es laufen lassen und den Prozess in bash später überprüfen:

ps aux | grep side 

dies wird Ihnen so etwas wie: sidekiq 4.1.2 app[1 of 12 busy]

Werfen Sie einen Blick auf diese Stelle, die sie

http://ruby-journal.com/how-to-integrate-sidekiq-with-activejob/

erklärt Hoffe, es hilft

+0

Beachten Sie, dass die Frage über die kontinuierliche Indexierung von wo der letzte Lauf abgeschlossen ist. Ein Hintergrundjob muss immer noch auf einmal ausgeführt werden, was möglicherweise nicht machbar ist. –

+0

Entschuldigung, die Frage wurde später aktualisiert. Ich denke, dass der Prozess neu gestartet werden muss. Ich würde sidekiq definitiv dafür verwenden, da Sie nicht laufen müssen, während der Prozess mit perform_later läuft ... sidekiq hat Protokolle, die tailed und auf Fehler überwacht werden können –

Verwandte Themen