2017-03-25 4 views
0

Ich versuche, von einem Thema (Eltern) zu einem anderen Thema (Kind) in Kafka basierend auf dem Inhalt der Datensätze im Eltern zu schreiben. Eine Beispielaufzeichnung, wenn ich vom übergeordneten Thema konsumiere, ist {"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}.Schreiben an kafka Thema basierend auf dem Inhalt auf Inhalt von Datensatz mit kafkastreams

Ich mag wird den Wert von Einheit verwenden, um ein Thema, mit dem gleichen Namen wie der Wert der Einheit (Es gibt eine feste Menge von Werten von Einheit zu schreiben, so kann ich statisch dass erstellen, wenn es schwierig ist, programmatisch dynamisch Themen erstellen). Ich versuche, diesen

import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.KeyValue; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 
import java.util.Properties; 

public class entityDataLoader { 
    public static void main(final String[] args) throws Exception { 
    final Properties streamsConfiguration = new Properties(); 
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example"); 
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

    // Set up serializers and deserializers, which we will use for overriding the default serdes 
    // specified above. 
    final Serde<String> stringSerde = Serdes.String(); 
    final Serde<byte[]> byteArraySerde = Serdes.ByteArray(); 

    // In the subsequent lines we define the processing topology of the Streams application. 
    final KStreamBuilder builder = new KStreamBuilder(); 

    // Read the input Kafka topic into a KStream instance. 
    final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events"); 

    String content = textLines.toString(); 
    String entity = JSONExtractor.returnJSONValue(content, "entity"); 
    System.out.println(entity); 

    textLines.to(entity); 

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 
    streams.cleanUp(); 
    streams.start(); 

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams 
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 
} 

Den Inhalt Inhalte zu verwenden ist [email protected] macht es offensichtlich, dass @ KStream.toString() ist nicht die richtige Methode zu verwenden, um zu versuchen, den Wert des Unternehmens zu erhalten .

P.S. Die JSONExtractor Klasse ist definiert als

import org.json.simple.JSONObject; 
import org.json.simple.parser.ParseException; 
import org.json.simple.parser.JSONParser; 
class JSONExtractor { 

public static String returnJSONValue(String args, String value){ 
    JSONParser parser = new JSONParser(); 
    String app= null; 
    System.out.println(args); 
    try{ 
     Object obj = parser.parse(args); 
     JSONObject JObj = (JSONObject)obj; 
     app= (String) JObj.get(value); 
     return app; 
    } 
    catch(ParseException pe){ 
     System.out.println("No Object found"); 
     System.out.println(pe); 
    } 
    return app; 
} 
} 

Antwort

1

Sie branch() können Ihre Mutterstrom in „Unterströme“ und schreiben jede „Unterstrom“ zu einem Ausgang Thema (vgl http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations)

Ihre branch() muss aufzuspalten Erstellen Sie einen einzelnen "sub stream" für alle von Ihnen ausgegebenen Themen, aber da Sie alle Themen kennen, sollte dies kein Problem sein.

Auch für Kafka Streams wird empfohlen, alle Ausgabethemen zu erstellen, bevor Sie Ihre Anwendung starten (vgl. http://docs.confluent.io/current/streams/developer-guide.html#user-topics)

Verwandte Themen