2016-03-29 7 views
0

Ich habe Funken Streaming-Job geschrieben, der Daten von einem s3 liest. Der Auftrag hat Reihe von mapwithstate von maptopair Anrufe gefolgt, wie unten:Eingabe Streaming-Daten nicht gleichmäßig unter den Aufgaben verteilt

JavaDStream<String> cdrLines = ssc.textFileStream(cdrInputFile); 
JavaDStream<CDR> cdrRecords = cdrLines.map(x -> cdrStreamParser.parse(x)); 
JavaDStream<CDR> cdrRecordsFiltered = cdrRecords 
     .filter(t -> t != null); 
JavaPairDStream<String, CDR> sTripletStream = cdrRecordsFiltered 
     .mapToPair(s -> new Tuple2<String, CDR>(s 
       .gettNumber(), s)); 

JavaPairDStream<String, Tuple2<CDR, List<StatusCode>>> stateDstream1 = sTripletStream 
     .mapWithState(
       StateSpec.function(hsMappingFunc).initialState(
         tripletRDD)).mapToPair(s -> s); 

JavaPairDStream<String,Tuple2<CDR,List<StatusCode>>> stateDstream2 = stateDstream1 
.mapWithState(StateSpec.function(cfMappingFunc).initialState(cfHistoryRDD)) 
     .mapToPair(s -> s); 

JavaPairDStream<String, Tuple2<CDR, List<StatusCode>>> stateDstream3 = stateDstream2 
     .mapWithState(StateSpec.function(imeiMappingFunc).initialState(imeiRDD)) 
     .mapToPair(s -> s); 

Ich habe spark.default.parallelism auf 6. Ich sehe erste und letzte maptopair Stufen sind schnell genug. Die zweite und dritte Mapopairstufe sind sehr langsam.

Jede dieser Stufen durchlaufen 6 Aufgaben. In der zweiten und dritten Stufe der Mapopair laufen 5 Aufgaben mit 2s. Aber eine Aufgabe dauert sehr lange ~ 3-4min. Die Shuffle-Daten dieser Aufgabe sind im Vergleich zu anderen Aufgaben sehr hoch, was zu Engpässen führt.

Gibt es eine Möglichkeit, die Last zwischen allen Aufgaben einheitlicher zu verteilen?

+1

Können Sie erklären, was Ihr Code zu erreichen versucht? Vielleicht können wir mit besserem Kontext helfen, ein besseres Diagramm zu formulieren. –

Antwort

0

Dies ist ein Anwendungsfall für die CDR-Verarbeitung. Jedes CDR-Ereignis hat die folgenden Felder: telno, imei, imsi, callforward, timestamp.

Ich halte 3 Arten von Informationen in Funkenzustand: 1. Letzte CDR-Ereignis (Datensatz) für eine bestimmte Telefonnummer 2. Liste der Anrufliste für jedes Telefon 3. Liste aller bekannten IMEI. Drei mapwithstate Funktionsaufrufe entsprechen der folgenden Funktionalität: step1: Wenn die CDR-Ereignisse eintreffen, muss ich einige Feldvergleiche mit dem letzten bekannten CDR-Ereignis mit derselben Telefonnummer durchführen. Ich halte das letzte Ereignis für ein gegebenes Telno im Funkenzustand, so dass ich Feldvergleiche durchführen kann, wenn neue CDR-Ereignisse eintreffen. Schritt2: Für ein gegebenes Telno möchte ich überprüfen, ob die Rufweiterleitungsnummer bekannt ist oder nicht. Also muss ich die Geschichte von Telno beibehalten. -> Liste der Rufnummern im Zustand. step3: Ich muss Liste aller IMEI Zahlen, bisher in dem Staat, so dass für jeden IMEI in der CDR-Ereignis, können wir sagen, ob seine bekannten oder neuen IMEI.

Verwandte Themen