2013-07-15 11 views
6

zugreifen Ich schreibe einen Server in Netty, in dem ich einen Anruf an Memcached machen muss. Ich benutze spymemcached und kann den synchronen memcached Anruf leicht tun. Ich möchte, dass dieser Memcached-Aufruf asynchron ist. Ist das möglich? Die mit netty gelieferten Beispiele scheinen nicht hilfreich zu sein.Wie memcached asynchron in Netty

Ich habe versucht, Callbacks zu verwenden: einen Pool ExecutorService in meinem Handler erstellt und einen Callback-Worker an diesen Pool gesendet. Wie folgt aus:

public class MyHandler ChannelInboundMessageHandlerAdapter erweitert <MyPOJO> CallbackInterface implementiert {

... 
    private static ExecutorService pool = Executors.newFixedThreadPool(20); 


    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MyPOJO pojo) { 
     ... 
     CallingbackWorker worker = new CallingbackWorker(key, this); 
     pool.submit(worker); 
     ... 
    } 
    public void myCallback() { 
     //get response 
     this.ctx.nextOutboundMessageBuf().add(response); 
    } 

}

CallingbackWorker wie folgt aussieht:

public class CallingbackWorker i mplements aufrufbare {

public CallingbackWorker(String key, CallbackInterface c) { 
     this.c = c; 
     this.key = key; 
    } 
    public Object call() { 
    //get value from key 
    c.myCallback(value); 
    } 

Allerdings, wenn ich dies tun, this.ctx.nextOutboundMessageBuf() in myCallback stecken bleibt.

Also, insgesamt ist meine Frage: wie async memcached Anrufe in Netty tun?

+0

Ich glaube, ich vermisse etwas Grundlegendes über Netty/NIO hier. In einem NIO-Framework sollte ein Netzwerkanruf asynchron erfolgen. Alle Zeiger würden helfen. –

Antwort

3

Hier gibt es zwei Probleme: ein kleines Problem mit der Art und Weise, wie Sie dies zu codieren versuchen, und ein größeres mit vielen Bibliotheken, die asynchrone Serviceaufrufe bereitstellen, aber keine gute Möglichkeit, sie voll auszunutzen ein asynchrones Framework wie Netty. Das zwingt Benutzer zu suboptimalen Hacks wie diesem oder zu einem weniger schlechten, aber immer noch nicht idealen Ansatz, auf den ich gleich eingehen werde.

Zuerst das Codierungsproblem. Das Problem besteht darin, dass Sie versuchen, eine ChannelHandlerContext-Methode von einem anderen Thread als dem mit Ihrem Handler verknüpften aufzurufen, der nicht zulässig ist. Das ist ziemlich einfach zu beheben, wie unten gezeigt. Man könnte es ein paar andere Möglichkeiten, codieren, aber das ist wahrscheinlich die einfachste:

private static ExecutorService pool = Executors.newFixedThreadPool(20); 

public void channelRead(final ChannelHandlerContext ctx, final Object msg) { 
    //... 

    final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder()); 

    // first wait for the response on a pool thread 
    pool.execute(new Runnable() { 
     public void run() { 
      String value; 
      Exception err; 
      try { 
       value = future.get(3, TimeUnit.SECONDS); // or whatever timeout you want 
       err = null; 
      } catch (Exception e) { 
       err = e; 
       value = null; 
      } 
      // put results into final variables; compiler won't let us do it directly above 
      final fValue = value; 
      final fErr = err; 

      // now process the result on the ChannelHandler's thread 
      ctx.executor().execute(new Runnable() { 
       public void run() { 
        handleResult(fValue, fErr); 
       } 
      }); 
     } 
    }); 
// note that we drop through to here right after calling pool.execute() and 
// return, freeing up the handler thread while we wait on the pool thread. 
} 

private void handleResult(String value, Exception err) { 
    // handle it 
} 

