2017-09-08 2 views
2

Ich versuche, eine Protokolldatei zu lesen und zu analysieren, die nur CPU verbraucht. Ich habe einen Server, der eine riesige Textdatei 230MB/Sekunde liest, lesen Sie einfach Textdatei nicht analysieren. Wenn ich versuche, die Textdatei zu analysieren, benutze single thread, kann ich die Datei um 50-70MB/Sekunde analysieren.Erhöhung des Festplattenlesedurchsatzes durch Parallelität

Ich möchte meinen Durchsatz erhöhen, diesen Job Nebenläufigkeit. In diesem Code erreichte ich 130 MB/Sekunde. Auf dem Höhepunkt sah ich 190MB/Sekunde. Ich habe BlockedQueue, Semaphore, ExecutionService usw. versucht. Gibt es einen Rat, den Sie mir bei 200MB/Sekunde Durchsatz geben.

public static void fileReaderTestUsingSemaphore(String[] args) throws Exception { 

    CustomFileReader reader = new CustomFileReader(args[0]); 
    final int concurrency = Integer.parseInt(args[1]); 
    ExecutorService executorService = Executors.newFixedThreadPool(concurrency); 
    Semaphore semaphore = new Semaphore(concurrency,true); 
    System.out.println("Conccurrency in Semaphore: " + concurrency); 


    String line; 

    while ((line = reader.getLine()) != null) 
    { 
     semaphore.acquire(); 

     try 
     { 

      final String p = line; 

      executorService.execute(new Runnable() { 
       @Override 
       public void run() { 
        reader.splitNginxLinewithIntern(p); // that is the method which parser string and convert to class. 
        semaphore.release(); 
       } 
      }); 
     } 

     catch (Exception ex) 
     { 
      ex.printStackTrace(); 
     } 

     finally { 
      semaphore.release(); 
     } 
    } 

    executorService.shutdown(); 
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); 

    System.out.println("ReadByteCount: " + reader.getReadByteCount()); 
} 

Antwort

0

Unter der Annahme, dass Sie nicht über Reihenfolge der Linien egal:

final String MARKER = new String(""); 
    BlockingQueue<String> q = new LinkedBlockingDeque<>(1024); 
    for (int i = 0; i < concurrency; i++) 
     executorService.execute(() -> { 
      for (;;) { 
       try { 
        String s = q.take(); 
        if(s == MARKER) { 
         q.put(s); 
         return; 
        } 
        reader.splitNginxLinewithIntern(s); 
       } catch (InterruptedException e) { 
        return; 
       } 
      } 
     }); 
    String line; 
    while ((line = reader.readLine()) != null) { 
     q.put(line); 
    } 
    q.put(MARKER); 
    executorService.awaitTermination(10, TimeUnit.MINUTES); 

Diese eine Anzahl von Threads startet, die jeweils eine bestimmte Aufgabe ausgeführt wird; Diese Aufgabe besteht darin, aus der Warteschlange zu lesen und die Split-Methode auszuführen. Der Leser füttert nur die Warteschlange, benachrichtigt sie, wenn sie abgeschlossen ist, und wartet auf die Beendigung.

Wenn Sie RxJava2 und rxjava2-extras verwenden sind, die einfach wären

Strings.from(reader) 
      .flatMap(str -> Flowable 
       .just(str) 
       .observeOn(Schedulers.computation()) 
       .doOnNext(reader::splitNginxLinewithIntern) 
      ) 
      .blockingSubscribe(); 
+0

Danke für Ihren Code. Ich habe Ihren Code getestet, das Ergebnis ist das gleiche. Ich habe diese Art von Code mit ArrayBlockingQueue versucht. Ich denke, ich sollte besser Chunked String Array Solitun verwenden, dass Ralf erwähnt wird. @Tassos –

0

Sie benötigen Multi-Thread zu gehen, und Sie müssen das Parsen an Worker-Threads des Leser Thread delegiert haben, das ist klar. Der Punkt ist, wie diese Delegierung mit so wenig Overhead wie möglich durchgeführt wird.

@Tassos lieferte Code, der wie eine solide Verbesserung aussieht.

Noch eine Sache, die Sie versuchen können, ist die Delegationsgranularität zu ändern, nicht jede einzelne Zeile einzeln zu delegieren, sondern Bausteine ​​von z. 100 Zeilen, wodurch der Aufwand für das Delegieren/Synchronisieren um den Faktor 100 reduziert wird (aber dann ein String [] - Array oder ähnliches benötigt wird, was nicht zu sehr schaden sollte).

+0

Vielen Dank für Ihre Nachricht. Ich habe es getestet. Es hat sich stark verbessert, so wie meine Parsingzeit um 130 Sekunden auf 70 Sekunden gesunken ist. Aber Per Huss Lösung ist viel besser, es verringerte sich um 40 Sekunden. @Ralf Kleberhoff –

1

Sie könnten von der Files.lines()-Methode und dem in Java 8 eingeführten Stream-Paradigma profitieren. Es wird der gemeinsame fork/join-Pool der Systeme verwendet. Versuchen Sie dieses Muster:

import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 

public class LineCounter 
{ 
    public static void main(String[] args) throws IOException 
    { 
     Files.lines(Paths.get("/your/file/here")) 
      .parallel() 
      .forEach(LineCounter::processLine); 
    } 

    private static void processLine(String line) { 
     // do the processing 
    } 
} 
+1

Mein Laptop (MacBook Pro Mitte 2015) wird eine 450 MB Datei in einer Sekunde von einem Kaltstart mit diesem Muster verarbeiten ... –

+1

Vielen Dank für diesen Code. Dieser Code ist der Beste für jetzt. Meine Analysezeit wurde auf 130 Sekunden bis 40 Sekunden verringert. Ich hoffe, ich kann diese Lösung in meinen Code einfügen. @Per Huss :) –

+0

Ich freu mich zu hören! Viel Glück mit Ihrem Projekt! –

Verwandte Themen