2013-11-01 10 views
7

Ich versuche, Millionen von Zeilen aus einer Datenbank zu lesen und in eine Textdatei zu schreiben. clojure.java.jdbc/Abfrage große Ergebnismenge träge

Dies ist eine Fortsetzung meiner Frage database dump to text file with side effects

Mein Problem scheint jetzt zu sein, dass die Protokollierung nicht passieren, bis das Programm abgeschlossen ist. Ein weiterer Hinweis darauf, dass ich nicht langsam arbeite, ist, dass die Textdatei erst geschrieben wird, wenn das Programm beendet ist.

Basierend auf einem IRC-Tipp scheint es, dass mein Problem wahrscheinlich mit :result-set-fn und doall im Bereich clojure.java.jdbc/query des Codes zu tun hat.

Ich habe versucht, dies durch eine for Funktion zu ersetzen, aber immer noch feststellen, dass Speicherverbrauch hoch ist, da es die gesamte Ergebnismenge in den Speicher zieht. Wie kann ich eine :result-set-fn haben, die nicht alles wie doall zieht? Wie kann ich die Protokolldatei progressiv schreiben, während das Programm ausgeführt wird, anstatt alles zu dumpen, sobald die Ausführung beendet ist?

(let [ 
      db-spec    local-postgres 
      sql     "select * from public.f_5500_sf " 
      log-report-interval 1000 
      fetch-size   100 
      field-delim   "\t"                 
      row-delim   "\n"                 
      db-connection  (doto (j/get-connection db-spec) (.setAutoCommit false)) 
      statement   (j/prepare-statement db-connection sql :fetch-size fetch-size) 
      joiner    (fn [v] (str (join field-delim v) row-delim))      
      start    (System/currentTimeMillis)            
      rate-calc   (fn [r] (float (/ r (/ (- (System/currentTimeMillis) start) 100)))) 
      row-count   (atom 0)                
      result-set-fn  (fn [rs] (lazy-seq rs)) 
      lazy-results   (rest (j/query db-connection [statement] :as-arrays? true :row-fn joiner :result-set-fn result-set-fn)) 
      ]; }}} 
     (.setAutoCommit db-connection false) 
     (info "Started dbdump session...")  
     (with-open [^java.io.Writer wrtr (io/writer "output.txt")] 
     (info "Running query...")  
     (doseq [row lazy-results] 
      (.write wrtr row) 
     )) 
     (info (format "Completed write with %d rows" @row-count)) 
    ) 

Antwort

7

Ich habe die letzten Korrekturen für clojure.java.jdbc von [org.clojure/java.jdbc "0.3.0-beta1"] in meinem project.clj Abhängigkeit Eintrag setzen. Dieser verbessert/korrigiert die :as-arrays? true Funktionalität von clojure.java.jdbc/query beschrieben here.

Ich denke, das half etwas, aber ich konnte immer noch die :result-set-fn zu vec überschreiben.

Das Kernproblem wurde gelöst, indem alle Zeilenlogik in :row-fn gesteckt wurde. Die anfänglichen OutOfMemory-Probleme hatten damit zu tun, dass man durch j/query Ergebnismengen iterierte, anstatt die spezifische :row-fn zu definieren.

Neu (in Betrieb) Code ist unten:

