2017-06-15 1 views
0

Ich versuche einen neuen Stream einzurichten, um einen Tika-Bolzen mit einer Warc-Schraube zu verbinden.Das Einrichten eines neuen Streams für Warc Bolt schlägt fehl

import com.digitalpebble.stormcrawler.tika.ParserBolt; 
import com.digitalpebble.stormcrawler.warc.WARCHdfsBolt; 

builder.setBolt("tika", new ParserBolt(), numWorkers) 
    .localOrShuffleGrouping("shunt","tika"); 

WARCHdfsBolt warcbolt = getWarcBolt("XX"); 

builder.setBolt("warc", warcbolt, numWorkers) 
    .localOrShuffleGrouping("tika", "warc"); 

In der Tika Definition habe ich die outputDeclarerFields Funktion wie folgt modifiziert meinen neuen „RWB“ Strom zu definieren:

@Override 
public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("url", "content", "metadata", "text")); 
    declarer.declareStream(StatusStreamName, new Fields("url", "metadata", "status")); 
    declarer.declareStream("warc", new Fields("url", "content", "metadata", "text")); 
} 

Allerdings, wenn ich die Topologie im lokalen Modus starten erhalte ich:

14308 [main] oasdsSlot WARN - SCHLITZ debian8: 1027 Ab Zustand LEER - Zuordnung null 14308 [main] oasdsSlot WARN - SCHLITZ debian8: 1028 Startin g in Zustand LEER - Zuordnung null 14308 [main] oasdsSlot WARN - SCHLITZ debian8: 1029 LEER in Zustand Start - Zuordnung null 14309 [main] INFO oaslAsyncLocalizer - Reinigung nicht verwendete Topologien in /tmp/up a1e3b7f5-e251- 40ae-a032-b0839ca103c8/supervisor/stormdist 14318 [main] INFO oasdsSupervisor - Startet den Supervisor mit der ID f42c64cd-7c36-40ab-9f85-4b7751ed2d6a auf dem Host debian8. 15030 [main] WARN o.a.s.d.nimbus - Ausnahmebedingung für Topologieübergabe. (Topologie name = 'xxCrawler') #fehler {: cause nil: über [{: type org.apache.storm.generated.InvalidTopologyException: message nil : at [org.apache.storm.daemon.common $ validate_structure_BANG_ aufrufen common.clj 185]}]: trace [[org.apache.storm.daemon.common $ validate_structure_BANG_ common.clj 185]
aufrufen [org.apache.storm.daemon.common $ system_topology_BANG_ gemeinsam aufrufen. CLJ 378]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopologyWithOpts nimbus.clj 1694]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ re ify__10782 submitTopology nimbus.clj 1726]
[sun.reflect.NativeMethodAccessorImpl invoke0 NativeMethodAccessorImpl.java -2]
[sun.reflect.NativeMethodAccessorImpl NativeMethodAccessorImpl.java 62]
[sun.reflect.DelegatingMethodAccessorImpl aufrufen aufrufen DelegatingMethodAccessorImpl .java 43] [java.lang.reflect.Method aufrufen Methode.java 498] [clojure.lang.Reflector invokeMatchingMethod Reflector.java 93] [clojure.lang.Reflector invokeInstanceMethod Reflector.java 28] [org.apache. storm.testing $ submit_local_topology rufen test.clj 310]auf [org.apache.storm.LocalCluster $ _submitTopology aufrufen LocalCluster.clj 49] [org.apache.storm.LocalCluster submitTopology nil -1]
[com.digitalpebble.stormcrawler.ConfigurableTopology einreichen ConfigurableTopology.java 76]
[com.digitalpebble.stormcrawler.ConfigurableTopology einreichen ConfigurableTopology.java 65] [xx.xx.xx.xx.xxTopology xxTopology.java laufen 111]
[com.digitalpebble.stormcrawler.ConfigurableTopology ConfigurableTopology.java 50 starten] [xx.xx.xx.xx.xxTopology main xxTopology.java 53]]} 15035 [main] FEHLER oassoazsNIOServerCnxnFactory - Thread Thread [main, 5, main] gestorben org.apache.storm.generated.InvalidTopologyException: null bei org.apache.storm.daemon.common $ validate_structure_BANG_.invoke (common.clj: 185) ~ [storm-core-1.1.0.jar: 1.1.0] bei org.apache.storm. daemon.common $ system_topology_BANG_.invoke (common.clj: 378) ~ [storm-core-1.1.0.jar: 1.1.0] bei org.apache.storm.daemon.nimbus $ mk_refied_nimbus $ reify__10782.submitTopologyWithOpts (nimbus .clj: 1694) ~ [storm-core-1.1.0.jar: 1.1.0] bei org.apache.storm.daemon.nimbus $ mk_refied_nimbus $ reify__10782.submitTopologie (nimbus.clj: 1726) ~ [Sturm -core-1.1.0.jar: 1.1.0] bei sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Methode) ~ [?: 1.8.0_131] bei sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java : 62) ~ [? 1.8.0_131] bei sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) ~ [? 1.8.0_131] bei java.lang.reflect.Method.invoke (Methode .java: 498) ~ [?: 1.8.0_131] bei clojure.lang.Reflector.invokeMatchingMethod (Reflector.java:93) ~ [clojure-1.7.0.jar :?] bei clojure.lang.Reflector. invokeInstanceMethod (Reflector.java:28) ~ [clojure-1.7.0.jar :?] bei org.apache.storm.testing $ submit_local_topology.invoke (testing.clj: 310) ~ [storm-core-1.1. 0.jar: 1.1.0] bei org.apache.storm.LocalCluster $ _submitTopology.invoke (LocalCluster.clj: 49) ~ [storm-core-1.1.0.jar: 1.1.0] at org.apache.storm.LocalCluster.submitTopology (Unbekannte Quelle) ~ [storm-core-1.1.0.jar: 1.1.0] unter com.digitalpebble.stormcrawler.ConfigurableTopology.submit (ConfigurableTopology.java:76) ~ [xx-crawler-1.1.jar :?] unter com.digitalpebble.stormcrawler.ConfigurableTopology.submit (ConfigurableTopology.java:65) ~ [xx-1.1.jar :?] bei xx.xx.xx.xx .xxTopology.run (xxTopology.java:111) ~ [xx-crawler-1.1.jar :?] unter com.digitalpebble.stormcrawler.ConfigurableTopology.start (ConfigurableTopology.java:50) ~ [xx-crawler-1.1. jar :?] bei xx.xx.xx.xx.xxTopology.main (xxTopology.java:53) ~ [xx-crawler-1.1.jar:?]

