メインページに戻る | Spring Framework & Spring Bootに戻る

Spring Batch

Spring Batchとは

Spring Batchは、エンタープライズシステムの日常的な運用に不可欠なバッチ処理を開発するための包括的なフレームワークです。大量データの処理、トランザクション管理、ジョブの再起動機能、ステップベースの処理など、バッチアプリケーション開発に必要な機能を提供します。Spring Batchは、Spring Frameworkの機能を活用し、依存性注入、宣言的トランザクション管理、バッチ処理に特化したリスナーなどの機能を統合しています。

Spring Batchは、特に以下のような状況で有用です:

+------------------------------------------+
|           Spring Batch Framework         |
|                                          |
|  +-------------+       +-------------+   |
|  |             |       |             |   |
|  |    Job      |------>|    Step     |   |
|  |             |       |             |   |
|  +-------------+       +------+------+   |
|                               |          |
|                               v          |
|         +--------------------+           |
|         |                    |           |
|  +------+------+    +-------+------+    |
|  |             |    |              |    |
|  |  ItemReader |    | ItemProcessor|    |
|  |             |    |              |    |
|  +------+------+    +-------+------+    |
|         |                    |           |
|         |                    v           |
|         |           +----------------+   |
|         |           |                |   |
|         |           |  ItemWriter    |   |
|         |           |                |   |
|         |           +----------------+   |
|         |                                |
|         +--------------------------------+
|                                          |
+------------------------------------------+
        

図1: Spring Batchアーキテクチャの概要 - ジョブ、ステップ、ItemReader/Processor/Writerの関係

Spring Batchの主な目的は以下の通りです:

Spring Batchの主な特徴

ジョブとステップの詳細

Spring Batchでは、バッチ処理は「ジョブ」と呼ばれる単位で構成されます。各ジョブは1つ以上の「ステップ」から構成され、各ステップは特定の処理タスクを実行します。ジョブとステップの階層構造により、バッチ処理を論理的に分割し、再利用性と保守性を向上させることができます。

ジョブは、JobBuilderFactoryを使用して定義します:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job importUserJob(Step step1, Step step2) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(new JobCompletionNotificationListener())
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }
}

ステップは、StepBuilderFactoryを使用して定義します:

