🌟 Spring Batch终极指南:原理、实战与性能优化 单机日处理10亿数据?
揭秘企业级批处理架构的核心引擎!
一、Spring Batch 究竟是什么?
Spring batch是用于创建批处理应用程序(执行一系列作业)的开源轻量级平台。
1.1 批处理的定义与挑战
批处理(Batch Processing)
:
对
大量数据
进行
无需人工干预
的自动化处理,通常具有以下特征:
大数据量(GB/TB级)
长时间运行(分钟/小时级)
无需用户交互
定时/周期执行
传统批处理痛点
:
传统方案缺乏容错机制无状态管理数据源
1.2 Spring Batch 核心价值
Spring Batch 是
Spring 生态系统
中的批处理框架,提供:
✅
健壮的容错机制
(跳过/重试/重启)
✅
事务管理
(Chunk级别事务)
✅
元数据跟踪
(执行状态持久化)
✅
可扩展架构
(并行/分区处理)
✅
丰富的读写器
(文件/DB/消息队列)
💡
行业地位
:金融领域对账、电信话单处理、电商订单结算等场景事实标准
二、核心架构深度解析
2.1 架构组成图解
11..*Job+String name+List
Job批处理作业的顶级容器整个批处理过程Step作业的独立执行单元Job内部阶段ItemReader数据读取接口(文件/DB/JMS)每个Chunk开始ItemProcessor业务处理逻辑读取后,写入前ItemWriter数据写出接口Chunk结束时JobRepository存储执行元数据(状态/参数/异常)整个执行过程
三、实战:银行交易对账系统 3.1 场景需求 每日处理100万+交易记录 比对银行系统与内部系统的差异 生成差异报告并告警 3.2 系统架构 Spring BatchProcessorReaderWriter银行交易文件Spring Batch内部系统数据库差异报告告警系统 3.3 代码实现 步骤1:配置批处理作业
@Configuration
@EnableBatchProcessing
public class ReconciliationJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
// 定义Job
@Bean
public Job bankReconciliationJob(Step reconciliationStep) {
return jobBuilderFactory.get("bankReconciliationJob")
.incrementer(new DailyJobIncrementer()) // 每日参数
.start(reconciliationStep)
.listener(new JobCompletionListener())
.build();
}
}
步骤2:配置Step与读写器
@Bean
public Step reconciliationStep(
ItemReader reader,
ItemProcessor processor,
ItemWriter writer) {
return stepBuilderFactory.get("reconciliationStep")
.chunk(1000) // 每1000条提交
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(100) // 最多跳过100条错误
.skip(DataValidationException.class)
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
// 文件读取器(CSV格式)
@Bean
@StepScope
public FlatFileItemReader reader(
@Value("#{jobParameters['inputFile']}") Resource resource) {
return new FlatFileItemReaderBuilder()
.name("transactionReader")
.resource(resource)
.delimited()
.names("id", "amount", "date", "account")
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Transaction.class);
}})
.build();
}
// 数据库比对处理器
@Bean
public ItemProcessor processor(
JdbcTemplate jdbcTemplate) {
return transaction -> {
// 查询内部系统记录
String sql = "SELECT amount FROM internal_trans WHERE id = ?";
BigDecimal internalAmount = jdbcTemplate.queryForObject(
sql, BigDecimal.class, transaction.getId());
// 比对金额差异
if (internalAmount.compareTo(transaction.getAmount()) != 0) {
return new ReconciliationResult(transaction,
"AMOUNT_MISMATCH",
transaction.getAmount() + " vs " + internalAmount);
}
return null; // 无差异不写入
};
}
// 差异报告写入器
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO recon_results (trans_id, error_type, detail) " +
"VALUES (:transaction.id, :errorType, :detail)")
.dataSource(dataSource)
.build();
}
步骤3:启动作业
// 命令行启动(带日期参数)
@SpringBootApplication
public class BatchApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job bankReconciliationJob;
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("inputFile", "classpath:data/trans-20230520.csv")
.addDate("runDate", new Date())
.toJobParameters();
jobLauncher.run(bankReconciliationJob, params);
}
}
四、高级特性实战 4.1 并行处理(分区10万+记录)
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner("slaveStep", columnRangePartitioner())
.step(slaveStep())
.gridSize(8) // 8个并行线程
.taskExecutor(new ThreadPoolTaskExecutor())
.build();
}
@Bean
public Partitioner columnRangePartitioner() {
ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
partitioner.setColumn("id");
partitioner.setTable("transactions");
partitioner.setDataSource(dataSource);
return partitioner;
}
4.2 断点续跑(从失败处恢复)
# 重启上次失败的执行
java -jar recon.jar \
--job.name=bankReconciliationJob \
--run.id=1672531200 \
restart=true
4.3 邮件告警监听器
public class AlertListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus() == BatchStatus.FAILED) {
sendAlertEmail("批处理作业失败: " +
stepExecution.getFailureExceptions());
}
return ExitStatus.COMPLETED;
}
private void sendAlertEmail(String message) {
// 实现邮件发送逻辑
}
}
五、性能优化黄金法则 5.1 读写性能优化矩阵 优化点效果实现方式 合理设置Chunk Size减少事务提交次数通过压测找到最佳值(通常500-5000)使用游标读取避免OOMJdbcCursorItemReader分区处理水平扩展Partitioner接口实现异步ItemProcessor提升处理速度AsyncItemProcessor包装批量写入优化减少数据库往返JdbcBatchItemWriter 5.2 内存优化配置
# application.properties
spring.batch.job.enabled=true
spring.batch.initialize-schema=always
# 事务优化
spring.transaction.timeout=3600 # 1小时事务超时
spring.datasource.hikari.maximum-pool-size=20
# JVM参数(10GB数据场景)
-Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200
六、常见生产问题解决方案 问题1:作业重复执行 解决方案 :
// 自定义JobParametersIncrementer
public class DailyJobIncrementer implements JobParametersIncrementer {
@Override
public JobParameters getNext(JobParameters parameters) {
return new JobParametersBuilder(parameters)
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
}
}
问题2:大数据量内存溢出 解决方案 :
@Bean
public JdbcCursorItemReader reader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder()
.name("transactionReader")
.dataSource(dataSource)
.sql("SELECT * FROM transactions WHERE date = ?")
.rowMapper(new BeanPropertyRowMapper<>(Transaction.class))
.preparedStatementSetter((ps, ctx) ->
ps.setDate(1, new java.sql.Date(ctx.getJobParameter("runDate"))))
.fetchSize(5000) // 优化游标大小
.build();
}
问题3:作业监控缺失 解决方案 :集成Prometheus监控
@Bean
public MeterRegistryCustomizer metrics() {
return registry -> {
registry.config().commonTags("application", "batch-service");
new BatchMetrics().bindTo(registry);
};
}
七、最佳实践总结 事务边界 :Chunk Size = 事务粒度 幂等设计 :Writer需支持重复写入 资源隔离 :每个Job独立数据源 监控告警 :Prometheus + Grafana 看板 版本控制 :Liquibase管理数据库变更 需求分析设计Job/Step选择读写器实现处理逻辑单元测试性能压测部署监控
