Ich bekomme den Nachrichtenstrom von 3 MB von Kafka Thema, aber der Standardwert ist 1 MB. Jetzt habe ich die kafka-Eigenschaften von 1MB auf 3MB geändert, indem ich die folgenden Zeilen in die Dateien kafa consumer.properties und server.properties eingefügt habe.Wie Standard Kafka SpoutConfig Klasse ändern
fetch.message.max.bytes=2048576 (consumer.properties)
filemessage.max.bytes=2048576 (server.properties)
replica.fetch.max.bytes=2048576 (server.properties)
Jetzt nach dem Hinzufügen der obigen Zeilen in Kafka, gehen 3MB Nachrichtendaten in kafka Datenprotokolle. Aber STORM kann diese 3MB-Daten nicht verarbeiten und kann nur die Standardgröße, d. H. 1MB-Daten, lesen.
So, wie Sie diese Konfigurationen ändern, um die 3MB Daten zu verarbeiten/lesen. Hier ist meine Topologieklasse.
String argument = args[0];
Config conf = new Config();
conf.put(JDBC_CONF, map);
conf.setDebug(true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//set the number of workers
conf.setNumWorkers(3);
TopologyBuilder builder = new TopologyBuilder();
//Setup Kafka spout
BrokerHosts hosts = new ZkHosts("localhost:2181");
String topic = "year1234";
String zkRoot = "";
String consumerGroupId = "group1";
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("KafkaSpout", kafkaSpout,1);
builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout");
builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details");
http://stackoverflow.com/questions/36159768/how-to-set-spoutconfig-from-default-setting Duplikat – LiozM