2017-08-06 3 views
0

Ich habe Schwierigkeiten, dieses einfache kaskadierende Programm zu starten. Aus irgendeinem Grund tut es nichts. Zumindest würde ich erwarten, dass es die Aufzeichnungen druckt. Jede Hilfe wäre willkommen.Führen Sie ein einfaches Cascading-Programm im lokalen Modus

package com.myLearning.cascading; 

import cascading.flow.Flow; 
import cascading.flow.FlowDef; 
import cascading.flow.local.LocalFlowConnector; 
import cascading.operation.Debug; 
import cascading.operation.expression.ExpressionFilter; 
import cascading.pipe.Each; 
import cascading.pipe.Pipe; 
import cascading.scheme.Scheme; 
import cascading.scheme.local.TextDelimited; 
import cascading.tap.SinkMode; 
import cascading.tap.Tap; 
import cascading.tap.local.FileTap; 
import cascading.tuple.Fields; 

public class operations_example 
{ 
    public static void main(String[] args) 
    { 
    Scheme sourceScheme = new TextDelimited(new Fields("username", "age"), true, ","); 
    String sourcePath = "C:/Users/Desktop/cascading/data/names.txt"; 
    Tap sourceTap = new FileTap(sourceScheme, sourcePath); 

    Scheme targetScheme = new TextDelimited(new Fields("username", "age"), true, ","); 
    String targetPath = "C:/Users/Desktop/cascading/data/output2.txt"; 
    Tap targetTap = new FileTap(targetScheme, targetPath, SinkMode.REPLACE); 

    Pipe dataPipe = new Pipe("data"); 
    dataPipe = new Each(dataPipe, new Debug()); 
    ExpressionFilter filter = new ExpressionFilter("age >= 30", Integer.TYPE); 

    dataPipe = new Each(dataPipe,new Fields("username","age"), filter); 

    FlowDef flowdef = FlowDef.flowDef(). 
      addSource(dataPipe, sourceTap). 
      addTailSink(dataPipe, targetTap); 

    Flow flow = new LocalFlowConnector().connect(flowdef); 
    flow.stop();  
    } 
} 

Antwort

Verwandte Themen