(defn -main [] 
    (let [; {{{ 
     db-spec    local-postgres 
     source-sql   "select * from public.f_5500 " 
     log-report-interval 1000 
     fetch-size   1000 
     row-count   (atom 0) 
     field-delim   "\u0001" ; unlikely to be in source feed, 
             ; although i should still check in 
             ; replace-newline below (for when "\t" 
             ; is used especially) 
     row-delim   "\n" ; unless fixed-width, target doesn't 
            ; support non-printable chars for recDelim like 
     db-connection  (doto (j/get-connection db-spec) (.setAutoCommit false)) 
     statement   (j/prepare-statement db-connection source-sql :fetch-size fetch-size :concurrency :read-only) 
     start    (System/currentTimeMillis) 
     rate-calc   (fn [r] (float (/ r (/ (- (System/currentTimeMillis) start) 100)))) 
     replace-newline  (fn [s] (if (string? s) (clojure.string/replace s #"\n" " ") s)) 
     row-fn    (fn [v] 
           (swap! row-count inc) 
           (when (zero? (mod @row-count log-report-interval)) 
           (info (format "wrote %d rows" @row-count)) 
           (info (format "\trows/s %.2f" (rate-calc @row-count))) 
           (info (format "\tPercent Mem used %s " (memory-percent-used)))) 
           (str (join field-delim (doall (map #(replace-newline %) v))) row-delim)) 
     ]; }}} 
    (info "Started database table dump session...") 
    (with-open [^java.io.Writer wrtr (io/writer "./sql/output.txt")] 
     (j/query db-connection [statement] :as-arrays? true :row-fn 
       #(.write wrtr (row-fn %)))) 
    (info (format "\t\t\tCompleted with %d rows" @row-count)) 
    (info (format "\t\t\tCompleted in %s seconds" (float (/ (- (System/currentTimeMillis) start) 1000)))) 
    (info (format "\t\t\tAverage rows/s %.2f" (rate-calc @row-count))) 
    nil) 
) 

Andere Dinge, die ich (mit mäßigem Erfolg) experimentiert beteiligten das Timbre Logging und stardard Ausschalten aus; Ich habe mich gefragt, ob mit der Verwendung einer REPL die Ergebnisse zwischengespeichert werden können, bevor sie an meinen Editor (vim fire) zurückgegeben werden, und ich war mir nicht sicher, ob das viel Speicherkapazität beansprucht.

Auch habe ich die Logging-Teile um Speicher frei mit (.freeMemory (java.lang.Runtime/getRuntime)) hinzugefügt.Ich war mit VisualVM nicht so vertraut und konnte genau feststellen, wo mein Problem lag.

Ich bin glücklich, wie es jetzt funktioniert, danke allen für Ihre Hilfe.

3

können Sie verwenden prepare-statement mit der :fetch-size Option. Ansonsten ist die Abfrage selbst trotz der Ergebnisse, die in einer verzögerten Sequenz geliefert werden, eifrig.

prepare-statement erfordert ein Verbindungsobjekt, daher müssen Sie eines explizit erstellen. Hier ist ein Beispiel dafür, wie Ihre Nutzung aussehen könnte:

(let [db-spec local-postgres 
     sql  "select * from big_table limit 500000 " 
     fetch-size 10000 ;; or whatever's appropriate 
     cnxn  (doto (j/get-connection db-spec) 
        (.setAutoCommit false)) 
     stmt  (j/prepare-statement cnxn sql :fetch-size fetch-size) 
     results (rest (j/query cnxn [stmt]))] 
    ;; ... 
) 

Eine weitere Option

Da das Problem mit query zu sein scheint, versuchen with-query-results. Es gilt als veraltet, ist aber immer noch da und funktioniert. Hier ist ein Beispiel für die Verwendung:

(let [db-spec local-postgres 
     sql  "select * from big_table limit 500000 " 
     fetch-size 100 ;; or whatever's appropriate 
     cnxn  (doto (j/get-connection db-spec) 
        (.setAutoCommit false)) 
     stmt  (j/prepare-statement cnxn sql :fetch-size fetch-size)] 
    (j/with-query-results results [stmt] ;; binds the results to `results` 
    (doseq [row results] 
     ;; 
    ))) 
+1

ich die Verbindung hinzugefügt haben, machte das: Ergebnis-Typ vorwärtsgerichteten, hinzugefügt Cursor , machte es: schreibgeschützt, und setze die fetch-size auf 1000 und dann auf 100. Ich habe immer noch keine jvm heap size, wenn ich versuche, größere result sets zu holen. Ich habe meine Frage oben aktualisiert, um den neuen Code einzuschließen ... ich bin ratlos, was an dieser Stelle begierig sein könnte ... – joefromct

+0

@joegrangect, versuche Autocommit zu deaktivieren - '(.setAutoCommit db-connection false) '. Ich habe es dem Beispielcode in meiner Antwort hinzugefügt. Nebenbei, ein Teil der Schwierigkeit besteht darin, dass "setFetchSize" lediglich ein Hinweis auf den Treiber ist ([gemäß den API-Dokumenten] (http://docs.oracle.com/javase/1.5.0/docs/api/java/). sql/Statement.html # setFetchSize (int))), so wie es interpretiert wird, kann zwischen den Treibern variieren. PostgreSQLs [JDBC-Dokumente] (http://jdbc.postgresql.org/documentation/head/query.html) zeigen jedoch an, dass es unterstützt wird. Ich denke, wir müssen nur die richtige Beschwörungsformel finden. – jbm

+0

Um zu verdeutlichen, 'setFetchSize' ist eine Methode' prepare-statement', die intern basierend auf dem ': fetch-size'-Argument aufgerufen wird, nicht etwas Zusätzliches, das Sie in Ihrem Code benötigen. – jbm

1

Ich habe eine bessere Lösung gefunden: Sie müssen einen Cursor deklarieren und Stücke von Daten von ihm in einer Transaktion holen. Beispiel:

(db/with-tx 
    (db/execute! "declare cur cursor for select * from huge_table") 
    (loop [] 
     (when-let [rows (-> "fetch 10 from cur" db/query not-empty)] 
     (doseq [row rows] 
      (process-a-row row)) 
     (recur)))) 

Hier db/with-tx, db/execute! und db/query sind meine eigene Verknüpfungen in db Namespace deklariert:

(def ^:dynamic 
    *db* {:dbtype "postgresql" 
     :connection-uri <some db url>)}) 

(defn query [& args] 
    (apply jdbc/query *db* args)) 

(defn execute! [& args] 
    (apply jdbc/execute! *db* args)) 

(defmacro with-tx 
    "Runs a series of queries into transaction." 
    [& body] 
    `(jdbc/with-db-transaction [tx# *db*] 
    (binding [*db* tx#] 
     [email protected]))) 
+0

danke dafür, ich werde es versuchen. Es ist bedauerlich, dass die spezifische Syntax von pg sql in 'db/execute!' Verwendet werden muss. Ich habe auf Postgres geübt, aber ich habe versucht, etwas Datenbank-Agnostic zu bauen. Vielen Dank, – joefromct

Verwandte Themen