2017-05-29 23 views
1

Ich versuche, Nachrichten in eine JMS-Warteschlange zu schreiben, die ich in einem nächsten Schritt herausnehmen, um in eine Datenbank zu schreiben. Der erste Teil sollte mit dem zweiten Async synchronisiert werden. Der JMS-Teil ist wirklich langsam (1100 Einträge in der Warteschlange in 1 Minute).Spring Batch schreiben zu ActiveMQ

So sieht mein Job aus.

@Bean 
public Job multiThreadedStepJob() { 
    Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end(); 
    Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end(); 
    Flow splitFlow = new FlowBuilder<Flow>("splitflow") 
    .split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build(); 

    return jobBuilders.get("multiThreadedStepJob") 
          .start(splitFlow).end().build(); 

} 

Der erste Schritt:

@Bean 
public Step step() { 
    return stepBuilders.get("step") 
     .<OrderDTO, OrderDTO>chunk(CHUNK_SIZE) 
     .reader(reader()) 
     .writer(writer()) 
     .build(); 
} 

der zweite Schritt:

@Bean 
public Step step2() { 
    return stepBuilders.get("step2") 
      .<OrderDTO, OrderDTO>chunk(100) 
      .reader(reader2()) 
      .writer(writer2()) 
      .build(); 
} 

Ich denke, dass meine Fehler in dem Schreiber Schritt und der Leser Schritt 2 sind, weil ich laufen kann der andere Leser und Schreiber zusammen und ich habe keine Probleme.

@Bean 
public JmsItemWriter<OrderDTO> writer() { 
    JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>(); 
    itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate()); 
    return itemWriter; 
} 

@Bean 
public JmsItemReader<OrderDTO> reader2() { 
    JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>(); 
    itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate()); 
    itemReader.setItemType(OrderDTO.class); 
    return itemReader; 
} 

Sie verwenden die gleiche JmsTemplate in die Warteschlange für die Verbindung:

@Bean 
public JmsTemplate jmsTemplate() { 
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory()); 
    jmsTemplate.setDefaultDestination(queue()); 
    jmsTemplate.setReceiveTimeout(500); 
    return jmsTemplate; 
} 

@Bean 
public Queue queue() { 
    return new ActiveMQQueue("orderList"); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); 
    factory.setTrustAllPackages(true); 

    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 
    prefetchPolicy.setQueuePrefetch(30); 

    factory.setPrefetchPolicy(prefetchPolicy); 

    PooledConnectionFactory pool = new PooledConnectionFactory(factory); 
    pool.setMaxConnections(10); 
    pool.setMaximumActiveSessionPerConnection(10); 
    pool.isCreateConnectionOnStartup(); 

    return pool; 
} 

Der Rest der Konfiguration verwende ich die Konfiguration aus @EnableBatchProcessing ist. Weiß jemand, warum das so langsam geht?

Antwort

1

Offenbar jmsTemplate.setSessionTransacted (wahr); ist wirklich wichtig. Dies beschleunigte das Schreiben und Lesen aus der JMS-Warteschlange sehr. Aus irgendeinem Grund dachte ich, der Standard wäre wahr, weil ich mit Chargen arbeite.

Wie auch immer, wenn jemand anderes dieses Problem hat, überprüfen Sie das zuerst, weil es leicht zu vergessen ist.