跳转到主内容
趣航编程网 - 趣学编程,启航技术之路!

Spring Batch终极指南:原理、实战与性能优化

🌟 Spring Batch终极指南:原理、实战与性能优化 单机日处理10亿数据?

揭秘企业级批处理架构的核心引擎!

一、Spring Batch 究竟是什么?

Spring batch是用于创建批处理应用程序(执行一系列作业)的开源轻量级平台。

1.1 批处理的定义与挑战 批处理(Batch Processing) : 对 大量数据 进行 无需人工干预 的自动化处理,通常具有以下特征: 大数据量(GB/TB级) 长时间运行(分钟/小时级) 无需用户交互 定时/周期执行 传统批处理痛点 : 传统方案缺乏容错机制无状态管理数据源 1.2 Spring Batch 核心价值 Spring Batch 是 Spring 生态系统 中的批处理框架,提供: ✅ 健壮的容错机制 (跳过/重试/重启) ✅ 事务管理 (Chunk级别事务) ✅ 元数据跟踪 (执行状态持久化) ✅ 可扩展架构 (并行/分区处理) ✅ 丰富的读写器 (文件/DB/消息队列) 💡 行业地位 :金融领域对账、电信话单处理、电商订单结算等场景事实标准 二、核心架构深度解析 2.1 架构组成图解 11..*Job+String name+List steps+start(Step)+next(Step)+decision(JobExecutionDecider)Step+ItemReader reader+ItemProcessor processor+ItemWriter writer+Tasklet tasklet+ChunkOrientedTaskletJobRepository+save(JobExecution)+getLastJobExecution(String jobName, JobParameters)JobLauncher+run(Job, JobParameters) 2.2 关键组件职责 组件职责生命周期

Job批处理作业的顶级容器整个批处理过程Step作业的独立执行单元Job内部阶段ItemReader数据读取接口(文件/DB/JMS)每个Chunk开始ItemProcessor业务处理逻辑读取后,写入前ItemWriter数据写出接口Chunk结束时JobRepository存储执行元数据(状态/参数/异常)整个执行过程

三、实战:银行交易对账系统 3.1 场景需求 每日处理100万+交易记录 比对银行系统与内部系统的差异 生成差异报告并告警 3.2 系统架构 Spring BatchProcessorReaderWriter银行交易文件Spring Batch内部系统数据库差异报告告警系统 3.3 代码实现 步骤1:配置批处理作业

@Configuration

@EnableBatchProcessing public class ReconciliationJobConfig {

@Autowired private JobBuilderFactory jobBuilderFactory;

@Autowired private StepBuilderFactory stepBuilderFactory;

// 定义Job @Bean public Job bankReconciliationJob(Step reconciliationStep) { return jobBuilderFactory.get("bankReconciliationJob") .incrementer(new DailyJobIncrementer()) // 每日参数 .start(reconciliationStep) .listener(new JobCompletionListener()) .build(); } }

步骤2:配置Step与读写器

@Bean

public Step reconciliationStep( ItemReader reader, ItemProcessor processor, ItemWriter writer) {

return stepBuilderFactory.get("reconciliationStep") .chunk(1000) // 每1000条提交 .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .skipLimit(100) // 最多跳过100条错误 .skip(DataValidationException.class) .retryLimit(3) .retry(DeadlockLoserDataAccessException.class) .build(); }

// 文件读取器(CSV格式) @Bean @StepScope public FlatFileItemReader reader( @Value("#{jobParameters['inputFile']}") Resource resource) {

return new FlatFileItemReaderBuilder() .name("transactionReader") .resource(resource) .delimited() .names("id", "amount", "date", "account") .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(Transaction.class); }}) .build(); }

// 数据库比对处理器 @Bean public ItemProcessor processor( JdbcTemplate jdbcTemplate) {

