2016-09-23 1 views
1

Ich muss mit einer C++ - Anwendung sprechen, die als Server auf einem bestimmten Port ausgeführt wird. Es stellt eine binäre API (Protocol Buffer) für eine bessere Leistung zur Verfügung. Mein RESTful-Service wird im Frühjahr MVC und Jersey entwickelt und möchte diese neue Funktion nutzen. Ich konnte Protokollpuffer-Nachrichten erfolgreich konsumieren und produzieren.Socket-Kommunikation zwischen Java-Webanwendung und C++ Server

In meiner Spring-Webanwendung habe ich zunächst einen Apache Commons Pool erstellt, um einen Pool von Socket-Verbindungen zu erstellen. Dies ist, wie ich las/an die Buchse Schreiben

Update 1: PooledObjectFactory Implementierung

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<Socket> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public Socket create() throws Exception { 
     return new Socket(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(Socket socket) { 
     return new DefaultPooledObject<>(socket); 
    } 

    @Override 
    public void destroyObject(final PooledObject<Socket> p) throws Exception { 
     final Socket socket = p.getObject(); 
     socket.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<Socket> p) { 
     final Socket socket = p.getObject(); 
     return socket != null && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 
} 

@Service 
@Scope("prototype") 
public class Gateway { 
    @Autowired 
    private GenericObjectPool pool; 

    public Response sendAndReceive(Request request) throws CommunicationException { 
     Response response = null; 
     final Socket socket = pool.borrowObject(); 
     try { 
      request.writeDelimitedTo(socket.getOutputStream()); 
      response = Response.parseDelimitedFrom(socket.getInputStream()); 
     } catch (Exception ex) { 
      LOGGER.error("Gateway error", ex); 
      throw new CommunicationException("Gateway error", ex); 
     } finally { 
      pool.returnObject(socket); 
     } 
     return response; 
    } 
} 

Dies funktioniert für die erste Anforderung hinzufügen und wenn der Pool gibt alle bisher verwendeten Sockel festgestellt, dass die Socket ist bereits geschlossen. Dies könnte daran liegen, dass verschiedene Anfragen mit denselben Eingabe- und Ausgabeströmen verbunden werden. Wenn ich den Socket nach dem Lesen der Antwort schließe, schlägt es den Zweck des Pooling. Wenn ich einen Singleton-Socket benutze und ihn einschleuse, kann er die erste Anfrage verarbeiten und dann ablaufen.

Wenn ich den Socket auf jeder Instanz erstellen, funktioniert es und die Leistung beträgt etwa 2500 Mikrosekunden für jede Anfrage. Mein Ziel ist es, die Leistung innerhalb von 500 Mikrosekunden zu erreichen.

Was sollte der beste Ansatz angesichts der Anforderungen sein?

Update 2: einen Server und Client

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.ServerSocket; 
import java.net.Socket; 

public class TcpServer1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpServer1.class.getName()); 

    public static void main(String[] args) throws Exception { 
     ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[0])); 
     Socket socket = null; 
     while (true) { 
      try { 
       socket = serverSocket.accept(); 
      } catch (IOException e) { 
       LOGGER.warn("Could not listen on port"); 
       System.exit(-1); 
      } 

      Thread thread = new Thread(new ServerConnection1(socket)); 
      thread.start(); 
     } 
    } 
} 

class ServerConnection1 implements Runnable { 