@Bean
public Step step1(ItemReader<Person> reader, ItemProcessor<Person, Person> processor, ItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1")
            .<Person, Person>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

チャンク処理の詳細

Spring Batchのチャンク処理は、大量データを効率的に処理するための重要な機能です。チャンク処理では、データを小さな「チャンク」に分割し、各チャンクをトランザクション境界内で処理します。これにより、メモリ効率が向上し、エラー発生時の影響を最小限に抑えることができます。

チャンク処理は、以下の3つのコンポーネントから構成されます:

チャンク処理の流れは以下の通りです:

  1. ItemReaderがチャンクサイズ分のアイテムを読み込む
  2. 各アイテムがItemProcessorによって処理される
  3. 処理されたアイテムがItemWriterによって一括で書き込まれる
  4. トランザクションがコミットされる
  5. 次のチャンクの処理が開始される

トランザクション管理の詳細

Spring Batchは、ステップレベルでのトランザクション管理を提供します。各ステップ(またはチャンク)は、独自のトランザクション境界内で実行され、すべての処理が成功した場合にのみコミットされます。エラーが発生した場合、トランザクションはロールバックされ、ステップは失敗としてマークされます。

トランザクション管理は、@TransactionalアノテーションやTransactionManagerを使用して設定できます:

@Bean
public Step step1(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("step1")
            .<Person, Person>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .transactionManager(transactionManager)
            .build();
}

Spring Batchの主要コンポーネント

Job

バッチ処理の最上位単位。1つ以上のステップから構成される

Step

ジョブ内の独立した処理単位。チャンク指向または tasklet 指向

ItemReader

データソースからアイテムを読み込むためのインターフェース

ItemProcessor

読み込んだアイテムを処理するためのインターフェース

ItemWriter

処理したアイテムを書き込むためのインターフェース

JobRepository

ジョブの実行状態を永続化するためのリポジトリ

JobLauncher

ジョブを実行するためのインターフェース

JobExplorer

ジョブの実行状態を読み取るためのインターフェース

Spring Batchの使用例

基本的なバッチジョブの定義

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job importUserJob(Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(new JobCompletionNotificationListener())
                .flow(step1)
                .end()
                .build();
    }
    
    @Bean
    public Step step1(ItemReader<Person> reader, ItemProcessor<Person, Person> processor, ItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

CSVファイルからデータを読み込む例

@Bean
public FlatFileItemReader<Person> reader() {
    return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();
}

データ処理の例

@Bean
public ItemProcessor<Person, Person> processor() {
    return person -> {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();
        
        final Person transformedPerson = new Person(firstName, lastName);
        
        return transformedPerson;
    };
}

データベースへの書き込み例

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
            .dataSource(dataSource)
            .build();
}

Spring Batchのテスト

Spring Batchは、バッチジョブのテストを容易にするための機能を提供します。@SpringBatchTestアノテーションを使用することで、テストに必要なコンテキストを自動的に設定できます。

基本的なジョブテスト

@SpringBatchTest
@SpringBootTest
class ImportUserJobTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;
    
    @Test
    void testJob() throws Exception {
        // ジョブの実行
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        
        // 結果の検証
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

ステップのテスト

@SpringBatchTest
@SpringBootTest
class ImportUserStepTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;
    
    @Test
    void testStep1() throws Exception {
        // ステップの実行
        JobExecution jobExecution = jobLauncherTestUtils.launchStep("step1");
        
        // 結果の検証
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

Spring Batchの応用例

並列処理

Spring Batchは、並列処理をサポートしており、複数のステップを並列に実行したり、単一のステップ内で並列処理を行ったりすることができます。

@Bean
public Job parallelJob(Step step1, Step step2) {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFlow(step1, step2))
            .end()
            .build();
}

@Bean
public Flow splitFlow(Step step1, Step step2) {
    return new FlowBuilder<SimpleFlow>("splitFlow")
            .split(new SimpleAsyncTaskExecutor())
            .add(flow1(step1), flow2(step2))
            .build();
}

@Bean
public Flow flow1(Step step1) {
    return new FlowBuilder<SimpleFlow>("flow1")
            .start(step1)
            .build();
}

@Bean
public Flow flow2(Step step2) {
    return new FlowBuilder<SimpleFlow>("flow2")
            .start(step2)
            .build();
}

条件分岐

Spring Batchは、ステップの実行結果に基づいて条件分岐を行うことができます。

@Bean
public Job conditionalJob(Step step1, Step successStep, Step failureStep) {
    return jobBuilderFactory.get("conditionalJob")
            .start(step1)
            .on("COMPLETED").to(successStep)
            .from(step1).on("FAILED").to(failureStep)
            .end()
            .build();
}

リスナーの使用

Spring Batchは、ジョブとステップのライフサイクルイベントをフックするためのリスナー機能を提供します。

public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private final JdbcTemplate jdbcTemplate;
    
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");
            
            jdbcTemplate.query("SELECT first_name, last_name FROM people",
                    (rs, row) -> new Person(rs.getString(1), rs.getString(2))
            ).forEach(person -> log.info("Found <" + person + "> in the database."));
        }
    }
}

Spring Batchの設定

基本設定

Spring Batchの基本設定は、@EnableBatchProcessingアノテーションを使用して行います。このアノテーションにより、Spring Batchに必要なインフラストラクチャが自動的に設定されます。

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    // バッチジョブの設定
}

データベース設定

Spring Batchは、ジョブリポジトリとしてデータベースを使用します。デフォルトでは、メモリ内データベースが使用されますが、本番環境では永続的なデータベースを使用することが推奨されます。

# application.properties
spring.datasource.url=jdbc:mysql://localhost:3306/batch
spring.datasource.username=user
spring.datasource.password=password
spring.batch.jdbc.initialize-schema=always

ジョブパラメータの設定

Spring Batchは、ジョブパラメータを使用してジョブの実行を制御することができます。ジョブパラメータは、ジョブの実行時に指定され、ジョブ内で参照できます。

