Spring Batchは、エンタープライズシステムの日常的な運用に不可欠なバッチ処理を開発するための包括的なフレームワークです。大量データの処理、トランザクション管理、ジョブの再起動機能、ステップベースの処理など、バッチアプリケーション開発に必要な機能を提供します。Spring Batchは、Spring Frameworkの機能を活用し、依存性注入、宣言的トランザクション管理、バッチ処理に特化したリスナーなどの機能を統合しています。
Spring Batchは、特に以下のような状況で有用です:
+------------------------------------------+ | Spring Batch Framework | | | | +-------------+ +-------------+ | | | | | | | | | Job |------>| Step | | | | | | | | | +-------------+ +------+------+ | | | | | v | | +--------------------+ | | | | | | +------+------+ +-------+------+ | | | | | | | | | ItemReader | | ItemProcessor| | | | | | | | | +------+------+ +-------+------+ | | | | | | | v | | | +----------------+ | | | | | | | | | ItemWriter | | | | | | | | | +----------------+ | | | | | +--------------------------------+ | | +------------------------------------------+
図1: Spring Batchアーキテクチャの概要 - ジョブ、ステップ、ItemReader/Processor/Writerの関係
Spring Batchの主な目的は以下の通りです:
Spring Batchでは、バッチ処理は「ジョブ」と呼ばれる単位で構成されます。各ジョブは1つ以上の「ステップ」から構成され、各ステップは特定の処理タスクを実行します。ジョブとステップの階層構造により、バッチ処理を論理的に分割し、再利用性と保守性を向上させることができます。
ジョブは、JobBuilderFactory
を使用して定義します:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job importUserJob(Step step1, Step step2) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(new JobCompletionNotificationListener())
.flow(step1)
.next(step2)
.end()
.build();
}
}
ステップは、StepBuilderFactory
を使用して定義します:
@Bean
public Step step1(ItemReader<Person> reader, ItemProcessor<Person, Person> processor, ItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
Spring Batchのチャンク処理は、大量データを効率的に処理するための重要な機能です。チャンク処理では、データを小さな「チャンク」に分割し、各チャンクをトランザクション境界内で処理します。これにより、メモリ効率が向上し、エラー発生時の影響を最小限に抑えることができます。
チャンク処理は、以下の3つのコンポーネントから構成されます:
チャンク処理の流れは以下の通りです:
Spring Batchは、ステップレベルでのトランザクション管理を提供します。各ステップ(またはチャンク)は、独自のトランザクション境界内で実行され、すべての処理が成功した場合にのみコミットされます。エラーが発生した場合、トランザクションはロールバックされ、ステップは失敗としてマークされます。
トランザクション管理は、@Transactional
アノテーションやTransactionManager
を使用して設定できます:
@Bean
public Step step1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
.transactionManager(transactionManager)
.build();
}
バッチ処理の最上位単位。1つ以上のステップから構成される
ジョブ内の独立した処理単位。チャンク指向または tasklet 指向
データソースからアイテムを読み込むためのインターフェース
読み込んだアイテムを処理するためのインターフェース
処理したアイテムを書き込むためのインターフェース
ジョブの実行状態を永続化するためのリポジトリ
ジョブを実行するためのインターフェース
ジョブの実行状態を読み取るためのインターフェース
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job importUserJob(Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(new JobCompletionNotificationListener())
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(ItemReader<Person> reader, ItemProcessor<Person, Person> processor, ItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}
@Bean
public ItemProcessor<Person, Person> processor() {
return person -> {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
return transformedPerson;
};
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.build();
}
Spring Batchは、バッチジョブのテストを容易にするための機能を提供します。@SpringBatchTest
アノテーションを使用することで、テストに必要なコンテキストを自動的に設定できます。
@SpringBatchTest
@SpringBootTest
class ImportUserJobTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
void testJob() throws Exception {
// ジョブの実行
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// 結果の検証
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
@SpringBatchTest
@SpringBootTest
class ImportUserStepTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
void testStep1() throws Exception {
// ステップの実行
JobExecution jobExecution = jobLauncherTestUtils.launchStep("step1");
// 結果の検証
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
Spring Batchは、並列処理をサポートしており、複数のステップを並列に実行したり、単一のステップ内で並列処理を行ったりすることができます。
@Bean
public Job parallelJob(Step step1, Step step2) {
return jobBuilderFactory.get("parallelJob")
.start(splitFlow(step1, step2))
.end()
.build();
}
@Bean
public Flow splitFlow(Step step1, Step step2) {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(new SimpleAsyncTaskExecutor())
.add(flow1(step1), flow2(step2))
.build();
}
@Bean
public Flow flow1(Step step1) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.build();
}
@Bean
public Flow flow2(Step step2) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step2)
.build();
}
Spring Batchは、ステップの実行結果に基づいて条件分岐を行うことができます。
@Bean
public Job conditionalJob(Step step1, Step successStep, Step failureStep) {
return jobBuilderFactory.get("conditionalJob")
.start(step1)
.on("COMPLETED").to(successStep)
.from(step1).on("FAILED").to(failureStep)
.end()
.build();
}
Spring Batchは、ジョブとステップのライフサイクルイベントをフックするためのリスナー機能を提供します。
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private final JdbcTemplate jdbcTemplate;
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, row) -> new Person(rs.getString(1), rs.getString(2))
).forEach(person -> log.info("Found <" + person + "> in the database."));
}
}
}
Spring Batchの基本設定は、@EnableBatchProcessing
アノテーションを使用して行います。このアノテーションにより、Spring Batchに必要なインフラストラクチャが自動的に設定されます。
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
// バッチジョブの設定
}
Spring Batchは、ジョブリポジトリとしてデータベースを使用します。デフォルトでは、メモリ内データベースが使用されますが、本番環境では永続的なデータベースを使用することが推奨されます。
# application.properties
spring.datasource.url=jdbc:mysql://localhost:3306/batch
spring.datasource.username=user
spring.datasource.password=password
spring.batch.jdbc.initialize-schema=always
Spring Batchは、ジョブパラメータを使用してジョブの実行を制御することができます。ジョブパラメータは、ジョブの実行時に指定され、ジョブ内で参照できます。
@Bean
public Job parameterizedJob(Step step1) {
return jobBuilderFactory.get("parameterizedJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
@Bean
@StepScope
public FlatFileItemReader<Person> reader(@Value("#{jobParameters['inputFile']}") String inputFile) {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new FileSystemResource(inputFile))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}
特徴 | Spring Batch | Apache Camel | Quartz |
---|---|---|---|
主な用途 | バッチ処理 | メッセージングとインテグレーション | ジョブスケジューリング |
データ処理 | チャンク指向 | メッセージ指向 | なし |
スケジューリング | 外部スケジューラが必要 | 基本的なスケジューリング機能 | 高度なスケジューリング機能 |
トランザクション管理 | 強力 | 基本的 | なし |
エラーハンドリング | 強力 | 基本的 | 基本的 |
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
Spring Batchは、Spring Bootと統合することで、より簡単に使用できます。Spring Boot Starter Batchを使用することで、必要な依存関係が自動的に追加され、基本的な設定が自動的に行われます。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
# application.properties
spring.batch.job.enabled=true
spring.batch.jdbc.initialize-schema=always
@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
Spring Batchは、エンタープライズシステムのバッチ処理を開発するための包括的なフレームワークです。大量データの処理、トランザクション管理、ジョブの再起動機能、ステップベースの処理など、バッチアプリケーション開発に必要な機能を提供します。Spring Batchは、Spring Frameworkの機能を活用し、依存性注入、宣言的トランザクション管理、バッチ処理に特化したリスナーなどの機能を統合しています。
Spring Batchの主な利点は以下の通りです:
Spring Batchは、大量データの処理、定期的なジョブの実行、複雑なビジネスルールに基づくデータ変換など、様々なバッチ処理のニーズに対応できる強力なフレームワークです。特に、エンタープライズシステムでの使用に適しており、堅牢で信頼性の高いバッチ処理を実現することができます。