2017-05-20 2 views
0

ich einen Batch-Job mit zwei Schritten habenSpring Batch: Alternative zu JpaPagingItemReader, die bewirkt, dass ORA-01555

  1. Schritt 1: auf externe Datenbank gehen, gespeicherte Prozedur aufrufen, JPA-Entitäten zusammenstellen und sie in interne persistieren Datenbank mit dem Flag NOT_PROCESSED.
  2. Schritt 2: Schleife durch gerade gespeichert Einheiten mit Flagge NOT_PROCESSED, verarbeiten sie und aktualisiert Einheit zurückschreiben (nicht aktualisiert, die die Flagge)

Sobald alle von ihnen verarbeitet werden, die die Flagge für alle von ihnen wird auf VERARBEITET aktualisiert. I.e. aktualisiere alles oder nichts.

Der Schritt 1 ist in Ordnung und funktioniert ziemlich glatt.

Der Schritt 2 ist im Grunde JpaPagingItemReader mit pageSize = 4, Satz von Prozessoren (meist http Aufrufe) und JpaItemWriter mit Commit-Intervall = 1. (Ich weiß, dass es pageSize gleich Commit-Intervall ist, es ist genau das, was ich habe) Es ist auch ein Multithread-Schritt mit 10 Threads, die den Job erledigen.

Das heißt auf der Stufe 2 I zwei Art von Anfragen haben:

  1. lesen: select * from ENTITY where processed=false order by id Blättern in zwei Abfragen verschachtelt select ... from (select .. where rownum < M) where rownum >= N

  2. schreiben: update ENTITY set .. where id = ID

Für Irgendein Grund, wenn ich genug Entitäten habe, werde ich berüchtigt:

Ora-01555, Snapshot zu alt: Rollback-Segment mit dem Namen „“ zu klein

Ich weiß nicht, genauer Grund dieses Fehlers (stat rückgängig machen nichts schlecht zeigen, hoffentlich DBAs Ich werde den Schuldigen bald finden), aber in der Zwischenzeit denke ich, dass das, was Leseabfrage tut, furchtbar schlecht ist. Solche Paging-Abfragen sind sowieso für eine Datenbank schwierig, aber ich denke, wenn Sie die Einträge lesen und gleichzeitig aktualisieren, die Sie lesen, kann dies zu solchen Fehlern führen.

Ich möchte den Ansatz in Schritt 2 ändern. Anstatt Seiten einzulesen. Ich möchte alle IDs in den Speicher nur einmal lesen (d. H. Gib mir IDs aller Entitäten, die ich verarbeiten muss) und gebe dann jedem Thread die ID aus dieser Liste. Der erste Prozessor in der Kette erhält die Entity durch die ID durch JPA. Auf diese Weise aktualisiere und schreibe ich die Entitäten nacheinander und gleichzeitig lese ich die IDs, die ich nur einmal brauche.

Mein Problem ist, dass ich keine Out-of-the-Box-Lösung für solche Leser finden konnte. Kann ich irgendetwas dafür verwenden?

+0

Warum führen Sie sowohl select + update als eine einzelne SQL-Anweisung aus? Auch was ist dein Isolationslevel? – ibre5041

+0

Da, um Werte für das Update zu erhalten, gibt es eine sehr komplizierte Logik, wenn ich zu stark vereinfache - das Update hängt von externen Serviceaufrufen ab. Standard für Orakel, afair wird gelesen. –

+0

Sie können diesen Fehler aus zwei Hauptgründen erhalten, 1. wenn Sie lobs ändern, 2. dauert die Ausführung Ihrer Abfrage zu viel Zeit. Das "zu viel" wird normalerweise als Spalte TUNED_UNDORETENTION aus der Sicht v $ undostat definiert. Dadurch wird angezeigt, was für eine sinnvolle Aufbewahrungsfrist für Ihre Datenbank mit bestimmter UNDO tbs-Größe und bestimmter Transaktionsaktivität gilt. – ibre5041

Antwort

0

Nun, ich implementierte die Lösung von mir selbst und es basiert auf this und this. Tatsächlich habe ich diese nicht direkt benutzt, aber meine Implementierung ist ziemlich nah.

Im Grunde ist, wie es aussieht (ich habe den Code nicht, so meine Erinnerung mit)

public class MyUnprocessedIdReader extends AbstractItemCountingItemStreamItemReader<Long> { 

    private final Object lock = new Object(); 

    private initialized = false; 

    private final MyObjectsRepository repo; 

    private List<Long> ids; 

    private int current = -1; 

    public MyUnprocessedIdReader(MyObjectsRepository repo) { 
     this.repo = repo; 
    } 

    public void doOpen() { 
     synchronized(lock) { 
      Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first"); 

      this.initialized = true; 
      this.ids = ImmutableList.copyOf(repo.findAllUnprocessed()); 
     } 
    } 

    public Long doRead() { 
     synchronized(lock) { 
      if (ids == null || !initialized) { 
      throw new IllegalStateException("Have you opened the reader?"); 
      } 

      ++current; 
      if (current < ids.size()) { 
       return ids.get(current); 
      } else { 
       return null; 
      } 
     } 
    } 

    public void doClose() { 
     synchronized(lock) { 
      this.initialized = false; 
      this.current = -1; 
      this.ids = null; 
     }  
    } 
} 

Mein Repository JPA wird mit so unter der Haube es so etwas wie entityManager.createQuery("select obj.id from Objects where obj.processed = false order by obj.id asc", Long.class).executeSelect()

verwendet

ich habe auch einen weiteren Prozessor zu der Kette hinzugefügt:

public class LoadProcessor implements ItemProcessor<Long, MyObject> { 
    private final MyObjectsRepository repo; 

    public LoadProcessor(MyObjectsRepository repo) { 
     this.repo = repo; 
    } 

    public MyObject process(Long id) { 
     return repo.findById(id); 
    } 
} 

man kann sagen, dass es weniger skalierbar als Cursor, auch gibt es eine Konkurrenz beim Lese metho d, aber es ist eine sehr einfache Lösung, die ihre Arbeit gut macht, bis die Anzahl der unverarbeiteten IDs zu groß ist. Auch das Verarbeiten von Threads kostet viel Zeit beim Aufrufen externer REST-Services, sodass die Konkurrenz beim Lesen keinen Engpass darstellt.

Ps.s. später werde ich ein Update geben, ob es das Problem mit ORA-01555 gelöst hat oder nicht.

Verwandte Themen