jdbc로 다중 데이터베이스를 설정하여 JPA에서 사용하는 방법을 소개합니다.
spring boot 3.0부터는 @EnableBatchProcessing
을 권장하지 않습니다. 이걸 쓰면 자동으로 등록해주던 빈들을 수동으로 설정해야합니다.
이 글에서는 @EnableBatchProcessing
를 사용하지 않습니다.
이말고도 다른 방법이 얼마든지 있으니 참고만 해주시기 바랍니다.
IDEA: intellij
사용한 데이터베이스: postgreSQL
spring batch 5.0.5
1. spring batch 메타데이터 데이터베이스 만들기
데이터베이스 종류별로 spring batch schema를 만드는 sql이 있습니다.org.springframework.boot:spring-boot-starter-batch
의존성을 추가했다면 Extenal Libraries > org.springframework.boot:spring-boot-starter-batch
> ...jar
> org.springframework.batch.core
에 있습니다.
데이터베이스에 직접 복붙해서 sql을 실행시켜주면 메타데이터 테이블들이 만들어집니다.
postgreSQL은 다음과 같습니다.
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
2. application.yml
이제 다중 데이터베이스를 설정합니다.
spring:
jpa:
show-sql: true
hibernate:
ddl-auto: update
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
batch:
datasource:
driver-class-name: org.postgresql.Driver
jdbcUrl: jdbc:postgresql://<DB IP 주소>/<batch 데이터베이스 명>
username: DB owner 명
password: password
payment:
datasource:
driver-class-name: org.postgresql.Driver
jdbcUrl: jdbc:postgresql://<DB IP 주소>/<데이터베이스 명>
username: DB owner 명
password: password
저는 메타데이터를 batch 데이터베이스로 따로 관리했습니다. 그리고 다른 데이터베이스는 paymnet로 했습니다.driverClassName
으로 해도 됩니다.
3. DataSourceConfig
각 데이터베이스마다 DataSourceConfig.java
를 만들어줍니다.
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
basePackages = "com.example.batch",
entityManagerFactoryRef = "batchEntityManagerFactory",
transactionManagerRef = "batchTransactionManager"
)
public class BatchDataSourceConfig {
@Bean(name = "batchDataSource")
@Primary
@ConfigurationProperties(prefix = "spring.batch.datasource")
public DataSource batchDataSource() {
return DataSourceBuilder.create().build();
}
@Primary
@Bean(name = "batchEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean batchEntityManagerFactory(
EntityManagerFactoryBuilder builder,
@Qualifier("batchDataSource") DataSource batchDataSource
) {
return builder
.dataSource(batchDataSource)
.packages("com.example.batch")
.build();
}
@Primary
@Bean(name = "batchTransactionManager")
public PlatformTransactionManager batchTransactionManager(
final @Qualifier("batchEntityManagerFactory") EntityManagerFactory batchEntityManagerFactory
) {
return new JpaTransactionManager(batchEntityManagerFactory);
}
}
- @EnableJpaRepositories는 JPA Repository를 사용하기 위한 어노테이션입니다.
- @EnableJpaRepositories(basePackages = "@Repository가 있는 패키지 경로")
- @ConfigurationProperties(prefix = "application.yml에 지정한 dataSource")
- 두번째 메서드에서 EntityManagerFactoryBuilder의 packages메서드에는 테이블을 매핑시킬 엔티티 클래스의 패키지 경로를 넣어줍니다.
- entityManagerFactoryRef, transactionManagerRef는 이 데이터베이스에 사용할 jpa 엔티티매니저와 트랜잭션 매니저 이름입니다. 반드시 데이터베이스마다 달라야합니다.
- 이제 spring batch가 사용하는 데이터베이스에 jpa로 접근할 수 있습니다.
다음은 payment 데이터베이스입니다.
@Primary를 붙이지 않은 이유는 지정하지 않은 엔티티는 jpa가 spring batch의 데이터베이스에서 접근하도록 하기 위함입니다.
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
basePackages = "com.example.batch",
entityManagerFactoryRef = "paymentEntityManagerFactory",
transactionManagerRef = "paymentTransactionManager"
)
public class PaymentDataSourceConfig {
@Bean(name = "paymentDataSource")
@ConfigurationProperties(prefix = "spring.payment.datasource")
public DataSource paymentDataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name = "paymentEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean paymentEntityManagerFactory(
EntityManagerFactoryBuilder builder,
@Qualifier("paymentDataSource") DataSource paymentDataSource
) {
return builder
.dataSource(paymentDataSource)
.packages("com.example.batch")
.build();
}
@Bean(name = "paymentTransactionManager")
public PlatformTransactionManager paymentTransactionManager(
@Qualifier("paymentEntityManagerFactory") EntityManagerFactory paymentEntityManagerFactory) {
return new JpaTransactionManager(paymentEntityManagerFactory);
}
}
4. Job
job을 작성합니다.
먼저 의존성을 주입해줍니다.
- JobRepository는 배치 메타데이터 엔티티를 사용하기 위해 필요한 레포지토리로, JobBuilder로 새로운 job을 만들어줄때 사용됩니다.
- PlatformTransactionManager와 EntityManagerFactory는 job에서 사용할 DataSourceConfig에서 등록한 Bean을 @Qualifier로 주입해줍니다.
- jpaRepository를 사용하려면 해당하는 레포지토리를 주입시켜줍니다.
@Configuration
public class PaymentCancelJob {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final PaymentRepository paymentRepository;
private final EntityManagerFactory entityManagerFactory;
public PaymentCancelJob(JobRepository jobRepository,
@Qualifier("paymentTransactionManager") PlatformTransactionManager transactionManager,
PaymentRepository paymentRepository,
@Qualifier("paymentEntityManagerFactory") EntityManagerFactory entityManagerFactory) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.paymentRepository = paymentRepository;
this.entityManagerFactory = entityManagerFactory;
}
...
}
다음으로 chunk 방식에서 사용되는 ItemReader, ItemProcessor, ItemWriter를 작성합니다.
- 각 객체들은 bean 객체이므로, 다른 job과 구분짓기 위해 고유한 메서드 명을 지어줘야합니다.
- ItemWriter에서 JpaItemWriterBuilder를 통해 작업한 데이터를 데이터베이스에 저장합니다.
@Bean
public ItemReader<Item> myJobItemReader() {
}
@Bean
public ItemProcessor<InputItem, OutputItem> myJobItemProcessor() {
}
@Bean
public JpaItemWriter<Item> myJobItemWriter() {
return new JpaItemWriterBuilder<InputEntity>()
.entityManagerFactory(entityManagerFactory)
.build();
}
다음은 step 함수를 작성합니다.
chunk의 첫번째 인자는 한 스텝 당 작업할 item 개수입니다.
@Bean
@Qualifier("myJobStep")
public Step myJobStep() {
return new StepBuilder("myJobStep", jobRepository)
.<InputItem, OutputItem>chunk(10, transactionManager)
.reader(myJobItemReader())
.processor(myJobItemProcessor())
.writer(myJobItemWriter())
.build();
}
마지막으로 job 함수를 작성합니다.
@Bean
@Qualifier("myJob")
public Job myJob(
@Qualifier("myJobStep") Step myJobStep) {
return new JobBuilder("myJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(myJobStep)
.build();
}
최종적으로 제가 작성한 job은 다음과 같습니다.
@Configuration
@Slf4j
public class PaymentCancelJob {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final PaymentRepository paymentRepository;
private final EntityManagerFactory entityManagerFactory;
private final CustomJobParameter customJobParameter;
public PaymentCancelJob(JobRepository jobRepository,
@Qualifier("paymentTransactionManager") PlatformTransactionManager transactionManager,
PaymentRepository paymentRepository,
@Qualifier("paymentEntityManagerFactory") EntityManagerFactory entityManagerFactory,
CustomJobParameter customJobParameter) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.paymentRepository = paymentRepository;
this.entityManagerFactory = entityManagerFactory;
this.customJobParameter = customJobParameter;
}
@Bean
@StepScope
public PaymentCancelItemReader paymentCancelReader() {
return new PaymentCancelItemReader(paymentRepository, customJobParameter);
}
@Bean
public ItemProcessor<Payment, Payment> paymentCancelProcessor() {
return payment -> {
payment.setPaymentStatus(PaymentStatus.CANCEL);
return payment;
};
}
@Bean
public JpaItemWriter<Payment> paymentCancelWriter() {
return new JpaItemWriterBuilder<Payment>()
.entityManagerFactory(entityManagerFactory)
.build();
}
@Bean
@JobScope
@Qualifier("updatePaymentStatusStep")
public Step updatePaymentStatusStep() {
return new StepBuilder("updatePaymentStatusStep", jobRepository)
.<Payment, Payment>chunk(10, transactionManager)
.reader(paymentCancelReader())
.processor(paymentCancelProcessor())
.writer(paymentCancelWriter())
.allowStartIfComplete(true)
.build();
}
@Bean
@Qualifier("updatePaymentStatusJob")
public Job updatePaymentStatusJob(
@Qualifier("updatePaymentStatusStep") Step updatePaymentStatusStep) {
return new JobBuilder("updatePaymentStatusJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(updatePaymentStatusStep)
.build();
}
}
단순히 payment 레코드의 state를 CANCLE로 바꾸는 작업입니다.
여기서는 chunk 방식의 job을 작성했고, ItemReader는 따로 구현체를 작성했습니다.
그리고 jobParameter를 좀더 편하게 사용하기 위해 CustomJobParameter를 만들었습니다.
5. Scheduler
보통 스케줄러를 사용해 배치 잡을 돌리는데, 저는 간단하게 spring scheduler를 사용했습니다.
quartz를 사용하면 더 세심하게 스케줄링을 할 수 있습니다.
먼저 의존성 주입으로 JobLauncher
, Job
을 주입합니다.
이때 실행하려는 job은 직접 정의한 Bean 객체이므로 @Qualifier
로 지정해야합니다.
@Component
@EnableScheduling
public class myJobSchedule {
private final JobLauncher jobLauncher;
private final Job myJob;
public PaymentCancelSchedule(JobLauncher jobLauncher,
@Qualifier("myJob") Job myJob
) {
this.jobLauncher = jobLauncher;
this myJob = myJob;
}
...
}
이제 job을 실행하는 함수를 작성합니다.
- jobParameter 객체를 만들어 JobLauncher.run에 넘겨줍니다.
- 제가 jobParameter에 값을 넣으면 스프링배치는 Job을 실행할때 이 값들을 사용합니다.
- Job은 jobParameter로 job을 구분하기 때문에 여러개의 job이 있거나 job을 여러번 실행한다면 이를 구분하기 위한 식별값이 필요합니다.
- 식별값으로 현재 시간이나 랜덤 값을 넣어줄 수 있습니다.
@Scheduled(cron = "0 0 2 * * ?", zone = "Asia/Seoul")
public void runJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("myJobUuid", UUID.randomUUID().toString()).toJobParameters();
jobLauncher.run(myJob, jobParameters);
}
}
제가 작성한 스케줄러는 다음과 같습니다.
JobExplorer는 메타데이터 테이블에서 마지막으로 실행된 job을 찾을때 사용했습니다.
@Slf4j
@Component
@EnableScheduling
public class PaymentCancelSchedule {
private final JobLauncher jobLauncher;
private final Job updatePaymentStatusJob;
private final JobExplorer jobExplorer;
public PaymentCancelSchedule(JobLauncher jobLauncher,
@Qualifier("updatePaymentStatusJob") Job updatePaymentStatusJob,
JobExplorer jobExplorer
) {
this.jobLauncher = jobLauncher;
this.updatePaymentStatusJob = updatePaymentStatusJob;
this.jobExplorer = jobExplorer;
}
@Scheduled(cron = "0 0 14 * * ?", zone = "Asia/Seoul")
public void runJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLocalDateTime("paymentJobStartTime", DateRangeUtil.getStartTime(2))
.addLocalDateTime("paymentJobEndTime", DateRangeUtil.getEndTime(2))
.addString("paymentCancelUuid", UUID.randomUUID().toString()).toJobParameters();
JobInstance jobInstance = jobExplorer.getLastJobInstance("updatePaymentStatusJob");
if (jobInstance != null) {
JobExecution jobExecution = jobExplorer.getLastJobExecution(jobInstance);
if (jobExecution != null &&
(jobExecution.getStatus() == BatchStatus.STOPPED ||
jobExecution.getStatus() == BatchStatus.FAILED
)) {
runUpdatePaymentAmountPaidJob(jobParameters);
}
}
runUpdatePaymentAmountPaidJob(jobParameters);
}
private void runUpdatePaymentAmountPaidJob(JobParameters jobParameters) {
try {
jobLauncher.run(updatePaymentStatusJob, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException |
JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error(">>>>>>> error with updatePaymentStatusJob: {}", e.getMessage());
}
}
}
지금까지 잘 따라오셨다면 job이 잘 실행될 겁니다.
지금까지 @EnableBatchProcessing
없는 다중 데이터베이스 설정으로 스프링 배치 작업을 작성하는 방법이었습니다.
지적은 언제나 환영합니다.