Ich versuche, Nachrichten in eine JMS-Warteschlange zu schreiben, die ich in einem nächsten Schritt herausnehmen, um in eine Datenbank zu schreiben. Der erste Teil sollte mit dem zweiten Async synchronisiert werden. Der JMS-Teil ist wirklich langsam (1100 Einträge in der Warteschlange in 1 Minute).Spring Batch schreiben zu ActiveMQ
So sieht mein Job aus.
@Bean
public Job multiThreadedStepJob() {
Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end();
Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
Flow splitFlow = new FlowBuilder<Flow>("splitflow")
.split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();
return jobBuilders.get("multiThreadedStepJob")
.start(splitFlow).end().build();
}
Der erste Schritt:
@Bean
public Step step() {
return stepBuilders.get("step")
.<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
.reader(reader())
.writer(writer())
.build();
}
der zweite Schritt:
@Bean
public Step step2() {
return stepBuilders.get("step2")
.<OrderDTO, OrderDTO>chunk(100)
.reader(reader2())
.writer(writer2())
.build();
}
Ich denke, dass meine Fehler in dem Schreiber Schritt und der Leser Schritt 2 sind, weil ich laufen kann der andere Leser und Schreiber zusammen und ich habe keine Probleme.
@Bean
public JmsItemWriter<OrderDTO> writer() {
JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
return itemWriter;
}
@Bean
public JmsItemReader<OrderDTO> reader2() {
JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
itemReader.setItemType(OrderDTO.class);
return itemReader;
}
Sie verwenden die gleiche JmsTemplate in die Warteschlange für die Verbindung:
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setDefaultDestination(queue());
jmsTemplate.setReceiveTimeout(500);
return jmsTemplate;
}
@Bean
public Queue queue() {
return new ActiveMQQueue("orderList");
}
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
factory.setTrustAllPackages(true);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(30);
factory.setPrefetchPolicy(prefetchPolicy);
PooledConnectionFactory pool = new PooledConnectionFactory(factory);
pool.setMaxConnections(10);
pool.setMaximumActiveSessionPerConnection(10);
pool.isCreateConnectionOnStartup();
return pool;
}
Der Rest der Konfiguration verwende ich die Konfiguration aus @EnableBatchProcessing ist. Weiß jemand, warum das so langsam geht?