2016-12-14 4 views
1

ich mehrere logstash jdbc Eingänge bin mit:synchronisieren logstash jdbc Eingänge

jdbc { 
    jdbc_driver_library => "../vendor/oracle/ojdbc7.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    connection_retry_attempts => 10 
    connection_retry_attempts_wait_time => 5 
    jdbc_validate_connection => "true" 
    jdbc_connection_string => "connectionString/myDataBase" 
    jdbc_user => "USER_NAME" 
    jdbc_password => "PASSWORD" 
    schedule => "* * * * *" 
    statement_filepath => "myPath/queryA.sql" 
    tracking_column => "myTrackingcolumn" 
    last_run_metadata_path => "myPath/.logstash_jdbc_last_run" 
    type => "documentType" 
    add_field => { 
      "tag" => "myFirstTag" 
      } 
    } 

jdbc { 
    jdbc_driver_library => "../vendor/oracle/ojdbc7.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    connection_retry_attempts => 10 
    connection_retry_attempts_wait_time => 5 
    jdbc_validate_connection => "true" 
    jdbc_connection_string => "connectionString/myDataBase" 
    jdbc_user => "USER_NAME" 
    jdbc_password => "PASSWORD" 
    schedule => "* * * * *" 
    statement_filepath => "myPath/queryB.sql" 
    tracking_column => "myTrackingcolumn" 
    last_run_metadata_path => "myPath/.logstash_jdbc_last_run" 
    type => "documentType" 
    add_field => { 
      "tag" => "mySecondTag" 
      } 
    } 

jdbc { 
    jdbc_driver_library => "../vendor/oracle/ojdbc7.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    connection_retry_attempts => 10 
    connection_retry_attempts_wait_time => 5 
    jdbc_validate_connection => "true" 
    jdbc_connection_string => "connectionString/myDataBase" 
    jdbc_user => "USER_NAME" 
    jdbc_password => "PASSWORD" 
    schedule => "* * * * *" 
    statement_filepath => "myPath/queryC.sql" 
    tracking_column => "myTrackingcolumn" 
    last_run_metadata_path => "myPath/.logstash_jdbc_last_run" 
    type => "documentType" 
    add_field => { 
      "tag" => "myThirdTag" 
      } 
    } 

Da es eine SESSIONS_PER_USER limit für die Datenbank definiert ist, ich bin Abfrage dies den folgenden Fehler provoziert:

[31mPipeline aborted due to error {:exception=>#<Sequel::DatabaseConnectionError: Java::JavaSql::SQLException: ORA-02391: 
exceeded simultaneous SESSIONS_PER_USER limit>, :backtrace=>["oracle.jdbc.driver.T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:450)", "oracle.jdbc.driver. 
T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:392)", "oracle.jdbc.driver.T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:385)", 
"oracle.jdbc.driver.T4CTTIfun.processError(oracle/jdbc/driver/T4CTTIfun.java:938)", "oracle.dbc.driver.T4CTTIoauthenticate.processError(oracle/jdbc/driver/T4CTTIoauthenticate.java:480)", 
"oracle.jdbc.driver.T4CTTIfun.receive(oracle/jdbc/driver/T4CTTIfun.java:655)", "oracle.jdbc.driver.T4CTTIfun.doRPC(oracle/jdbc/driver/T4CTTIfun.java:249)", 
"oracle.jdbc.driver.T4CTTIoauthenticate.doOAUTH(oracle/jdbc/driver/T4CTTIoauthenticate.java:416)", "oracle.jdbc.driver.T4CTTIoauthenticate.doOAUTH(oracle/jdbc/driver/T4CTTIoauthenticate.java:825)", 
"oracle.jdbc.driver.T4CConnection.logon(oracle/jdbc/driver/T4CConnection.java:596)", "oracle.jdbc.driver.PhysicalConnection.<init>(oracle/jdbc/driver/PhysicalConnection.java:715)", 
"oracle.jdbc.driver.T4CConnection.<init>(oracle/jdbc/driver/T4CConnection.java:385)", "oracle.jdbc.driver.T4CDriverExtension.getConnection(oracle/jdbc/driver/T4CDriverExtension.java:30)", 
"oracle.jdbc.driver.OracleDriver.connect(oracle/jdbc/driver/OracleDriver.java:564)", "RUBY.connect(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/adapters/jdbc.rb:222)", 
"RUBY.make_new(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool.rb:110)", "RUBY.make_new(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:226)", 
"RUBY.available(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:199)", "RUBY._acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:135)", 
"RUBY.acquire(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:149)", "RUBY.sync(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:280)", 
"org.jruby.ext.thread.Mutex.synchronize(org/jruby/ext/thread/Mutex.java:149)", "RUBY.sync(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:280)", 
"RUBY.acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:148)", "RUBY.acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/extensions/connection_validator.rb:98)", 
"RUBY.hold(D:myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:106)", "RUBY.synchronize(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/database/connecting.rb:256)", 
"RUBY.test_connection(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/database/connecting.rb:266)", "RUBY.prepare_jdbc_connection(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.1.0/lib/logstash/plugin_mixins/jdbc.rb:173)", 
"RUBY.register(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.1.0/lib/logstash/inputs/jdbc.rb:187)", "RUBY.start_inputs(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:330)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", 
"RUBY.start_inputs(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:329)", "RUBY.start_workers(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:180)", 
"RUBY.run(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:136)", 
"RUBY.start_pipeline(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/agent.rb:473)"], :level=>:error}?[0mstopping pipeline {:id=>"main"} 

So konfigurieren diese Eingaben so logstash die SQL-Abfragen der Reihe nach durchführen und vermeiden, das erlaubte Sitzungslimit zu überschreiten?

Antwort

1

Ich glaube nicht, dass es eine Möglichkeit gibt, die Eingabe der Reihe nach durchzuführen.

Aber die Option schedule vom jdbc-Eingang kann die Häufigkeit der Abfragen reduzieren, um die SESSIONS_PER_USER Grenze zu vermeiden.

Wie es ist: schedule => "* * * * *", Ihr Plugin wird jede Minute eine Verbindung zur db herstellen (siehe here). Sie können stattdessen schedule => "*/15 * * * *" verwenden, die alle 15 Minuten eine Verbindung herstellen (siehe here).

+0

Ich denke, dass dies eine Abhilfe, aber keine dauerhafte Lösung sein kann. Ich muss sicher sein, dass alle Eingaben durchgeführt werden und vermeiden Sie den Absturz der Logstash-Instanz. Der Crontab-Editor ist großartig! – M3HD1

+1

@Mehdi Ja, ich weiß, es ist nur ein Workaround. Eine langfristige Lösung wäre, einen Benutzer pro Eingabe zu verwenden oder das Limit zu erhöhen (was in Ihrem Fall jedoch nicht möglich ist). Eine andere Lösung (aber ich bin mir nicht sicher, ob es funktionieren wird): Legen Sie die Anzahl der Worker und die Batchgröße auf 1 fest (siehe [hier] (https://www.elastic.co/guide/en/logstash/current/command) -line-flags.html)), so dass jeweils nur ein Ereignis verarbeitet wird, das die Eingaben blockieren könnte (die [Dokumentation] (https://www.elastic.co/guide/en/logstash/5.1/execution- model.html) ist darüber unklar) – baudsp

+0

Ich habe versucht, die Logstash-Instanz mit nur einem Pipeline-Arbeiter zu starten, aber das löst das Problem nicht. Wie in dem Dokument erwähnt, scheint es, dass die Pipeline-Arbeiter nur den Filter und die Ausgabe ausführen, nicht aber die Eingaben! (Diese Option legt die Anzahl der Worker fest, die parallel die Filter- und Endstufen der Pipeline ausführen.) – M3HD1

Verwandte Themen