2012-04-05 20 views
5

Ich muss Nachrichten an CLI PHP-Prozesse über stdin von Java übergeben. Ich möchte etwa 20 PHP-Prozesse in einem Pool laufen lassen, so dass, wenn ich eine Nachricht an den Pool übergebe, jede Nachricht an einen separaten Thread gesendet wird, wobei eine Warteschlange von zuzustellenden Nachrichten verbleibt. Ich würde mir wünschen, dass diese PHP-Prozesse so lange wie möglich am Leben bleiben und eine neue erstellen, wenn jemand stirbt. Ich habe versucht, dies mit einem statischen Thread-Pool zu tun, aber es scheint eher für Aufgaben gedacht zu sein, die ausgeführt werden und einfach absterben. Wie könnte ich dies tun, mit einer einfachen Schnittstelle, um eine Nachricht an den Pool zu übergeben? Muss ich meinen eigenen "Thread-Pool" implementieren?ThreadPool der CLI-Prozesse

+0

Sehr ähnlich wie diese Frage: http://stackoverflow.com/questions/2592093/php-thread-pool –

+1

ich es eine Ausgabe aus dem PHP, so dass Sie wissen, wann es fertig ist zu verarbeiten? – Clint

+0

Es wird nie eine Verarbeitung durchgeführt werden. Wenn einer stirbt, muss ich einen neuen spawnen, um ihn zu ersetzen. Ich werde die Daten round-robin über stdin an sie weitergeben. – Will

Antwort

4

Ich biete einige Code mit diesem, wie ich denke, es wird die Dinge klarer machen. Grundsätzlich müssen Sie einen Pool von Prozessobjekten herumhalten. Bedenken Sie, dass jeder dieser Prozesse einen Eingabe-, Ausgabe- und Fehlerstrom hat, den Sie irgendwie verwalten müssen. In meinem Beispiel leite ich einfach den Fehler um und gebe ihn an die Hauptprozesskonsole aus. Sie können Callbacks und Handler einrichten, um bei Bedarf die Ausgabe des PHP-Programms zu erhalten. Wenn Sie nur Aufgaben bearbeiten und sich nicht darum kümmern, was PHP sagt, lassen Sie es unverändert oder leiten Sie es in eine Datei um.

Ich verwende die Apache Commons Pool Bibliothek für den ObjectPool. Keine Notwendigkeit, einen neu zu erfinden.

Sie haben einen Pool von 20 Prozessen, die Ihr PHP-Programm ausführen. Dies alleine wird dich nicht bekommen, was du brauchst. Möglicherweise möchten Sie gleichzeitig Aufgaben für alle 20 dieser Prozesse bearbeiten. Sie benötigen also auch einen ThreadPool, der einen Prozess aus Ihrem ObjectPool zieht.

Sie müssen auch verstehen, dass, wenn Sie töten, oder CTRL-C Ihr Java-Prozess der init Prozess wird Ihre PHP-Prozesse übernehmen und sie werden nur dort sitzen. Wahrscheinlich möchten Sie alle PIDs der von Ihnen generierten PHP-Prozesse protokollieren und sie dann bereinigen, wenn Sie Ihr Java-Programm erneut ausführen.

public class StackOverflow_10037379 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName()); 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectError(Redirect.INHERIT); 
      // I am being lazy, but really the InputStream is where 
      // you can get any output of the PHP Process. This setting 
      // will make it output to the current processes console. 
      builder.redirectOutput(Redirect.INHERIT); 
      builder.redirectInput(Redirect.PIPE); 
      builder.command(mProcessToRun); 
      return builder.start(); 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     // Also change mock_php.exe to /usr/bin/php or wherever. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5);   

     // This will only allow you to queue 100 work items at a time. I would suspect 
     // that if you only want 20 PHP processes running at a time and this queue 
     // filled up you'll need to implement some other strategy as you are doing 
     // more work than PHP can keep up with. You'll need to block at some point 
     // or throw work away. 
     BlockingQueue<Runnable> queue = 
      new ArrayBlockingQueue<>(100, true); 

     ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close();   
    } 
} 

Ausgabe des Programms Run:

12172 - Message 2 
10568 - Message 1 
4804 - Message 3 
11916 - Message 4 
11116 - Message 5 
12172 - Message 6 
4804 - Message 7 
10568 - Message 8 
11916 - Message 9 
11116 - Message 10 
12172 - Message 11 

