2017-09-26 1 views
0

Es scheint, dass ich die Fehlerbehandlung bei der Verwendung von Akka Streams nie richtig bekomme.Flow ist mit Fehler fehlgeschlagen Herunterfahren wegen Verletzung der Reactive Streams-Spezifikation

Das ist also mein Code

var db = Database.forConfig("oracle") 
var mysqlDb = Database.forConfig("mysql_read") 
var mysqlDbWrite = Database.forConfig("mysql_write") 

implicit val actorSystem = ActorSystem() 
val decider : Supervision.Decider = { 
    case _: Exception => 
     println("got an exception restarting connections") 
    // let us restart our connections 
    db.close() 
    mysqlDb.close() 
    mysqlDbWrite.close() 
    db = Database.forConfig("oracle") 
    mysqlDb = Database.forConfig("mysql_read") 
    mysqlDbWrite = Database.forConfig("mysql_write") 
    Supervision.Restart 
} 
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) 

und ich habe eine Strömung wie diese

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo => 
    try { 
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long] 
    mysqlDbWrite.run(existsQuery).map(v => (foo, v)) 
    } catch { 
    case e: Throwable => 
     println(s"Lookup failed for ${foo}") 
     throw e // will restart the stream 
    } 
}.collect {case (f, v) if v.isEmpty => f} 

Also im Grunde, wenn die foo bereits in MySQL existiert dann der Datensatz sollte durch die nicht weiter verarbeitet werden Strom.

Meine Hoffnung mit diesem Code war, dass, wenn etwas mit der MySQL-Lookup fehlschlägt (die Mysql-Maschine ist ziemlich schlecht und Timeouts sind üblich), wird der Datensatz gedruckt und verworfen und der Stream wird mit den verbleibenden Aufzeichnungen mit freundlicher Genehmigung der Aufsicht.

Wenn ich diesen Code ausführen. Ich sehe Fehler wie

[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state 
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.sql.SQLException: Invalid socket timeout value or state 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872) 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketException: Socket is closed 
    at java.net.Socket.setSoTimeout(Socket.java:1137) 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

und

[error] (mysql_write network timeout executor) java.lang.NullPointerException 
java.lang.NullPointerException 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Eine Sache, die mich hier überrascht, ist, dass diese Ausnahmen von meinem catch-Block nicht kommen. weil ich die println-Anweisung meines catch-Blocks nicht sehe. Der Stack-Trace zeigt mir nicht, woher er stammt ... aber da es mysql_write heißt, kann ich davon ausgehen, dass es sich um den obigen Flow handelt, da nur dieser Flow mysql_write verwendet.

schließlich der gesamte Strom stürzt mit dem Fehler

[trace] Stack trace suppressed: run last compile:runMain for the full output. 
flow has failed with error Shutting down because of violation of the Reactive Streams specification. 
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting. 
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM 
14:51:07,603 |-INFO in [email protected] - Sleeping for 1 seconds 

Ich weiß nicht, was ich mit der reaktiven Strom Spezifikation zu verletzen habe !!

Antwort

1

Ein erster Versuch, eine berechenbarere Lösung zu bekommen, wäre, das Blockierungsverhalten zu entfernen (Await.result) und mapAsync zu verwenden. Ein Umschreiben der alreadyExistsFilter fließen könnte sein:

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(3) { foo ⇒ 
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long] 
    foo → Await.result(mysqlDbWrite.run(existsQuery), Duration.Inf) 
    }.collect{ 
    case (foo, res) if res.isDefined ⇒ foo 
    } 

Weitere Informationen in Akka auf der Blockierung in den docs finden.

+0

OK das Problem kommt wieder. Ich aktualisiere meinen Beitrag oben. –

0

Die Antwort von Stefano ist korrekt. Der Fehler kam tatsächlich, weil Code im Flow blockiert wurde.

Obwohl mein ursprüngliches Programm gegen scala 2.11 ausgeführt wurde und auch nach dem Wechsel zu mapAsync, blieb das Problem bestehen.

Da dies ein Befehlszeilenwerkzeug ist, war es für mich einfach, zu scala 2.12 zu wechseln und es erneut zu versuchen.

Als ich mit Scala 2.12 versuchte, funktionierte es perfekt.

Eine Sache, die mir sehr geholfen hat, ist "ch.qos.logback" % "logback-classic" % "1.2.3", in den Abhängigkeiten zu haben. Dies zeigt Ihnen jede einzelne SQL-Anweisung, die gerade ausgeführt wird, und Sie können leicht sehen, ob etwas schief läuft.

Verwandte Themen