Ich versuche, von einem Thema (Eltern) zu einem anderen Thema (Kind) in Kafka basierend auf dem Inhalt der Datensätze im Eltern zu schreiben. Eine Beispielaufzeichnung, wenn ich vom übergeordneten Thema konsumiere, ist {"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}
.Schreiben an kafka Thema basierend auf dem Inhalt auf Inhalt von Datensatz mit kafkastreams
Ich mag wird den Wert von Einheit verwenden, um ein Thema, mit dem gleichen Namen wie der Wert der Einheit (Es gibt eine feste Menge von Werten von Einheit zu schreiben, so kann ich statisch dass erstellen, wenn es schwierig ist, programmatisch dynamisch Themen erstellen). Ich versuche, diesen
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class entityDataLoader {
public static void main(final String[] args) throws Exception {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Set up serializers and deserializers, which we will use for overriding the default serdes
// specified above.
final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
// In the subsequent lines we define the processing topology of the Streams application.
final KStreamBuilder builder = new KStreamBuilder();
// Read the input Kafka topic into a KStream instance.
final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events");
String content = textLines.toString();
String entity = JSONExtractor.returnJSONValue(content, "entity");
System.out.println(entity);
textLines.to(entity);
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.cleanUp();
streams.start();
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Den Inhalt Inhalte zu verwenden ist [email protected]
macht es offensichtlich, dass @ KStream.toString() ist nicht die richtige Methode zu verwenden, um zu versuchen, den Wert des Unternehmens zu erhalten .
P.S. Die JSONExtractor Klasse ist definiert als
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import org.json.simple.parser.JSONParser;
class JSONExtractor {
public static String returnJSONValue(String args, String value){
JSONParser parser = new JSONParser();
String app= null;
System.out.println(args);
try{
Object obj = parser.parse(args);
JSONObject JObj = (JSONObject)obj;
app= (String) JObj.get(value);
return app;
}
catch(ParseException pe){
System.out.println("No Object found");
System.out.println(pe);
}
return app;
}
}