-Code von C++ Programm nur ausgegeben, was eingegeben wurde:

#include <windows.h> 
#include <iostream> 
#include <string> 

int main(int argc, char* argv[]) 
{ 
    DWORD pid = GetCurrentProcessId(); 
    std::string line; 
    while (true) {  
     std::getline (std::cin, line); 
     std::cout << pid << " - " << line << std::endl; 
    } 

    return 0; 
} 

aktualisieren

Sorry für die Verspätung. Hier ist eine JDK 6 Version für alle Interessierten. Sie müssen einen separaten Thread ausführen, um alle Eingaben aus dem InputStream des Prozesses zu lesen. Ich habe diesen Code eingerichtet, um neben jedem neuen Prozess einen neuen Thread zu erstellen. Dieser Thread liest immer aus dem Prozess, solange er lebt. Anstatt direkt in eine Datei auszugeben, habe ich es so eingerichtet, dass es das Logging-Framework verwendet. Auf diese Weise können Sie eine Protokollierungskonfiguration einrichten, um zu einer Datei zu wechseln, einen Rollover durchzuführen, zur Konsole usw. zu wechseln, ohne dass sie fest codiert ist, um in eine Datei zu gelangen.

Sie werden bemerken, dass ich nur einen einzelnen Gobbler für jeden Prozess starte, obwohl ein Prozess stdout und stderr hat. Ich leite stderr auf stdout um, um die Dinge einfacher zu machen. Scheinbar unterstützt jdk6 nur diese Art von Redirect.

public class StackOverflow_10037379_jdk6 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName()); 

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is. 
    public static class StreamGobbler extends Thread { 

     InputStream is; 
     Logger logger; 
     Level level; 

     StreamGobbler(String logName, Level level, InputStream is) { 
      this.is = is; 
      this.logger = Logger.getLogger(logName); 
      this.level = level; 
     } 

     public void run() { 
      try { 
       InputStreamReader isr = new InputStreamReader(is); 
       BufferedReader br = new BufferedReader(isr); 
       String line = null; 
       while ((line = br.readLine()) != null) { 
        logger.log(level, line); 
       } 
      } catch (IOException ex) { 
       logger.log(Level.SEVERE, "Failed to read from Process.", ex); 
      } 
      logger.log(
        Level.INFO, 
        String.format("Exiting Gobbler for %s.", logger.getName())); 
     } 
    } 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectErrorStream(true); 
      builder.command(mProcessToRun); 
      Process process = builder.start(); 
      StreamGobbler loggingGobbler = 
        new StreamGobbler(
        String.format("process.%s", process.hashCode()), 
        Level.INFO, 
        process.getInputStream()); 
      loggingGobbler.start(); 
      return process; 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<Process>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5); 

     BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true); 

     ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close(); 
    } 
} 

Ausgabe

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 3 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 2 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 1 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 4 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 5 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 8 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 10 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 9 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 6 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 7 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 11 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.295131993. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.756434719. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.332711452. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1981440623. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1043636732. 
+0

Wow, danke für die gründliche Antwort! Jetzt eine Test-Implementierung durchführen. Schätze es wirklich. – Will

+0

Also ich bin auf Java6 und habe keine Redirect. Wie kann ich verhindern, dass der stdout/stderr des Prozesses blockiert? In meinem normalen Anwendungsfall möchte ich in einen Prozess schreiben und stdout/stderr umleiten, um Logfiles zu trennen (ohne zu blockieren). – Will

+1

@Will Mit einer jdk6-Version aktualisiert. –

1

Ihre beste Wette hier ist, verwenden Sie die pcntl Funktionen, um einen Prozess zu verzweigen, aber Kommunikation zwischen Prozessen ist schwierig. Ich würde empfehlen, eine Warteschlange zu erstellen, aus der Ihre Prozesse lesen können, anstatt zu versuchen, Nachrichten an die Befehlszeile weiterzuleiten.

hat mehrere PHP-Clients, die Sie verwenden könnten, um das Messaging zwischen Prozessen zu behandeln.

+0

Sorry, vielleicht war meine Frage unklar - wird bearbeitet. Dies ist eine Java-Frage. Ich möchte einen Java threapool mit lang laufenden Cli Prozessen (/ usr/bin/php in diesem Fall). Ich muss in der Lage sein, etwas an den Pool zu senden, das dann in einen der CLI-Prozesse geschrieben wird. – Will

Verwandte Themen