2016-05-09 2 views
0

Ich versuche, eine große Menge asynchron eine HTTP-Anfragen an einen Server zu senden. Mein Ziel ist es, jede Antwort mit ihrer ursprünglichen Anfrage zu vergleichen.Wie kann ich mehrere HTTP-Anfragen asynchron mit Netty senden?

Dazu verfolge ich die Netty Snoop example.

In diesem Beispiel (und den anderen http-Beispielen) wird jedoch nicht behandelt, wie mehrere Anfragen asynchron gesendet werden oder wie diese anschließend mit den entsprechenden Anforderungen verknüpft werden.

Alle ähnlichen Fragen (wie this one, this one oder this one, die SimpleChannelUpstreamHandler Klasse implementieren, die von Netty 3 ist und existiert nicht mehr in 4,0 (documentation netty 4.0)

Wer hat eine Idee, wie diese zu lösen in netty 4,0

Edit:?

Mein Problem ist, obwohl ich viele Nachrichten an den Kanal zu schreiben, ich die Antworten nur sehr langsam erhalten (1 Antwort/sec, wHE reas eine Hoffnung, einige tausend/sec zu erhalten). Um dies zu verdeutlichen, bitte ich, was ich bisher bekommen habe. Ich bin sicher, dass der Server, an den ich die Anfragen sende, auch viel Verkehr bewältigen kann.

Was ich so weit gekommen:

import java.net.URI 
import java.nio.charset.StandardCharsets 
import java.io.File 

import io.netty.bootstrap.Bootstrap 
import io.netty.buffer.{Unpooled, ByteBuf} 
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer} 
import io.netty.channel.socket.SocketChannel 
import io.netty.channel.socket.nio.NioSocketChannel 
import io.netty.handler.codec.http._ 
import io.netty.handler.timeout.IdleStateHandler 
import io.netty.util.{ReferenceCountUtil, CharsetUtil} 
import io.netty.channel.nio.NioEventLoopGroup 

import scala.io.Source 

object ClientTest { 

    val URL = System.getProperty("url", MY_URL)  
    val configuration = new Configuration 

    def main(args: Array[String]) { 
    println("Starting client") 
    start() 
    } 

    def start(): Unit = { 

    val group = new NioEventLoopGroup() 

    try { 

     val uri: URI = new URI(URL) 
     val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"} 
     val port: Int = {val p = uri.getPort; if (p != -1) p else 80} 

     val b = new Bootstrap() 

     b.group(group) 
     .channel(classOf[NioSocketChannel]) 
     .handler(new HttpClientInitializer()) 

     val ch = b.connect(host, port).sync().channel() 

     val logFolder: File = new File(configuration.LOG_FOLDER) 
     val fileToProcess: Array[File] = logFolder.listFiles() 

     for (file <- fileToProcess){ 
     val name: String = file.getName() 
     val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name) 

     val lineIterator: Iterator[String] = source.getLines() 

     while (lineIterator.hasNext) { 
      val line = lineIterator.next() 
      val jsonString = parseLine(line) 
      val request = createRequest(jsonString, uri, host) 
      ch.writeAndFlush(request) 
     } 
     println("closing") 
     ch.closeFuture().sync() 
     } 
    } finally { 
     group.shutdownGracefully() 
    } 
    } 

    private def parseLine(line: String) = { 
    //do some parsing to get the json string I want 
    } 

    def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = { 
    val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8) 

    val request: FullHttpRequest = new DefaultFullHttpRequest(
     HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath()) 
    request.headers().set(HttpHeaders.Names.HOST, host) 
    request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) 
    request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP) 
    request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json") 

    request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes()) 
    request.content().clear().writeBytes(bytebuf) 

    request 
    } 
} 

class HttpClientInitializer() extends ChannelInitializer[SocketChannel] { 

    override def initChannel(ch: SocketChannel) = { 
    val pipeline = ch.pipeline() 

    pipeline.addLast(new HttpClientCodec()) 

    //aggregates all http messages into one if content is chunked 
    pipeline.addLast(new HttpObjectAggregator(1048576)) 

    pipeline.addLast(new IdleStateHandler(0, 0, 600)) 

    pipeline.addLast(new HttpClientHandler()) 
    } 
} 

class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] { 

