public class Test {
private static final ExecutorService pool = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
Test test = new Test();
test.processData("data/test.txt");
}
public void processData(String filePath){
File folder = new File(filePath);
for (String filename : folder.list()) {
String filePath = folder.toPath().resolve(filename).toString();
File inputfile = new File(filePath);
if (inputfile.isDirectory()) {
processData(filePath);
}else{
pool.execute(() -> {
log.info("Start processing " + filePath);
Processor.process(filePath);
});
}
}
}
class Processor{
private static final Logger log = LoggerFactory.getLogger(Processor.class);
public static void process(String filePath){
try{
List<Document> documets = DocumentProcessor.analyze(filePath);
...
}catch(IOException e){
e.printStackTrace();
}
}
}
class DocumentProcessor{
private static Tokenizer tokenizer = null;
private static Resource resource = null;
private static Checker checker = null;
private static final ExecutorService pool = Executors.newFixedThreadPool(4);
static {
// static initialization here
// ommited
}
public static List<Document> analyze(String filePath){
BufferedReader br = null;
List<Document> processedDocs = new ArrayList<>();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
String line = null;
List<Future<Document>> tasks = new ArrayList<>();
while ((line = br.readLine()) != null) {
line = line.trim();
Callable<Document> callable = new FileThread(line, filePath);
tasks.add(pool.submit(callable));
}
for (Future<Document> task : tasks) {
try {
processedDocs.add(task.get());
} catch (InterruptedException e) {
log.error("InterruptedException Failure: " + line);
} catch (ExecutionException e) {
log.error("Thread ExecutionException e: " + line);
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (br != null)
br.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
return processedDocs;
}
}
public class FileThread implements Callable<Document> {
private String textLine;
private String filePath;
public TextThread(String textLine, String filePath) {
this.textLine = textLine;
this.filePath = filePath;
}
public Document call() {
ParsedDoc parsedDoc = JSONDocParser.parse(textLine, Source.news);
Document doc = new Document(parsedDoc.getURL(), parsedDoc.getDocTime());
doc.setDocument(Parser.parseDoc(parsedDoc.getText());
return doc;
}
}
Aus diesem Code wird Multithreading in der processData() - Methode mit einer ExecutorService-Klasse verwendet. Meine Frage ist: 1) Brauche ich irgendeine Art von Thread-Synchronisation in Prozessor oder DocumentProcessor-Klasse? Die Klasse 'Prozessor' hat keine Instanz- oder Klassenvariable mit Ausnahme der Protokollvariablen.Ist dieser Code Thread mit ExecutorService sicher?
2) Wenn dies Teil eines großen Projekts ist und der einzige Multi-Threading-Code in der Test-Klasse ist, wie hier gezeigt. Muss ich mir Sorgen über Threading-Probleme in allen anderen Klassen machen, vorausgesetzt, es gibt keinen Threading-Schutz in allen anderen Klassen. Der Grund, warum ich das frage, ist, dass, basierend auf den Beispielen, die ich gesehen habe, es scheint, wenn ExecutorService in diesem Idiom verwendet wird, brauche ich keine Threading-Probleme in allen anderen Klassen, d. H. Prozessor oder DocumentProcessor. Ist das wahr?
EDIT: Bitte beachten Sie meinen bearbeiteten Code. Ich denke, jetzt kann es meine Frage besser illustrieren. Vielen Dank.
Was macht 'DocumentProcessor.analyze (filePath)'? Halte es Zustand? –
Threadsicherheit ist ein Problem, das bei der Behandlung von _mehr als einem Thread_ auftritt, die den gleichen Code ausführen. Dein Code hat das nicht. – Seelenvirtuose
Wenn jedoch mehrere Threads (unabhängig davon, ob sie aus einem Pooldienst oder manuell von Ihnen erstellt wurden) denselben Code ausführen, müssen Sie immer nach Threadsicherheit suchen. – Seelenvirtuose