return transaction -> { // 查询内部系统记录 String sql = "SELECT amount FROM internal_trans WHERE id = ?"; BigDecimal internalAmount = jdbcTemplate.queryForObject( sql, BigDecimal.class, transaction.getId());

// 比对金额差异 if (internalAmount.compareTo(transaction.getAmount()) != 0) { return new ReconciliationResult(transaction, "AMOUNT_MISMATCH", transaction.getAmount() + " vs " + internalAmount); } return null; // 无差异不写入 }; }

// 差异报告写入器 @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO recon_results (trans_id, error_type, detail) " + "VALUES (:transaction.id, :errorType, :detail)") .dataSource(dataSource) .build(); }

步骤3:启动作业

// 命令行启动(带日期参数)

@SpringBootApplication public class BatchApplication implements CommandLineRunner {

@Autowired private JobLauncher jobLauncher;

@Autowired private Job bankReconciliationJob;

public static void main(String[] args) { SpringApplication.run(BatchApplication.class, args); }

@Override public void run(String... args) throws Exception { JobParameters params = new JobParametersBuilder() .addString("inputFile", "classpath:data/trans-20230520.csv") .addDate("runDate", new Date()) .toJobParameters();

jobLauncher.run(bankReconciliationJob, params); } }

四、高级特性实战 4.1 并行处理(分区10万+记录)

@Bean

public Step masterStep() { return stepBuilderFactory.get("masterStep") .partitioner("slaveStep", columnRangePartitioner()) .step(slaveStep()) .gridSize(8) // 8个并行线程 .taskExecutor(new ThreadPoolTaskExecutor()) .build(); }

@Bean public Partitioner columnRangePartitioner() { ColumnRangePartitioner partitioner = new ColumnRangePartitioner(); partitioner.setColumn("id"); partitioner.setTable("transactions"); partitioner.setDataSource(dataSource); return partitioner; }

4.2 断点续跑(从失败处恢复)

# 重启上次失败的执行

java -jar recon.jar \ --job.name=bankReconciliationJob \ --run.id=1672531200 \ restart=true

4.3 邮件告警监听器

public class AlertListener implements StepExecutionListener {

@Override public ExitStatus afterStep(StepExecution stepExecution) { if (stepExecution.getStatus() == BatchStatus.FAILED) { sendAlertEmail("批处理作业失败: " + stepExecution.getFailureExceptions()); } return ExitStatus.COMPLETED; }

private void sendAlertEmail(String message) { // 实现邮件发送逻辑 } }

五、性能优化黄金法则 5.1 读写性能优化矩阵 优化点效果实现方式 合理设置Chunk Size减少事务提交次数通过压测找到最佳值(通常500-5000)使用游标读取避免OOMJdbcCursorItemReader分区处理水平扩展Partitioner接口实现异步ItemProcessor提升处理速度AsyncItemProcessor包装批量写入优化减少数据库往返JdbcBatchItemWriter 5.2 内存优化配置

# application.properties

spring.batch.job.enabled=true spring.batch.initialize-schema=always

# 事务优化 spring.transaction.timeout=3600 # 1小时事务超时 spring.datasource.hikari.maximum-pool-size=20

# JVM参数(10GB数据场景) -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200

六、常见生产问题解决方案 问题1:作业重复执行 解决方案 :

// 自定义JobParametersIncrementer

public class DailyJobIncrementer implements JobParametersIncrementer { @Override public JobParameters getNext(JobParameters parameters) { return new JobParametersBuilder(parameters) .addLong("run.id", System.currentTimeMillis()) .toJobParameters(); } }

问题2:大数据量内存溢出 解决方案 :

@Bean

public JdbcCursorItemReader reader(DataSource dataSource) { return new JdbcCursorItemReaderBuilder() .name("transactionReader") .dataSource(dataSource) .sql("SELECT * FROM transactions WHERE date = ?") .rowMapper(new BeanPropertyRowMapper<>(Transaction.class)) .preparedStatementSetter((ps, ctx) -> ps.setDate(1, new java.sql.Date(ctx.getJobParameter("runDate")))) .fetchSize(5000) // 优化游标大小 .build(); }

问题3:作业监控缺失 解决方案 :集成Prometheus监控

@Bean

public MeterRegistryCustomizer metrics() { return registry -> { registry.config().commonTags("application", "batch-service"); new BatchMetrics().bindTo(registry); }; }

七、最佳实践总结 事务边界 :Chunk Size = 事务粒度 幂等设计 :Writer需支持重复写入 资源隔离 :每个Job独立数据源 监控告警 :Prometheus + Grafana 看板 版本控制 :Liquibase管理数据库变更 需求分析设计Job/Step选择读写器实现处理逻辑单元测试性能压测部署监控

相关文章