ich verwenden Ich versuche, ein Projekt von Apache-Sturm zu migrieren, um Reiher zu twittern-. Nach vielen Kämpfen konnte ich die meisten Fehler loswerden, zB className: "org.apache.storm.kafka.ZkHosts"
statt className: "storm.kafka.ZkHosts"
verwenden. Ich bin jedoch beim Einreichen der Topologie festgefahren. Ich benutze Flux, um die Topologie an Sturm zu übergeben.Wie Twitter-Reiher mit Sturm Flux
Ich erhalte eine Nullpointer, wenn es ein CuratorFramework Objekt in ZkState schafft. Beim weiteren Graben fand ich an issue in github, wo es heißt, dieses Problem wird verursacht, wenn die Konfigurationen über den Tierpfleger nicht festgelegt wurden.
Weitere Debuggen ich das Problem gefunden, weil ich die folgenden Konfigurationen fehle, die in ZkState.java:46 erforderlich sind.
storm.zookeeper.session.timeout
storm.zookeeper.connection.timeout
storm.zookeeper.retry.times
storm.zookeeper.retry.interval
Während ich es geschafft haben, das Problem zu identifizieren, jedoch bin ich nicht sicher, wo diese in meine Config hinzuzufügen. Kann mir bitte jemand helfen, wo ich die obige Konfiguration hinzufügen kann. Vielen Dank.
Mein Flux Config
name: "My_Topology"
components:
- id: "zkHosts"
className: "org.apache.storm.kafka.ZkHosts"
constructorArgs:
- "localhost:2181"
- id: "SpoutConfig"
className: "org.apache.storm.kafka.SpoutConfig"
constructorArgs:
- ref: "zkHosts" # brokerHosts
- "my-topic" # topic
- "/my-zkRoot" # zkRoot
- "my-id" # spoutId
properties:
- name: "zkServers"
value: ["localhost"]
- name: "zkPort"
value: 2181
- name: "zkRoot"
value: "/my-zkRoot"
- name: "retryInitialDelayMs"
value: 2000
- name: "retryDelayMultiplier"
value: 2
config:
topology.workers: 5
topology.testing.always.try.serialize: true
spouts:
- id: "kafka-spout"
className: "org.apache.storm.kafka.KafkaSpout"
parallelism: 1
constructorArgs:
- ref: "SpoutConfig"
bolts:
- id: "my-bolt"
className: "com.example.sample.MyBolt"
parallelism: 1
streams:
- name: "kafka_spout --> my_bolt"
from: "kafka-spout"
to: "my-bolt"
grouping:
type: SHUFFLE