    static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class.getName()); 

    private Socket socket = null; 

    ServerConnection1(Socket socket) { 
     this.socket = socket; 
    } 

    public void run() { 
     try { 
      serveRequest(socket.getInputStream(), socket.getOutputStream()); 
      //socket.close(); 
     } catch (IOException ex) { 
      LOGGER.warn("Error", ex); 
     } 
    } 

    public void serveRequest(InputStream inputStream, OutputStream outputStream) { 
     try { 
      read(inputStream); 
      write(outputStream); 
     } catch (IOException ex) { 
      LOGGER.warn("ERROR", ex); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Response.Builder builder = Response.newBuilder(); 
     Response response = builder.setStatus("SUCCESS").setPing("PING").build(); 
     response.writeDelimitedTo(outputStream); 
     LOGGER.info("Server sent {}", response.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Request request = Request.parseDelimitedFrom(inputStream); 
     LOGGER.info("Server received {}", request.toString()); 
    } 
} 

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.Socket; 

public class TcpClient1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpClient1.class.getName()); 

    private Socket openConnection(final String hostName, final int port) { 
     Socket clientSocket = null; 
     try { 
      clientSocket = new Socket(hostName, port); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while connecting to server", e); 
     } 
     return clientSocket; 
    } 

    private void closeConnection(Socket clientSocket) { 
     try { 
      LOGGER.info("Closing the connection"); 
      clientSocket.close(); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while closing the connection", e); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Request.Builder builder = Request.newBuilder(); 
     Request request = builder.setPing("PING").build(); 
     request.writeDelimitedTo(outputStream); 
     LOGGER.info("Client sent {}", request.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Response response = Response.parseDelimitedFrom(inputStream); 
     LOGGER.info("Client received {}", response.toString()); 
    } 

    public static void main(String args[]) throws Exception { 
     TcpClient1 client = new TcpClient1(); 
     try { 
      Socket clientSocket = null; 

      LOGGER.info("Scenario 1 --> One socket for each call"); 
      for (int i = 0; i < 2; i++) { 
       clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
       OutputStream outputStream = clientSocket.getOutputStream(); 
       InputStream inputStream = clientSocket.getInputStream(); 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
       client.closeConnection(clientSocket); 
      } 

      LOGGER.info("Scenario 2 --> One socket for all calls"); 
      clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
      OutputStream outputStream = clientSocket.getOutputStream(); 
      InputStream inputStream = clientSocket.getInputStream(); 
      for (int i = 0; i < 2; i++) { 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
      } 
      client.closeConnection(clientSocket); 
     } catch (Exception e) { 
      LOGGER.warn("Exception occurred", e); 
      System.exit(1); 
     } 
    } 
} 

Hier Anfrage- und Antwort sind Protocol Buffer Klassen Hinzufügen. In Szenario 1 kann es beide Aufrufe verarbeiten, während es in Szenario 2 nie vom zweiten Lesevorgang zurückkehrt. Scheint, dass Protocol Buffer API die Streams anders behandelt. Beispielausgabe unter

17:03:10.508 [main] INFO c.d.e.socket.TcpClient1 - Scenario 1 --> One socket for each call 
17:03:10.537 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.698 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.731 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.732 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Scenario 2 --> One socket for all calls 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.735 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
+0

Nicht genügend Informationen darüber, wie der 'PooledObjectFactory' (verwendet von' GenericObjectPool') verhält sich - vielleicht die 'passivateObject' Methode schließt die Steckdose? –

+0

Könnte es nicht auch die C++ - Anwendung sein, die Verbindungen schließt, sagen wir nach einem bestimmten Zeitraum der Untätigkeit? – Gimby

+0

@Adrian Hinzugefügt Fabrikklasse – user2459396

Antwort

0

Nach großen Schmerzen war ich in der Lage, das Problem zu lösen. Die Klasse, die den Lese-/Schreibzugriff auf den Socket übernahm, wurde als Prototyp definiert. Sobald ein Verweis auf den Socket gefunden wurde, wurde er nicht mehr geklärt (verwaltet von Tomcat). So werden nachfolgende Aufrufe an den Socket in eine Warteschlange gestellt, die dann ausläuft und das Objekt von Apache Commons Pool zerstört wird.

Um das zu beheben, habe ich die Klasse SocketConnection mit einem ThreadLocal von Socket erstellt. Auf der Verarbeitungsseite habe ich einen Callback erstellt, um den Lese-/Schreibzugriff auf den Socket zu übernehmen. Beispielcode-Ausschnitt unten:

class SocketConnection { 

    final private String identity; 
    private boolean alive; 
    final private ThreadLocal<Socket> threadLocal; 

    public SocketConnection(final String hostname, final int port) throws IOException { 
     this.identity = UUID.randomUUID().toString(); 
     this.alive = true; 
     threadLocal = ThreadLocal.withInitial(rethrowSupplier(() -> new Socket(hostname, port))); 
    } 

} 

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<SocketConnection> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 
    private SocketConnection connection = null; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public SocketConnection create() throws Exception { 
     LOGGER.info("Creating Socket"); 
     return new SocketConnection(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(SocketConnection socketConnection) { 
     return new DefaultPooledObject<>(socketConnection); 
    } 

    @Override 
    public void destroyObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
     socketConnection.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<SocketConnection> p) { 
     final SocketConnection connection = p.getObject(); 
     final Socket socket = connection.get(); 
     return connection != null && connection.isAlive() && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(true); 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
    } 

} 

class SocketCallback implements Callable<Response> { 

    private SocketConnection socketConnection; 
    private Request request; 

    public SocketCallback() { 
    } 

    public SocketCallback(SocketConnection socketConnection, Request request) { 
     this.socketConnection = socketConnection; 
     this.request = request; 
    } 

    public Response call() throws Exception { 
     final Socket socket = socketConnection.get(); 
     request.writeDelimitedTo(socket.getOutputStream()); 
     Response response = Response.parseDelimitedFrom(socket.getInputStream()); 
     return response; 
    } 

} 

@Service 
@Scope("prototype") 
public class SocketGateway { 

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketGateway.class); 

    @Autowired 
    private GenericObjectPool<SocketConnection> socketPool; 
    @Autowired 
    private ExecutorService executorService; 

    public Response eligibility(Request request) throws DataException { 
     EligibilityResponse response = null; 
     SocketConnection connection = null; 
     if (request != null) { 
      try { 
       connection = socketPool.borrowObject(); 
       Future<Response> future = executorService.submit(new SocketCallback(connection, request)); 
       response = future.get(); 
      } catch (Exception ex) { 
       LOGGER.error("Gateway error {}"); 
       throw new DataException("Gateway error", ex); 
      } finally { 
       socketPool.returnObject(connection); 
      } 
     } 

     return response; 
    } 

} 
Verwandte Themen