2017-07-13 2 views
-1
public class Test { 

    private static final ExecutorService pool = Executors.newFixedThreadPool(10); 

    public static void main(String[] args) { 
     Test test = new Test(); 
     test.processData("data/test.txt"); 
    } 

    public void processData(String filePath){ 
     File folder = new File(filePath); 
     for (String filename : folder.list()) { 
     String filePath = folder.toPath().resolve(filename).toString(); 
     File inputfile = new File(filePath); 

     if (inputfile.isDirectory()) { 
      processData(filePath); 
     }else{ 
     pool.execute(() -> { 
      log.info("Start processing " + filePath); 
      Processor.process(filePath); 
     }); 
    } 
    } 
} 

class Processor{ 
    private static final Logger log = LoggerFactory.getLogger(Processor.class); 

    public static void process(String filePath){ 

     try{ 

      List<Document> documets = DocumentProcessor.analyze(filePath); 
      ... 

     }catch(IOException e){ 
      e.printStackTrace(); 
     } 
    } 
} 

class DocumentProcessor{ 

    private static Tokenizer tokenizer = null; 
    private static Resource resource = null; 
    private static Checker checker = null; 
    private static final ExecutorService pool = Executors.newFixedThreadPool(4); 
    static { 
     // static initialization here 
     // ommited 
    } 

    public static List<Document> analyze(String filePath){ 
     BufferedReader br = null; 

     List<Document> processedDocs = new ArrayList<>(); 
     try { 
      br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); 
      String line = null; 
      List<Future<Document>> tasks = new ArrayList<>(); 
      while ((line = br.readLine()) != null) { 
       line = line.trim(); 
       Callable<Document> callable = new FileThread(line, filePath); 
       tasks.add(pool.submit(callable)); 
      } 
      for (Future<Document> task : tasks) { 
       try { 
        processedDocs.add(task.get()); 
       } catch (InterruptedException e) { 
        log.error("InterruptedException Failure: " + line); 
       } catch (ExecutionException e) { 
        log.error("Thread ExecutionException e: " + line); 
        e.printStackTrace(); 
       } 
      } 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } finally { 
      try { 
       if (br != null) 
        br.close(); 
      } catch (IOException ex) { 
       ex.printStackTrace(); 
      } 
     } 
     return processedDocs; 
    } 
} 



public class FileThread implements Callable<Document> { 

    private String textLine; 
    private String filePath; 

    public TextThread(String textLine, String filePath) { 
     this.textLine = textLine; 
     this.filePath = filePath; 
    } 

    public Document call() { 

     ParsedDoc parsedDoc = JSONDocParser.parse(textLine, Source.news); 
     Document doc = new Document(parsedDoc.getURL(), parsedDoc.getDocTime()); 
doc.setDocument(Parser.parseDoc(parsedDoc.getText()); 

     return doc; 
    } 
} 

Aus diesem Code wird Multithreading in der processData() - Methode mit einer ExecutorService-Klasse verwendet. Meine Frage ist: 1) Brauche ich irgendeine Art von Thread-Synchronisation in Prozessor oder DocumentProcessor-Klasse? Die Klasse 'Prozessor' hat keine Instanz- oder Klassenvariable mit Ausnahme der Protokollvariablen.Ist dieser Code Thread mit ExecutorService sicher?

2) Wenn dies Teil eines großen Projekts ist und der einzige Multi-Threading-Code in der Test-Klasse ist, wie hier gezeigt. Muss ich mir Sorgen über Threading-Probleme in allen anderen Klassen machen, vorausgesetzt, es gibt keinen Threading-Schutz in allen anderen Klassen. Der Grund, warum ich das frage, ist, dass, basierend auf den Beispielen, die ich gesehen habe, es scheint, wenn ExecutorService in diesem Idiom verwendet wird, brauche ich keine Threading-Probleme in allen anderen Klassen, d. H. Prozessor oder DocumentProcessor. Ist das wahr?

EDIT: Bitte beachten Sie meinen bearbeiteten Code. Ich denke, jetzt kann es meine Frage besser illustrieren. Vielen Dank.

+0

Was macht 'DocumentProcessor.analyze (filePath)'? Halte es Zustand? –

+1

Threadsicherheit ist ein Problem, das bei der Behandlung von _mehr als einem Thread_ auftritt, die den gleichen Code ausführen. Dein Code hat das nicht. – Seelenvirtuose

+0

Wenn jedoch mehrere Threads (unabhängig davon, ob sie aus einem Pooldienst oder manuell von Ihnen erstellt wurden) denselben Code ausführen, müssen Sie immer nach Threadsicherheit suchen. – Seelenvirtuose

Antwort

2

Synchronisiert nur erforderlich, wenn ein kritischer Abschnitt Zugriff durch mehrere thread.In Ihrem Fall, was es scheint, dass Sie als Dienst aufrufen, und es gibt keinen gemeinsamen Code zwischen anderen Threads.

+0

Bitte lesen Sie meinen Code erneut. Ich habe bearbeitet. – user697911

0

Was Sie tun müssen, ist kritische Abschnitte Ihres Codes mit Synchronisierung zu schützen. Dies bedeutet, dass nur wo Sie haben, dass möglicherweise durch mehr als einen Thread gleichzeitig geändert werden könnte. Wenn Sie keinen Zugriff auf den DocumentProcessor-Code haben, könnten Sie den Aufruf von analyze() synchronisieren, aber das wäre wahrscheinlich zu grob. Wenn die meisten Ihrer Arbeit in dieser Methode sind, werden Sie viel von dem Vorteil der Verwendung mehrerer Threads verlieren.

Idealerweise sollte die analyze() - Methode threadsicher gemacht werden, damit Sie nur dort synchronisieren, wo die Methode statische Instanzvariablen von DocumentProcessor ändert.

+0

"statische Instanzvariablen", meinst du "statische Klassenvariablen"? – user697911

+0

Ich habe den DocumentProcessor hinzugefügt und analyze(). Muss ich irgendeinen Schutz tun? Es gibt einen weiteren ExecutorService im Prozessor. – user697911

+0

Ich glaube, der einzige Ort, den ich schützen muss, ist die Variable "textLine" in der call() -Methode der FileThread-Klasse. Ist das wahr? – user697911