das funktionieren wird, und könnte für Ihre Anwendung ausreichend sein. Aber Sie haben einen Threadpool mit fester Größe. Wenn Sie also mehr als 20 gleichzeitige Verbindungen verwalten, wird dies zu einem Engpass. Sie könnten die Poolgröße erhöhen oder eine unbegrenzte verwenden, aber zu diesem Zeitpunkt können Sie genauso gut unter Tomcat laufen, da Speicherverbrauch und Kontextwechsel-Overhead zu Problemen werden und Sie die Skalierbarkeit verlieren, auf die es ankam Netty an erster Stelle!

Und die Sache ist, Spymemcached ist NIO-basiert, ereignisgesteuert, und verwendet nur einen Thread für all seine Arbeit, bietet jedoch keine Möglichkeit, seine ereignisgesteuerte Natur voll auszunutzen. Ich nehme an, dass sie das zu früh beheben werden, so wie Netty 4 und Cassandra kürzlich Callback-Methoden (Listener) auf Future-Objekten zur Verfügung gestellt haben.

Während ich im selben Boot wie Sie war, recherchierte ich die Alternativen und nicht zu glücklich mit dem, was ich gefunden habe, schrieb ich (gestern) a Future tracker class, die bis zu Tausenden von Futures zu einer konfigurierbaren Rate abrufen und anrufen können Sie zurück auf den Thread (Executor) Ihrer Wahl, wenn sie abgeschlossen sind. Es verwendet nur einen Thread, um dies zu tun. Ich habe put it up on GitHub, wenn Sie es ausprobieren möchten, aber seien Sie gewarnt, dass es immer noch nass ist, wie sie sagen.Ich habe es am vergangenen Tag viel getestet, und selbst mit 10000 simultanen Future-Objekten, die einmal pro Millisekunde abfragen, ist ihre CPU-Auslastung vernachlässigbar, obwohl sie über 10000 hinausgeht. Das obige Beispiel sieht so aus :

// in some globally-accessible class: 

public static final ForeignFutureTracker FFT = new ForeignFutureTracker(1, TimeUnit.MILLISECONDS); 

// in a handler class: 

public void channelRead(final ChannelHandlerContext ctx, final Object msg) { 
// ... 

    final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder()); 

    // add a listener for the Future, with a timeout in 2 seconds, and pass 
    // the Executor for the current context so the callback will run 
    // on the same thread. 
    Global.FFT.addListener(future, 2, TimeUnit.SECONDS, ctx.executor(), 
    new ForeignFutureListener<String,GetFuture<String>>() { 

     public void operationSuccess(String value) { 
     // do something ... 
     ctx.fireChannelRead(someval); 
     } 
     public void operationTimeout(GetFuture<String> f) { 
     // do something ... 
     } 
     public void operationFailure(Exception e) { 
     // do something ... 
     } 
    }); 
} 

Sie wollen nicht mehr als ein oder zwei FFT-Instanzen jederzeit aktiv, oder sie könnten eine Belastung für die CPU werden. Aber eine einzelne Instanz kann Tausende von ausstehenden Futures verarbeiten; Der einzige Grund, einen zweiten zu haben, bestünde darin, Anrufe mit höherer Latenz wie S3 mit einer langsameren Abfragerate, etwa 10-20 Millisekunden, zu bewältigen.

Ein Nachteil des Polling-Ansatzes ist, dass er eine geringe Latenzzeit hinzufügt. Wenn zum Beispiel einmal pro Millisekunde abgefragt wird, addiert sich die Antwortzeit im Durchschnitt um 500 Mikrosekunden. Das wird für die meisten Anwendungen kein Problem sein, und ich denke, dass die Speicher- und CPU-Einsparungen gegenüber dem Thread-Pool-Ansatz mehr als ausgeglichen werden.

Ich erwarte innerhalb eines Jahres oder so wird dies kein Problem sein, da mehr async Clients Callback-Mechanismen zur Verfügung stellen, mit denen Sie NIO und das ereignisgesteuerte Modell voll nutzen können.

+0

die Art von Graal, die ich suchte, um von blockierenden Anruf abzufragen, ja auf dem gleichen Boot, nettes Schaf, das Sie uns THX viel geliefert haben –