@Bean
public Job parameterizedJob(Step step1) {
    return jobBuilderFactory.get("parameterizedJob")
            .incrementer(new RunIdIncrementer())
            .flow(step1)
            .end()
            .build();
}

@Bean
@StepScope
public FlatFileItemReader<Person> reader(@Value("#{jobParameters['inputFile']}") String inputFile) {
    return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")
            .resource(new FileSystemResource(inputFile))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();
}

Spring Batchのベストプラクティス

  1. 適切なチャンクサイズの選択: チャンクサイズは、メモリ使用量とパフォーマンスのバランスを考慮して選択します。一般的に、数百から数千のアイテムが適切です。
  2. エラーハンドリングの実装: エラーが発生した場合の処理を適切に実装し、データの整合性を確保します。特に、スキップポリシーやリトライポリシーを設定することが重要です。
  3. ジョブの再起動機能の活用: ジョブの再起動機能を活用し、失敗したジョブを最後に成功したポイントから再開できるようにします。
  4. 適切なトランザクション境界の設定: トランザクション境界を適切に設定し、データの整合性を確保します。特に、長時間実行されるジョブでは、トランザクションのタイムアウトに注意が必要です。
  5. パフォーマンスの最適化: 並列処理、マルチスレッド処理、分散処理などの機能を活用して、パフォーマンスを最適化します。
  6. ジョブの監視と管理: ジョブの実行状態を監視し、問題が発生した場合に迅速に対応できるようにします。Spring Batch Adminなどのツールを活用することが有効です。
  7. テストの自動化: バッチジョブのテストを自動化し、継続的インテグレーションパイプラインに組み込みます。これにより、バグの早期発見と品質の向上が可能になります。
  8. ドキュメントの整備: バッチジョブの目的、入力、出力、実行条件などを明確にドキュメント化し、運用担当者や他の開発者が理解しやすくします。
  9. 段階的な導入: 既存のバッチ処理をSpring Batchに移行する場合は、段階的に導入します。まず、単純なジョブから始め、徐々に複雑なジョブに移行していきます。

アンチパターン

Spring Batchと他のフレームワークの比較

特徴 Spring Batch Apache Camel Quartz
主な用途 バッチ処理 メッセージングとインテグレーション ジョブスケジューリング
データ処理 チャンク指向 メッセージ指向 なし
スケジューリング 外部スケジューラが必要 基本的なスケジューリング機能 高度なスケジューリング機能
トランザクション管理 強力 基本的 なし
エラーハンドリング 強力 基本的 基本的

Spring Batchの導入ステップ

  1. Spring Bootプロジェクトに依存関係を追加
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
  2. バッチ処理を有効化するために@EnableBatchProcessingアノテーションを追加
  3. ジョブリポジトリのためのデータベースを設定
  4. ジョブとステップを定義
  5. ItemReader、ItemProcessor、ItemWriterを実装
  6. エラーハンドリングとリカバリ機能を設定
  7. ジョブの実行と監視の仕組みを整備

Spring Bootとの統合

Spring Batchは、Spring Bootと統合することで、より簡単に使用できます。Spring Boot Starter Batchを使用することで、必要な依存関係が自動的に追加され、基本的な設定が自動的に行われます。

Spring Boot Starterの使用

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

Spring Bootの設定

# application.properties
spring.batch.job.enabled=true
spring.batch.jdbc.initialize-schema=always

Spring Bootアプリケーションでのジョブの実行

@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

まとめ

Spring Batchは、エンタープライズシステムのバッチ処理を開発するための包括的なフレームワークです。大量データの処理、トランザクション管理、ジョブの再起動機能、ステップベースの処理など、バッチアプリケーション開発に必要な機能を提供します。Spring Batchは、Spring Frameworkの機能を活用し、依存性注入、宣言的トランザクション管理、バッチ処理に特化したリスナーなどの機能を統合しています。

Spring Batchの主な利点は以下の通りです:

Spring Batchは、大量データの処理、定期的なジョブの実行、複雑なビジネスルールに基づくデータ変換など、様々なバッチ処理のニーズに対応できる強力なフレームワークです。特に、エンタープライズシステムでの使用に適しており、堅牢で信頼性の高いバッチ処理を実現することができます。