2016-04-22 9 views
4

Ich lese eine Datei über eine Java-nio-Schnittstelle direkt in einen Stream. Dies startet asynchrone HTTP-Anfragen und behandelt diese in der Zukunft. Alle 10.000 Datensätze lade ich dieses Ergebnis auf einen Server hoch und lösche die Datensätze, so dass dies meinen Speicherverbrauch löscht.
Ich beginne mit dem Byte-Array, das ständig im Speicher bleibt. Der HTTP-Client (commons CloseableHttpAsyncClient) löst die Anforderungen asynchron aus, sodass diese am Anfang alle gleichzeitig ausgelöst werden.
Gibt es eine Möglichkeit, den Lambda-Stream so zu begrenzen, dass ich die Anzahl der Zeilen begrenzen kann, die gleichzeitig verarbeitet werden? So kontrolliere ich meine Erinnerung.Speicherverbrauch Java-Datei-Stream

new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file))) 
    .lines() 
    .map(line -> CsvLine.create(line)) 
    .filter(line -> !line.isHeader()) 
    .forEach(line -> getResult(line, new FutureCallback<HttpResponse>() { 
     @Override 
     public void completed(HttpResponse response) { 
      try { 
       result.addLine(response); 
      } catch (IOException e) { 
       LOGGER.error("IOException, cannot write to server", e); 
       todo.set(-1); // finish in error 
      } finally { 
       todo.decrementAndGet(); 
      } 
     } 

     @Override 
     public void failed(Exception ex) { 
      handleError(); 
     } 

     @Override 
     public void cancelled() { 
      handleError(); 
     } 
    } 
)); 
+0

Können Sie nicht einfach den Code in Stücke von 10000 ausführen? Schleife durch den Stream und wenn du 10000 hast, starte alle Anrufe. –

+0

Verwenden Sie eine 'For'-Schleife. Es gibt keinen Grund, es als eine Stream-Lösung zu schreiben, wenn nur zwei von dreißig Zeilen eigentliche funktionale Programmierung sind ... – Holger

+0

Ich handle 10 Millionen Zeilen, und möchte in Zukunft größer werden, mit einer For-Schleife werden sie alle gehen im Speicher und konsumiere all meinen Speicher – jelle

Antwort

1

Sie könnten versuchen, eine Semaphore mit Ihrem Strom zu drosseln, so dass nur eine bestimmte maximale Asynchron-Anfragen zu einem Zeitpunkt ausstehen. Es könnte so aussehen:

Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important 
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file))) 
.lines() 
     .map(line -> CsvLine.create(line)) 
     .filter(line -> !line.isHeader()) 
     .forEach(line -> { 
      try { 
       if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) { 
        handleTimeout(); 
       } else { 
        getResult(line, new FutureCallback<HttpResponse>() { 
         @Override 
         public void completed(HttpResponse response) { 
          try { 
           result.addLine(response); 
          } catch (IOException e) { 
           LOGGER.error("IOException, cannot write to server", e); 
           todo.set(-1); // finish in error 
          } finally { 
           todo.decrementAndGet(); 
           semaphore.release(); 
          } 
         } 

         @Override 
         public void failed(Exception ex) { 
          handleError(); 
          semaphore.release(); 
         } 

         @Override 
         public void cancelled() { 
          handleError(); 
          semaphore.release(); 
         } 
        } 
        ); 
       } 
      } catch (InterruptedException e) { 
       // handle appropriately 
      } 

     }); 
+0

Ich habe es jetzt ein wenig gelöst, ich werde meinen Code entsprechend umgestalten. Vielen Dank – jelle