Jede Hilfe würde sehr geschätzt werden !!

Beachten Sie, dass es funktioniert gut, wenn ich den Stream StatusStreamName ("Status") verwenden, um die Tika und Warc-Schrauben zu verbinden.

Danke,

Etienne

Antwort

0

WARCs aus dem rohen, nicht geparsten Inhalt erzeugt werden. Sie sollten den WARC mit dem Ausgang des Abrufers anstelle der Parser-Schraube verbinden.

Sie müssen keinen neuen Stream nur für den Warc deklarieren, Sie könnten einfach die Warc-Schraube mit dem Standardstrom verbinden, der aus der Tika-Schraube kommt.

Ich sehe in Ihren Code

Import com.digitalpebble.stormcrawler.tika.ParserBolt;

was bedeuten würde, dass Sie sich auf die Standardimplementierung verlassen (die keinen 'warc' Stream erzeugt). Könnten Sie vergessen haben, das durch Ihre modifizierte Implementierung zu ersetzen?

+0

Sehr geehrte Julien, Vielen Dank für die hilfreichen Kommentare. Was ich will ist, nur ausgewählte Webseiten zu WARC-Dateien zu archivieren. Die Auswahl basiert auf einer Regex, die sich im Parser befindet. Wenn es eine Übereinstimmung gibt, archiviere den Inhalt der Seite (mit dem Stream "warc"). Daher die Verbindung zwischen dem Parser und den Warc-Bolzen. Klingt dieser Ansatz korrekt für Sie? Oder sollte ich das sonst umsetzen? Nochmals vielen Dank! Etienne – EJO

+0

Hallo Etienne. Habe meine Antwort oben bearbeitet. Hatte vergessen, als ich anfänglich geantwortet habe, dass die Parser-Schrauben auch den binären Inhalt erzeugen, so dass du den Tika-Bolzen überhaupt nicht modifizieren musst. Sie könnten eine maßgeschneiderte Schraube schreiben, um den Inhalt der Metadaten zu überprüfen und nur die Tupel weiterzugeben, die eine K/V haben, die Sie während des Parsens gesetzt hätten. Wenn Sie nur mit HTML-Seiten arbeiten, dann bleiben Sie bei der JsoupParser-Schraube. BTW war das Anfangsproblem aufgrund eines falschen Imports. Wenn ja, bitte meine Antwort als richtig markieren. Vielen Dank! –

Verwandte Themen