Spring Batch 완전 가이드 — Chunk vs Tasklet
1. 비유 — 공장 생산 라인
배치(Batch) 처리는 공장 생산 라인과 같습니다. 수백만 개의 제품을 하나씩 수작업으로 처리하면 너무 느립니다. 공장에서는 컨베이어 벨트(Chunk)로 일정량씩 묶어 처리합니다. 재료를 가져오는 사람(ItemReader), 가공하는 사람(ItemProcessor), 포장해서 내보내는 사람(ItemWriter)이 분업합니다. 만약 중간에 기계가 멈춰도(실패), 처리된 부분부터 재시작할 수 있습니다.
2. Spring Batch 도메인 구조
graph TD
A[JobLauncher] -->|"실행"| B[Job]
B -->|"포함"| C[Step 1]
B -->|"포함"| D[Step 2]
B -->|"포함"| E[Step 3]
C --> F["Tasklet 방식"]
D --> G["Chunk 방식"]
G --> H[ItemReader]
G --> I[ItemProcessor]
G --> J[ItemWriter]
K[JobRepository] -->|"메타데이터 저장"| B
K -->|"실행 이력 관리"| C
L[JobParameters] -->|"파라미터 전달"| B
2.1 메타 테이블 구조
Spring Batch는 실행 이력을 DB에 저장합니다.
erDiagram
BATCH_JOB_INSTANCE ||--o{ BATCH_JOB_EXECUTION : "has"
BATCH_JOB_EXECUTION ||--o{ BATCH_JOB_EXECUTION_PARAMS : "has"
BATCH_JOB_EXECUTION ||--o{ BATCH_STEP_EXECUTION : "has"
BATCH_JOB_EXECUTION ||--|| BATCH_JOB_EXECUTION_CONTEXT : "has"
BATCH_STEP_EXECUTION ||--|| BATCH_STEP_EXECUTION_CONTEXT : "has"
BATCH_JOB_INSTANCE {
BIGINT JOB_INSTANCE_ID PK
VARCHAR JOB_NAME
VARCHAR JOB_KEY
}
BATCH_JOB_EXECUTION {
BIGINT JOB_EXECUTION_ID PK
BIGINT JOB_INSTANCE_ID FK
VARCHAR STATUS
VARCHAR EXIT_CODE
DATETIME START_TIME
DATETIME END_TIME
}
BATCH_STEP_EXECUTION {
BIGINT STEP_EXECUTION_ID PK
BIGINT JOB_EXECUTION_ID FK
VARCHAR STEP_NAME
BIGINT READ_COUNT
BIGINT WRITE_COUNT
BIGINT SKIP_COUNT
VARCHAR STATUS
}
3. Job 설정
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public Job importUserJob(JobRepository jobRepository,
Step step1, Step step2,
JobCompletionNotificationListener listener) {
return new JobBuilder("importUserJob", jobRepository)
.incrementer(new RunIdIncrementer()) // 매번 새 JobInstance 생성
.listener(listener)
.start(step1)
.next(step2)
.build();
}
}
3.1 Job 흐름 제어
@Bean
public Job conditionalJob(JobRepository jobRepository) {
return new JobBuilder("conditionalJob", jobRepository)
.start(step1())
.on("COMPLETED").to(step2()) // 성공 시
.on("FAILED").to(failureStep()) // 실패 시
.from(step2())
.on("*").to(step3()) // 항상
.from(failureStep())
.on("*").end() // 종료
.end()
.build();
}
flowchart TD
A[Step 1] -->|COMPLETED| B[Step 2]
A -->|FAILED| C[Failure Step]
B --> D[Step 3]
C --> E["Job 종료"]
D --> F["Job 완료"]
4. Chunk 기반 처리
4.1 Chunk 처리 흐름
sequenceDiagram
participant S as Step
participant R as ItemReader
participant P as ItemProcessor
participant W as ItemWriter
participant TX as Transaction
S->>TX: 트랜잭션 시작
loop chunk-size 만큼 반복
S->>R: read() 호출
R-->>S: 아이템 1개
S->>P: process(item)
P-->>S: 처리된 아이템
end
S->>W: write(chunk) 한 번에 전달
W-->>S: 완료
S->>TX: 트랜잭션 커밋
Note over S: null 반환될 때까지 반복
Note over S: 다음 chunk 시작
4.2 기본 Chunk Step 구성
@Bean
public Step chunkStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("chunkStep", jobRepository)
.<User, ProcessedUser>chunk(100, transactionManager) // chunk size: 100
.reader(userItemReader())
.processor(userItemProcessor())
.writer(userItemWriter())
.faultTolerant()
.skipLimit(10)
.skip(ValidationException.class)
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
5. ItemReader 구현
5.1 JdbcCursorItemReader (대용량 DB 읽기)
@Bean
public JdbcCursorItemReader<User> jdbcCursorItemReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<User>()
.name("userItemReader")
.dataSource(dataSource)
.sql("SELECT id, name, email, status FROM users WHERE status = 'ACTIVE'")
.rowMapper(new BeanPropertyRowMapper<>(User.class))
.fetchSize(100) // DB에서 한 번에 가져오는 크기
.build();
}
5.2 JdbcPagingItemReader (페이지 단위 읽기)
@Bean
public JdbcPagingItemReader<User> jdbcPagingItemReader(DataSource dataSource) {
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
return new JdbcPagingItemReaderBuilder<User>()
.name("pagingUserReader")
.dataSource(dataSource)
.selectClause("SELECT id, name, email")
.fromClause("FROM users")
.whereClause("WHERE status = 'ACTIVE'")
.sortKeys(sortKeys)
.pageSize(100)
.rowMapper(new BeanPropertyRowMapper<>(User.class))
.build();
}
5.3 FlatFileItemReader (CSV 읽기)
@Bean
public FlatFileItemReader<UserCsvDto> csvItemReader() {
return new FlatFileItemReaderBuilder<UserCsvDto>()
.name("csvUserReader")
.resource(new ClassPathResource("users.csv"))
.delimited()
.delimiter(",")
.names("id", "name", "email", "age")
.targetType(UserCsvDto.class)
.linesToSkip(1) // 헤더 행 스킵
.build();
}
5.4 커스텀 ItemReader
@Component
@StepScope
public class ApiItemReader implements ItemReader<ExternalData> {
private final ExternalApiClient apiClient;
private final List<ExternalData> buffer = new ArrayList<>();
private int nextIndex = 0;
private int page = 0;
private static final int PAGE_SIZE = 100;
@Override
public ExternalData read() throws Exception {
if (nextIndex >= buffer.size()) {
fetchNextPage();
if (buffer.isEmpty()) {
return null; // 데이터 끝
}
}
return buffer.get(nextIndex++);
}
private void fetchNextPage() {
buffer.clear();
nextIndex = 0;
List<ExternalData> data = apiClient.fetchPage(page++, PAGE_SIZE);
buffer.addAll(data);
}
}
6. ItemProcessor 구현
@Component
@StepScope
public class UserItemProcessor implements ItemProcessor<User, ProcessedUser> {
private final EmailValidator emailValidator;
@Override
public ProcessedUser process(User user) throws Exception {
// null 반환 시 해당 아이템은 건너뜀 (skip)
if (!emailValidator.isValid(user.getEmail())) {
log.warn("유효하지 않은 이메일, 건너뜀: {}", user.getEmail());
return null;
}
if (user.getAge() < 18) {
return null; // 미성년자 제외
}
// 데이터 변환
return ProcessedUser.builder()
.userId(user.getId())
.fullName(user.getFirstName() + " " + user.getLastName())
.email(user.getEmail().toLowerCase())
.processedAt(LocalDateTime.now())
.build();
}
}
6.1 CompositeItemProcessor — 여러 Processor 체인
@Bean
public CompositeItemProcessor<User, FinalUser> compositeProcessor() {
CompositeItemProcessor<User, FinalUser> processor = new CompositeItemProcessor<>();
processor.setDelegates(List.of(
new ValidationProcessor(),
new TransformProcessor(),
new EnrichmentProcessor()
));
return processor;
}
7. ItemWriter 구현
7.1 JdbcBatchItemWriter
@Bean
public JdbcBatchItemWriter<ProcessedUser> jdbcBatchItemWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<ProcessedUser>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO processed_users (user_id, full_name, email, processed_at) " +
"VALUES (:userId, :fullName, :email, :processedAt) " +
"ON DUPLICATE KEY UPDATE full_name = :fullName, email = :email")
.dataSource(dataSource)
.build();
}
7.2 FlatFileItemWriter (CSV 출력)
@Bean
@StepScope
public FlatFileItemWriter<ProcessedUser> csvItemWriter(
@Value("#{jobParameters['outputFile']}") String outputFile) {
BeanWrapperFieldExtractor<ProcessedUser> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[]{"userId", "fullName", "email"});
DelimitedLineAggregator<ProcessedUser> aggregator = new DelimitedLineAggregator<>();
aggregator.setDelimiter(",");
aggregator.setFieldExtractor(extractor);
return new FlatFileItemWriterBuilder<ProcessedUser>()
.name("csvUserWriter")
.resource(new FileSystemResource(outputFile))
.lineAggregator(aggregator)
.headerCallback(writer -> writer.write("userId,fullName,email"))
.append(false)
.build();
}
7.3 CompositeItemWriter
@Bean
public CompositeItemWriter<ProcessedUser> compositeWriter() {
CompositeItemWriter<ProcessedUser> writer = new CompositeItemWriter<>();
writer.setDelegates(List.of(
jdbcBatchItemWriter(), // DB 저장
csvItemWriter(), // CSV 파일 출력
kafkaItemWriter() // Kafka 발행
));
return writer;
}
8. Tasklet 방식
단순하거나 단일 작업에 적합합니다.
// 파일 삭제 Tasklet
@Component
public class FileCleanupTasklet implements Tasklet {
@Value("${batch.temp-dir}")
private String tempDir;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
throws Exception {
File directory = new File(tempDir);
if (directory.exists()) {
FileUtils.cleanDirectory(directory);
log.info("임시 디렉토리 정리 완료: {}", tempDir);
}
return RepeatStatus.FINISHED; // CONTINUABLE 반환 시 반복 실행
}
}
// Step에 적용
@Bean
public Step cleanupStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("cleanupStep", jobRepository)
.tasklet(fileCleanupTasklet(), transactionManager)
.build();
}
8.1 Chunk vs Tasklet 비교
| 항목 | Chunk | Tasklet |
|---|---|---|
| 처리 단위 | N개씩 묶어 처리 | 전체를 한 번에 |
| 트랜잭션 | Chunk 단위 | Step 전체 |
| 재시작 | Chunk 단위 재시작 가능 | 처음부터 재시작 |
| 적합한 경우 | 대용량 데이터 처리 | DB 테이블 초기화, 파일 조작, API 단순 호출 |
| 메모리 | 효율적 | 단순 |
9. JobParameters와 @StepScope
// JobParameters 전달
JobParameters params = new JobParametersBuilder()
.addString("targetDate", "2026-05-02")
.addLong("batchSize", 500L)
.addString("outputPath", "/data/output/")
.toJobParameters();
jobLauncher.run(importJob, params);
// @StepScope: Step 실행 시점에 빈 생성 (JobParameters 접근 가능)
@Bean
@StepScope
public JdbcCursorItemReader<Order> orderReader(
DataSource dataSource,
@Value("#{jobParameters['targetDate']}") String targetDate) {
return new JdbcCursorItemReaderBuilder<Order>()
.name("orderReader")
.dataSource(dataSource)
.sql("SELECT * FROM orders WHERE order_date = ?")
.preparedStatementSetter(ps -> ps.setString(1, targetDate))
.rowMapper(new BeanPropertyRowMapper<>(Order.class))
.build();
}
10. 파티셔닝 — 병렬 처리
10.1 파티셔닝 구조
graph TD
A[Master Step] -->|"파티션 생성"| B[Partitioner]
B --> C["Worker Step 1: ID 1~100000"]
B --> D["Worker Step 2: ID 100001~200000"]
B --> E["Worker Step 3: ID 200001~300000"]
B --> F["Worker Step N: ..."]
C --> G["각자 독립적으로 처리"]
D --> G
E --> G
F --> G
@Bean
public Step masterStep(JobRepository jobRepository,
TaskExecutor taskExecutor) {
return new StepBuilder("masterStep", jobRepository)
.partitioner("workerStep", rangePartitioner())
.step(workerStep())
.gridSize(4) // 파티션 수
.taskExecutor(taskExecutor)
.build();
}
@Bean
public Partitioner rangePartitioner() {
return gridSize -> {
Map<String, ExecutionContext> partitions = new HashMap<>();
long totalCount = userRepository.count();
long partitionSize = totalCount / gridSize;
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putLong("minId", i * partitionSize + 1);
context.putLong("maxId", (i == gridSize - 1) ? totalCount : (i + 1) * partitionSize);
partitions.put("partition" + i, context);
}
return partitions;
};
}
@Bean
@StepScope
public JdbcCursorItemReader<User> partitionedReader(
DataSource dataSource,
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcCursorItemReaderBuilder<User>()
.name("partitionedUserReader")
.dataSource(dataSource)
.sql("SELECT * FROM users WHERE id BETWEEN ? AND ?")
.preparedStatementSetter(ps -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.rowMapper(new BeanPropertyRowMapper<>(User.class))
.build();
}
10.2 멀티스레드 Step
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("batch-");
executor.initialize();
return executor;
}
@Bean
public Step multiThreadStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("multiThreadStep", jobRepository)
.<User, ProcessedUser>chunk(100, transactionManager)
.reader(synchronizedReader()) // 스레드 안전한 Reader 필요!
.processor(userProcessor())
.writer(userWriter())
.taskExecutor(batchTaskExecutor())
.throttleLimit(4)
.build();
}
// JdbcPagingItemReader는 Thread-safe, CursorItemReader는 동기화 필요
@Bean
public SynchronizedItemStreamReader<User> synchronizedReader() {
return new SynchronizedItemStreamReaderBuilder<User>()
.delegate(jdbcCursorItemReader())
.build();
}
11. 재시작과 재시도
11.1 재시작 (Restart)
// 실패한 Job 재시작 — JobRepository가 이전 상태를 기억
jobLauncher.run(job, lastFailedJobParams); // 실패한 Step부터 재시작
// 특정 Step을 항상 처음부터 실행하도록 설정
@Bean
public Step alwaysRestartStep(JobRepository jobRepository) {
return new StepBuilder("alwaysRestartStep", jobRepository)
.<User, ProcessedUser>chunk(100, transactionManager)
.allowStartIfComplete(true) // 완료돼도 재실행 허용
.startLimit(3) // 최대 3번까지만 시작 허용
.reader(reader())
.writer(writer())
.build();
}
11.2 Skip (건너뛰기)
@Bean
public Step skipableStep(JobRepository jobRepository) {
return new StepBuilder("skipableStep", jobRepository)
.<User, ProcessedUser>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.skipLimit(5) // 최대 5건까지 skip 허용
.skip(ValidationException.class)
.skip(ParseException.class)
.noSkip(DatabaseException.class) // 이 예외는 skip 금지
.skipPolicy(new AlwaysSkipItemSkipPolicy())
.listener(new SkipListener<User, ProcessedUser>() {
@Override
public void onSkipInRead(Throwable t) {
log.warn("읽기 중 skip: {}", t.getMessage());
}
@Override
public void onSkipInProcess(User item, Throwable t) {
log.warn("처리 중 skip, item={}: {}", item.getId(), t.getMessage());
}
@Override
public void onSkipInWrite(ProcessedUser item, Throwable t) {
log.warn("쓰기 중 skip, item={}: {}", item.getUserId(), t.getMessage());
}
})
.build();
}
12. 스케줄링 연동
12.1 Spring Scheduler + Batch
@Component
@EnableScheduling
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job dailyReportJob;
// 매일 새벽 2시 실행
@Scheduled(cron = "0 0 2 * * ?")
public void runDailyReport() {
try {
JobParameters params = new JobParametersBuilder()
.addString("date", LocalDate.now().toString())
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncher.run(dailyReportJob, params);
log.info("배치 실행 상태: {}", execution.getStatus());
} catch (Exception e) {
log.error("배치 실행 실패", e);
}
}
}
12.2 JobLauncher 비동기 설정
@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // 비동기 실행
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
극한 시나리오
@Configuration
public class LargeScaleBatchConfig {
// 1. JPA 대신 JDBC 사용 (성능)
// 2. Cursor 대신 Paging (메모리)
// 3. 파티셔닝 (병렬)
// 4. 청크 사이즈 튜닝
@Bean
public Job processHundredMillionRecords(JobRepository jobRepository) {
return new JobBuilder("hundredMillionJob", jobRepository)
.start(masterStep())
.build();
}
@Bean
public Step masterStep() {
return new StepBuilder("masterStep", jobRepository)
.partitioner("workerStep", columnRangePartitioner())
.step(workerStep())
.gridSize(10) // 10개 파티션
.taskExecutor(partitionTaskExecutor())
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<RawData> largeReader(
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JdbcPagingItemReaderBuilder<RawData>()
.name("largeReader")
.dataSource(dataSource)
.selectClause("SELECT id, data")
.fromClause("FROM raw_data")
.whereClause("WHERE id BETWEEN " + minId + " AND " + maxId)
.sortKeys(Map.of("id", Order.ASCENDING))
.pageSize(1000) // 청크 사이즈와 일치 권장
.rowMapper(new BeanPropertyRowMapper<>(RawData.class))
.build();
}
@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<RawData, ProcessedData>chunk(1000, transactionManager) // 1000건씩
.reader(largeReader(null, null))
.processor(rawDataProcessor())
.writer(processedDataWriter())
.faultTolerant()
.skipLimit(1000)
.skip(Exception.class)
.build();
}
}
예상 처리 시간 (10개 파티션, 청크 1000):
- 단순 처리: 1억 / 10 / 1000 = 10,000회 쓰기 작업
- DB 쓰기 지연 10ms 기준: 100초 수준
14. 전체 흐름 정리
flowchart TD
A[JobLauncher.run] --> B["JobRepository에 JobExecution 생성"]
B --> C["Job 실행"]
C --> D["Step 1 시작"]
D --> E{"Chunk 기반?"}
E -->|Yes| F[ItemReader.read × chunkSize]
F --> G[ItemProcessor.process × chunkSize]
G --> H[ItemWriter.write chunk]
H --> I["트랜잭션 커밋"]
I --> J{"더 읽을 데이터?"}
J -->|Yes| F
J -->|No| K["Step 완료"]
E -->|No| L[Tasklet.execute]
L --> K
K --> M{"다음 Step?"}
M -->|Yes| D
M -->|No| N["Job 완료"]
N --> O["JobRepository에 상태 저장"]
15. 요약
| 개념 | 설명 | 핵심 포인트 |
|---|---|---|
| Job | 배치 작업 단위 | 여러 Step으로 구성 |
| Step | Job의 처리 단계 | Chunk or Tasklet |
| Chunk | N건씩 묶어 처리 | Read → Process → Write |
| ItemReader | 데이터 읽기 | null 반환 시 종료 |
| ItemProcessor | 데이터 변환/필터 | null 반환 시 해당 건 skip |
| ItemWriter | 데이터 저장 | 리스트 단위로 받음 |
| Tasklet | 단순 작업 | FINISHED or CONTINUABLE |
| @StepScope | Step 실행 시 빈 생성 | JobParameters 주입 가능 |
| 파티셔닝 | 데이터를 나눠 병렬 처리 | 대용량 필수 기법 |
| JobRepository | 배치 메타데이터 관리 | 재시작 핵심 |
댓글