Ich versuche, eine große Menge asynchron eine HTTP-Anfragen an einen Server zu senden. Mein Ziel ist es, jede Antwort mit ihrer ursprünglichen Anfrage zu vergleichen.Wie kann ich mehrere HTTP-Anfragen asynchron mit Netty senden?
Dazu verfolge ich die Netty Snoop example.
In diesem Beispiel (und den anderen http-Beispielen) wird jedoch nicht behandelt, wie mehrere Anfragen asynchron gesendet werden oder wie diese anschließend mit den entsprechenden Anforderungen verknüpft werden.
Alle ähnlichen Fragen (wie this one, this one oder this one, die SimpleChannelUpstreamHandler Klasse implementieren, die von Netty 3 ist und existiert nicht mehr in 4,0 (documentation netty 4.0)
Wer hat eine Idee, wie diese zu lösen in netty 4,0
Edit:?
Mein Problem ist, obwohl ich viele Nachrichten an den Kanal zu schreiben, ich die Antworten nur sehr langsam erhalten (1 Antwort/sec, wHE reas eine Hoffnung, einige tausend/sec zu erhalten). Um dies zu verdeutlichen, bitte ich, was ich bisher bekommen habe. Ich bin sicher, dass der Server, an den ich die Anfragen sende, auch viel Verkehr bewältigen kann.
Was ich so weit gekommen:
import java.net.URI
import java.nio.charset.StandardCharsets
import java.io.File
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer}
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http._
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.{ReferenceCountUtil, CharsetUtil}
import io.netty.channel.nio.NioEventLoopGroup
import scala.io.Source
object ClientTest {
val URL = System.getProperty("url", MY_URL)
val configuration = new Configuration
def main(args: Array[String]) {
println("Starting client")
start()
}
def start(): Unit = {
val group = new NioEventLoopGroup()
try {
val uri: URI = new URI(URL)
val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"}
val port: Int = {val p = uri.getPort; if (p != -1) p else 80}
val b = new Bootstrap()
b.group(group)
.channel(classOf[NioSocketChannel])
.handler(new HttpClientInitializer())
val ch = b.connect(host, port).sync().channel()
val logFolder: File = new File(configuration.LOG_FOLDER)
val fileToProcess: Array[File] = logFolder.listFiles()
for (file <- fileToProcess){
val name: String = file.getName()
val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name)
val lineIterator: Iterator[String] = source.getLines()
while (lineIterator.hasNext) {
val line = lineIterator.next()
val jsonString = parseLine(line)
val request = createRequest(jsonString, uri, host)
ch.writeAndFlush(request)
}
println("closing")
ch.closeFuture().sync()
}
} finally {
group.shutdownGracefully()
}
}
private def parseLine(line: String) = {
//do some parsing to get the json string I want
}
def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = {
val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8)
val request: FullHttpRequest = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath())
request.headers().set(HttpHeaders.Names.HOST, host)
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json")
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes())
request.content().clear().writeBytes(bytebuf)
request
}
}
class HttpClientInitializer() extends ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel) = {
val pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
//aggregates all http messages into one if content is chunked
pipeline.addLast(new HttpObjectAggregator(1048576))
pipeline.addLast(new IdleStateHandler(0, 0, 600))
pipeline.addLast(new HttpClientHandler())
}
}
class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] {
override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
try {
msg match {
case res: FullHttpResponse =>
println("response is: " + res.content().toString(CharsetUtil.US_ASCII))
ReferenceCountUtil.retain(msg)
}
} finally {
ReferenceCountUtil.release(msg)
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = {
println("HttpHandler caught exception", e)
ctx.close()
}
}
Ist schreiben nicht asynchron Kanal? Als Ergebnis des Schreibens erhalten Sie Future, was Ihnen überlassen ist, wie Sie damit umgehen können. – user1582639
Ich lerne auch Netty 4.0. Hier ist mein Verständnis von Design. Das erste, was ich im Hinterkopf habe, ist, dass Sie in Netty 4 die Gewissheit haben, dass alle registrierten Handler in einem einzigen Thread ausgeführt werden, so dass keine Synchronisation erforderlich ist, es sei denn, Sie verwenden Shared Handler. Daher werden alle Ihre eingereichten Anfragen nacheinander über den Kanal gesendet und die Antworten werden in der gleichen Reihenfolge empfangen. Wenn Sie also die Datenstruktur wie die Warteschlange in Ihrem Duplex-Handler für alle Anfragen verwalten, können Sie immer die entsprechende Anfrage für die zuletzt erhaltene Antwort abfragen. – user1582639
Danke für die Antworten! Mein Problem ist, obwohl ich viele Nachrichten auf den Kanal schreibe, bekomme ich nur sehr langsam die Antworten (1 Antwort/Sek., Während ich hoffe, einige tausend/Sek. Zu erhalten). Um dies zu verdeutlichen, bitte ich, was ich bisher bekommen habe. – Mart