    override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { 
    try { 
     msg match { 
     case res: FullHttpResponse => 
      println("response is: " + res.content().toString(CharsetUtil.US_ASCII)) 
      ReferenceCountUtil.retain(msg) 
     } 
    } finally { 
     ReferenceCountUtil.release(msg) 
    } 
    } 

    override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = { 
    println("HttpHandler caught exception", e) 
    ctx.close() 
    } 
} 
+0

Ist schreiben nicht asynchron Kanal? Als Ergebnis des Schreibens erhalten Sie Future, was Ihnen überlassen ist, wie Sie damit umgehen können. – user1582639

+0

Ich lerne auch Netty 4.0. Hier ist mein Verständnis von Design. Das erste, was ich im Hinterkopf habe, ist, dass Sie in Netty 4 die Gewissheit haben, dass alle registrierten Handler in einem einzigen Thread ausgeführt werden, so dass keine Synchronisation erforderlich ist, es sei denn, Sie verwenden Shared Handler. Daher werden alle Ihre eingereichten Anfragen nacheinander über den Kanal gesendet und die Antworten werden in der gleichen Reihenfolge empfangen. Wenn Sie also die Datenstruktur wie die Warteschlange in Ihrem Duplex-Handler für alle Anfragen verwalten, können Sie immer die entsprechende Anfrage für die zuletzt erhaltene Antwort abfragen. – user1582639

+0

Danke für die Antworten! Mein Problem ist, obwohl ich viele Nachrichten auf den Kanal schreibe, bekomme ich nur sehr langsam die Antworten (1 Antwort/Sek., Während ich hoffe, einige tausend/Sek. Zu erhalten). Um dies zu verdeutlichen, bitte ich, was ich bisher bekommen habe. – Mart

Antwort

0

ChannelFuture cf = channel.writeAndFlush (erzeugeAnf());

noch wie sie später mit den entsprechenden Anfragen verknüpft werden.

Can netty assign multiple IO threads to the same Channel?

der Arbeiter-Thread einmal für einen zugewiesenen Kanal für die gesamte Lebensdauer des Kanals sich nicht ändert. Wir profitieren also nicht von den Threads. Das liegt daran, dass Sie die Verbindung aufrechterhalten und der Kanal am Leben erhalten wird.

Um dieses Problem zu beheben, könnten Sie einen Pool von Kanälen in Betracht ziehen (sagen wir 30). Verwenden Sie dann den Kanalpool, um Ihre Anforderungen zu platzieren.

 int concurrent = 30; 

    // Start the client. 
    ChannelFuture[] channels = new ChannelFuture[concurrent]; 
    for (int i = 0; i < channels.length; i++) { 
    channels[i] = b.connect(host, port).sync(); 
    } 

    for (int i = 0; i < 1000; i++) { 
     ChannelFuture requestHandle = process(channels[(i+1)%concurrent]); 
     // do something with the request handle  
    } 

    for (int i = 0; i < channels.length; i++) { 
    channels[i].channel().closeFuture().sync(); 
    } 

HTH

+0

Ich denke, dass sogar mit einem Kanal er mehr als 1 Nachricht pro Sekunde von der Serverseite erreichen sollte.Ich gehe davon aus, dass der Autor den Kanal mit Anfragen stapelt, die keine Chance geben, Antworten zu verarbeiten. – user1582639

+0

concurrency - Zeit (Millisekunden) 1000-Anforderungen zu verarbeiten (wird basierend auf dem Endpunkt variieren) 1-55.219 10-48.749 30-13.364 100-29.106 bis eine Zeit Zunahme der Anzahl von Threads, die Leistung verbessert, sondern später Kontextwechsel beeinflusst die Leistung. –

+0

user1582639 Was Sie erwähnt haben, war in der Tat eines der Probleme. Durch die Begrenzung der Anzahl der Anfragen wurde die Leistung erheblich verbessert. @AmodPandey: die Verwendung eines Pools von Kanälen funktioniert auch und verbessert die Leistung noch weiter. Ich verstehe jedoch immer noch nicht, wie man die Initialresquests mit der entsprechenden Antwort verknüpft. writeAndFlush gibt tatsächlich eine ChannelFurture zurück, aber wenn ich einen Listener hinzufüge, wird dieser abgeschlossen, sobald die Anfrage gesendet wurde, was es mir nicht erlaubt, sie irgendwie mit der Antwort zu verbinden. – Mart

Verwandte Themen