2017-12-23 7 views
0

Ich habe eine JavaSE8-App für die parallele Verarbeitung großer Datenmengen. Ich erzeuge 1M Objekte, die ich in eine einzige komprimierte Datei serialisieren möchte. Die Datei wird von einer Web-App heruntergeladen/hochgeladen. Der parallele Prozess ist gut optimiert. Die Serialisierung/Komprimierung erfolgt jedoch sequenziell und ist der Flaschenhals meiner App.Parallele Java-Serialisierung und -Komprimierung

Ich habe verschiedene Lösung getestet: Kryo, ChronicleMap ... Ich verwende jetzt Kryo und BZ2-Komprimierung. Es funktioniert. Aber die Leistung ist nicht gut genug.

Ich kann keine Lösung für die parallele Serialisierung und Komprimierung finden. Alle Informationen in diesem Sinne ist willkommen

Antwort

1

Es ist nicht wirklich wichtig, wie Sie Dataset entweder parallel oder sequentiell verarbeiten, in klarem Design - Serialisierung ist immer eine sequentielle (aufgrund der sequentiellen Natur der Ausgangsströme, Sockets und so weiter) Operation und bleibt bei der Verarbeitung Ihres Datensatzes Wenn Sie also Ihr serialisiertes Dataset in eine Datei, eine Verbindung oder einen Rohdatenspeicher serialisieren wollen, müssen Sie eine Barriere definieren, die Daten vor konkurrierenden Rennen und unerwünschten Änderungen schützt.

Sicherlich gibt es Fälle, in denen jeder Arbeitsfaden die Daten selbst serialisiert, wie zum Beispiel http-Server, aber hier handelt es sich um einen einzelnen Datensatz, der parallel verarbeitet und schließlich serialisiert wird.

Also, nach oben soll es ein richtiger Code zu beantworten sein. Es verwendet Standard-Java-Serialisierung + GZIP-Komprimierung. Sie können Serialisierung und/oder Komprimierung in diesem Code problemlos ersetzen und Benchmarks mit Ihrer aktuellen Lösung vergleichen.

package com.example.demo; 

import java.io.*; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.zip.GZIPInputStream; 
import java.util.zip.GZIPOutputStream; 

import static java.lang.String.format; 

public final class ParallelObjectsSerialization { 

    private static final int ONE_MILLION = 1_000_000; 
    private static final String SERIALIZE_FILE = "/tmp/out.bin"; 

    public static void main(String[] args) throws IOException, ClassNotFoundException { 
//  List<Player> players = parallelGenerate1MPlayers(); 
     List<Player> players = seqGenerate1MPlayers(); 
     serialize(players); 
     players.clear(); 
     players = deserialize(); 
    } 

    private static List<Player> deserialize() throws IOException, ClassNotFoundException { 
     long started = System.currentTimeMillis(); 
     List<Player> players = new ArrayList<>(); 
     try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(SERIALIZE_FILE)))) { 
      for (int i = 0; i < ONE_MILLION; i++) { 
       players.add((Player) in.readObject()); 
      } 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("deserialization of %d objects took %d ms", players.size(), time)); 
     return players; 
    } 

    private static final class Player implements Serializable { 
     private final String name; 
     private final int level; 

     private Player(String name, int level) { 
      this.name = name; 
      this.level = level; 
     } 
    } 

    private static List<Player> seqGenerate1MPlayers() { 
     long started = System.currentTimeMillis(); 
     List<Player> players = new ArrayList<>(ONE_MILLION); 
     for (int i = 0; i < ONE_MILLION; i++) { 
      players.add(new Player(randomName(i), i)); 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("sequential generating of %d objects took %d ms", players.size(), time)); 
     return players; 
    } 

    private static List<Player> parallelGenerate1MPlayers() { 
     long started = System.currentTimeMillis(); 
     Player[] players = new Player[ONE_MILLION]; 
     Arrays.parallelSetAll(players, (i) -> new Player(randomName(i), i)); 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("parallel generating of %d objects took %d ms", players.length, time)); 
     return Arrays.asList(players); 
    } 

    private static void serialize(List<Player> players) throws IOException { 
     long started = System.currentTimeMillis(); 
     try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(SERIALIZE_FILE)))) { 
      for (Player player : players) { 
       out.writeObject(player); 
      } 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("serialization of %d objects took %d ms", players.size(), time)); 
    } 

    private static String randomName(int seed) { 
     StringBuilder builder = new StringBuilder(); 
     double chance = 30.0; 
     for (char c = 'a'; c <= 'z'; c++) { 
      if (Math.random() * 100.0 <= chance) { 
       builder.append(c); 
       if (builder.length() == 7) { 
        break; 
       } 
      } 
     } 
     if (builder.length() == 0) { 
      builder.append("unknown").append(seed); 
     } 
     return builder.toString(); 
    } 
} 
+0

Vielen Dank Alexander. Ich habe diese Lösung bereits getestet. Es ist der Engpass im gesamten Prozess, da die Serialisierung sequenziell erfolgen muss. – kem

+0

Was ist mit der Aufteilung Sammlung in mehrere kleinste Sammlungen und serialisieren sie parallel und gruppieren sie in Zip-oder TAR-Datei? – kem

+1

Ich fürchte in diesem Fall '' 'Teer''' wäre ein Flaschenhals. Wie ich in meinem Post erwähnt habe - letzter Schritt wird immer eine sequentielle Operation sein. Aber andererseits, wenn Ihre Serialisierung CPU-gebunden ist und ein Problem in der CPU-Auslastung während der Serialisierung von Objekten auftaucht, würde '' 'tar''' auf vielen kleinen, serialisierten Chunks funktionieren, aber hüte dich vor der Festplatten-I/O-Sättigung. –