Spring Batch - MultiResourceItemReader & HibernateItemWriter example

In this post we will learn how to use Spring Batch to read multiple flat files with MultiResourceItemReader and write their contents to a database through Hibernate (using HibernateItemWriter).

The following technologies are used:

  • Spring Batch 3.0.1.RELEASE
  • Spring core 4.0.6.RELEASE
  • Hibernate 4.3.6.Final
  • MySQL Server 5.6
  • Joda Time 2.3
  • JDK 1.6
  • Eclipse JUNO Service Release 2

 

Step 1: Create project directory structure

What we are going to do here is read multiple files matching src/main/resources/csv/*.txt and write their contents to a MySQL database using Hibernate.

 

 

Step 2: Create Database Table

create table EXAM_RESULT (
   id INT NOT NULL auto_increment PRIMARY KEY,
   student_name VARCHAR(30) NOT NULL,
   dob DATE NOT NULL,
   percentage DOUBLE NOT NULL
);

 

 

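Step 3: Prepare Input files

Below are the contents of the flat files under /src/main/resources/csv/*.txt. Each line holds a student name, a date of birth in dd/MM/yyyy format, and a percentage, separated by '|'. The contents of these files will be saved to the database through Hibernate.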

ExamResult-Year2001.txt

Brian Burlet   |   01/02/1985  |   76
Jimmy Snuka    |   01/02/1983  |   39
Renard konig   |   01/02/1970  |   61
Kevin Richard  |   01/02/2002  |   59

 

ExamResult-Year2002.txt

 

Sam Disilva    |   01/05/1992  |   76
Bob corbet     |   10/07/1990  |   29
Rick Ricky     |   01/02/1973  |   54


 

ExamResult-Year2003.txt

 

Igor Watson    |   01/02/1986  |   34
Peet Sampras   |   01/02/1978  |   97
Rita Paul      |   01/02/1993  |   92
Han Yenn       |   01/02/1965  |   83

Step 4: Update pom.xml to include required dependencies

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.websystique.springbatch</groupId>
    <artifactId>SpringBatchMultiReaderHibernateWriter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SpringBatchMultiReaderHibernateWriter</name>
    <url>http://maven.apache.org</url>

    <properties>
        <springframework.version>4.0.6.RELEASE</springframework.version>
        <springbatch.version>3.0.1.RELEASE</springbatch.version>
        <hibernate.version>4.3.6.Final</hibernate.version>
        <javassist.version>3.18.1-GA</javassist.version>
        <mysql.connector.version>5.1.31</mysql.connector.version>
        <joda-time.version>2.3</joda-time.version>
        <c3p0.version>0.9.5-pre8</c3p0.version>
    </properties>

    <dependencies>
        <!-- Spring Core -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${springframework.version}</version>
        </dependency>

        <!-- Spring ORM support -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>${springframework.version}</version>
        </dependency>

        <!-- Spring Batch -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${springbatch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-infrastructure</artifactId>
            <version>${springbatch.version}</version>
        </dependency>

        <!-- Hibernate related dependencies -->
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-core</artifactId>
            <version>${hibernate.version}</version>
        </dependency>
        <dependency>
            <groupId>org.javassist</groupId>
            <artifactId>javassist</artifactId>
            <version>${javassist.version}</version>
        </dependency>

        <!-- Joda-Time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda-time.version}</version>
        </dependency>

        <!-- To map JodaTime with database type -->
        <dependency>
            <groupId>org.jadira.usertype</groupId>
            <artifactId>usertype.core</artifactId>
            <version>3.0.0.CR1</version>
        </dependency>

        <!-- MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.connector.version}</version>
        </dependency>

        <!-- ComboPooledDataSource -->
        <dependency>
            <groupId>com.mchange</groupId>
            <artifactId>c3p0</artifactId>
            <version>${c3p0.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

Since we are using Hibernate, the hibernate-core dependency is included.

We will use the pooled data source ComboPooledDataSource, which comes from c3p0.

mysql-connector-java is needed to connect to the MySQL database, and usertype.core is needed to handle the conversion between Joda-Time and MySQL date types.

 

Step 5: Create Entity Class

Below is the Entity class, which uses standard JPA annotations.

 

package com.websystique.springbatch.model;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

import org.hibernate.annotations.Type;
import org.joda.time.LocalDate;

@Entity
@Table(name = "EXAM_RESULT")
public class ExamResult {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;

    @Column(name = "STUDENT_NAME", nullable = false)
    private String studentName;

    @Column(name = "DOB", nullable = false)
    @Type(type = "org.jadira.usertype.dateandtime.joda.PersistentLocalDate")
    private LocalDate dob;

    @Column(name = "PERCENTAGE", nullable = false)
    private double percentage;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getStudentName() {
        return studentName;
    }

    public void setStudentName(String studentName) {
        this.studentName = studentName;
    }

    public LocalDate getDob() {
        return dob;
    }

    public void setDob(LocalDate dob) {
        this.dob = dob;
    }

    public double getPercentage() {
        return percentage;
    }

    public void setPercentage(double percentage) {
        this.percentage = percentage;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (id ^ (id >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (!(obj instanceof ExamResult))
            return false;
        ExamResult other = (ExamResult) obj;
        if (id != other.id)
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "ExamResult [id=" + id + ", studentName=" + studentName
                + ", dob=" + dob + ", percentage=" + percentage + "]";
    }
}

The notable part here is the @Type declaration.

@Type helps Hibernate map a Joda-Time LocalDate to a database DATE column.

 

Step 6: Create Mapper to map the File fields to Entity Class

 

 

package com.websystique.springbatch;

import org.joda.time.LocalDate;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.websystique.springbatch.model.ExamResult;

public class ExamResultFieldSetMapper implements FieldSetMapper<ExamResult> {

    @Override
    public ExamResult mapFieldSet(FieldSet fieldSet) throws BindException {
        ExamResult result = new ExamResult();
        // Field 0: student name
        result.setStudentName(fieldSet.readString(0));
        // Field 1: date of birth in dd/MM/yyyy format, converted to a Joda-Time LocalDate
        result.setDob(new LocalDate(fieldSet.readDate(1, "dd/MM/yyyy")));
        // Field 2: percentage
        result.setPercentage(fieldSet.readDouble(2));
        return result;
    }
}
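To make the tokenizing and mapping step more concrete, here is a minimal plain-Java sketch of what happens to a single input line: split on '|', trim each field, then convert the fields to typed values. It does not use any Spring Batch classes. The class name PipeLineParser and its nested Row type are hypothetical, and java.time.LocalDate (which assumes a Java 8+ runtime) stands in for the Joda-Time LocalDate used in this post.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Spring Batch: imitates the
// tokenizer + mapper pair for one pipe-delimited line.
class PipeLineParser {

    // Simple value holder for the three fields of one record.
    static class Row {
        final String studentName;
        final LocalDate dob;
        final double percentage;

        Row(String studentName, LocalDate dob, double percentage) {
            this.studentName = studentName;
            this.dob = dob;
            this.percentage = percentage;
        }
    }

    private static final DateTimeFormatter DOB_FORMAT =
            DateTimeFormatter.ofPattern("dd/MM/yyyy");

    static Row parse(String line) {
        // "|" is a regex metacharacter, so it must be escaped for split().
        String[] fields = line.split("\\|");
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + fields.length + ": " + line);
        }
        String name = fields[0].trim();
        LocalDate dob = LocalDate.parse(fields[1].trim(), DOB_FORMAT);
        double percentage = Double.parseDouble(fields[2].trim());
        return new Row(name, dob, percentage);
    }

    public static void main(String[] args) {
        Row row = parse("Brian Burlet   |   01/02/1985  |   76");
        System.out.println(row.studentName + " / " + row.dob + " / " + row.percentage);
    }
}
```

In the real job this work is split between DelimitedLineTokenizer (the splitting) and ExamResultFieldSetMapper (the conversion), so the sketch is only meant to show the data flow for one record.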

 

Step 7: Create an ItemProcessor

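An ItemProcessor is optional. It is called after an item has been read and before it is written.

It gives us the opportunity to apply business logic to each item.

For example, we can filter out the items whose percentage is less than 60, so that only items with percentage >= 60 are written.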

 

package com.websystique.springbatch;

import org.springframework.batch.item.ItemProcessor;

import com.websystique.springbatch.model.ExamResult;

public class ExamResultItemProcessor implements ItemProcessor<ExamResult, ExamResult> {

    @Override
    public ExamResult process(ExamResult result) throws Exception {
        System.out.println("Processing result :" + result);

        // Returning null tells Spring Batch to filter this item out,
        // so it is never passed to the writer.
        if (result.getPercentage() < 60) {
            return null;
        }
        return result;
    }
}
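The filtering rule can be illustrated without the framework: an item for which the processor returns null is not handed to the writer. The sketch below reproduces that rule in plain Java, using the percentages from the three sample files above. PassMarkFilter and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the processor contract: null means "drop this item".
class PassMarkFilter {

    static final double PASS_MARK = 60.0;

    // Mirrors the rule in ExamResultItemProcessor.
    static Double process(Double percentage) {
        return percentage < PASS_MARK ? null : percentage;
    }

    // Applies process() to every item and keeps only the non-null results.
    static List<Double> filter(List<Double> items) {
        List<Double> kept = new ArrayList<Double>();
        for (Double item : items) {
            Double result = process(item);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Percentages from ExamResult-Year2001, 2002 and 2003, in file order.
        List<Double> all = Arrays.asList(
                76.0, 39.0, 61.0, 59.0,
                76.0, 29.0, 54.0,
                34.0, 97.0, 92.0, 83.0);
        System.out.println(filter(all)); // prints [76.0, 61.0, 76.0, 97.0, 92.0, 83.0]
    }
}
```

Of the 11 sample records, 6 have a percentage of 60 or more, so those are the ones expected to reach the writer.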

 

Step 8: Add a Job listener(JobExecutionListener)

A Job listener is optional. It lets us run business logic before the job starts and after it completes. For example, we can set up the environment before the job and clean up once the job has finished.

 

package com.websystique.springbatch;

import java.util.List;

import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class ExamResultJobListener implements JobExecutionListener {

    private DateTime startTime, stopTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = new DateTime();
        System.out.println("ExamResult Job starts at :" + startTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        stopTime = new DateTime();
        System.out.println("ExamResult Job stops at :" + stopTime);
        System.out.println("Total time taken in millis :" + getTimeInMillis(startTime, stopTime));

        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("ExamResult job completed successfully");
            // Here you can perform some other business logic like cleanup
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            System.out.println("ExamResult job failed with following exceptions ");
            List<Throwable> exceptionList = jobExecution.getAllFailureExceptions();
            for (Throwable th : exceptionList) {
                System.err.println("exception :" + th.getLocalizedMessage());
            }
        }
    }

    private long getTimeInMillis(DateTime start, DateTime stop) {
        return stop.getMillis() - start.getMillis();
    }
}

 

Step 9: Create Spring Context with Hibernate SessionFactory & Spring Batch job configuration

Create the pooled datasource [context-datasource.xml].

 

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
        <property name="driverClass" value="com.mysql.jdbc.Driver" />
        <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/selfStudy" />
        <property name="user" value="selfStudyId" />
        <property name="password" value="selfStudyPw" />
    </bean>

</beans>

 

Create the Hibernate SessionFactory [context-model.xml].

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    default-autowire="byName" default-init-method="init">

    <import resource="classpath:context-datasource.xml" />

    <bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="packagesToScan">
            <list>
                <value>com.websystique.springbatch.model</value>
            </list>
        </property>
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.dialect">org.hibernate.dialect.MySQLDialect</prop>
                <!-- <prop key="hibernate.show_sql">true</prop> -->
                <!-- <prop key="hibernate.format_sql">true</prop> -->
            </props>
        </property>
    </bean>

    <!-- No explicit properties here: sessionFactory is injected through default-autowire="byName" -->
    <bean id="transactionManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager" />

    <tx:annotation-driven transaction-manager="transactionManager" />

</beans>

 

 

Put all of the Spring Batch configuration in [spring-batch-context.xml].

 

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <import resource="classpath:context-model.xml" />

    <!-- JobRepository and JobLauncher are configuration/setup classes -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <bean id="multiResourceItemReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
        <property name="resources" value="classpath:csv/ExamResult*.txt" />
        <property name="delegate" ref="flatFileItemReader" />
    </bean>

    <!-- ItemReader reads the input file one line at a time -->
    <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="fieldSetMapper">
                    <!-- Mapper that maps each item in a record to a property of the POJO -->
                    <bean class="com.websystique.springbatch.ExamResultFieldSetMapper" />
                </property>
                <property name="lineTokenizer">
                    <!-- Tokenizer used to split the items of an input record on a specific character -->
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value="|" />
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <!-- ItemWriter that writes the data to the database -->
    <bean id="databaseItemWriter" class="org.springframework.batch.item.database.HibernateItemWriter">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>

    <!-- Optional ItemProcessor to perform business logic/filtering on the input records -->
    <bean id="itemProcessor" class="com.websystique.springbatch.ExamResultItemProcessor" />

    <!-- Optional JobExecutionListener to perform business logic before and after the job -->
    <bean id="jobListener" class="com.websystique.springbatch.ExamResultJobListener" />

    <!-- Actual Job -->
    <batch:job id="examResultJob">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="multiResourceItemReader" writer="databaseItemWriter"
                    processor="itemProcessor" commit-interval="10" />
            </batch:tasklet>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobListener" />
        </batch:listeners>
    </batch:job>

</beans>

We use MultiResourceItemReader to read every file in the configured directory whose name matches a given pattern.

MultiResourceItemReader then delegates the actual file reading to FlatFileItemReader, which reads the '|'-separated fields from each input file.
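The delegation idea can be sketched in plain Java: collect the files that match a pattern, then read them one after another as if they were a single stream of lines. This is only an illustration of the concept, not how Spring Batch implements it. MultiFileLineReader is a hypothetical name, the files are sorted by path here simply to get a predictable order, and java.nio.file assumes Java 7 or later.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of "one reader over many files".
class MultiFileLineReader {

    // Returns the lines of every file in dir whose name matches the glob,
    // one file after another.
    static List<String> readAll(Path dir, String glob) throws IOException {
        List<Path> files = new ArrayList<Path>();
        DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob);
        try {
            for (Path file : stream) {
                files.add(file);
            }
        } finally {
            stream.close();
        }
        // Sort so that the read order does not depend on the file system.
        Collections.sort(files);

        List<String> lines = new ArrayList<String>();
        for (Path file : files) {
            // The per-file reading is the part that is delegated in the real job.
            lines.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build a small throwaway directory so the sketch runs on its own.
        Path dir = Files.createTempDirectory("csv");
        Files.write(dir.resolve("ExamResult-Year2002.txt"),
                Arrays.asList("Sam Disilva    |   01/05/1992  |   76"), StandardCharsets.UTF_8);
        Files.write(dir.resolve("ExamResult-Year2001.txt"),
                Arrays.asList("Brian Burlet   |   01/02/1985  |   76"), StandardCharsets.UTF_8);
        Files.write(dir.resolve("notes.txt"),
                Arrays.asList("not an exam result file"), StandardCharsets.UTF_8);

        for (String line : readAll(dir, "ExamResult*.txt")) {
            System.out.println(line);
        }
    }
}
```

In the job configuration the same roles are played by the resources property (the pattern classpath:csv/ExamResult*.txt) and the delegate property (the flatFileItemReader bean).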

For the output side, we use HibernateItemWriter, which requires a sessionFactory as a property.

With the ItemProcessor, we filter out the records that do not satisfy percentage >= 60.

The JobExecutionListener lets us include any logic that needs to run before and after the job is executed.
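The job definition also sets commit-interval = "10" on the chunk, which means items are read and processed in groups of up to 10 before the group is handed to the writer. The following is a deliberately simplified plain-Java sketch of that grouping, assuming integer percentages as the items. ChunkSketch is a hypothetical name, and transactions, restarts and error handling are left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of a chunk-oriented step.
class ChunkSketch {

    // Reads up to commitInterval items at a time, drops the ones below passMark,
    // and returns the list that each "write" call would receive.
    static List<List<Integer>> run(Iterator<Integer> reader, int commitInterval, int passMark) {
        List<List<Integer>> writes = new ArrayList<List<Integer>>();
        while (reader.hasNext()) {
            List<Integer> chunk = new ArrayList<Integer>();
            int read = 0;
            while (read < commitInterval && reader.hasNext()) {
                Integer item = reader.next();
                read++;
                // Same rule as the processor: only items >= passMark survive.
                if (item >= passMark) {
                    chunk.add(item);
                }
            }
            writes.add(chunk);
        }
        return writes;
    }

    public static void main(String[] args) {
        // Percentages from the three sample files, in file order.
        List<Integer> items = Arrays.asList(76, 39, 61, 59, 76, 29, 54, 34, 97, 92, 83);
        System.out.println(run(items.iterator(), 10, 60)); // prints [[76, 61, 76, 97, 92], [83]]
    }
}
```

With the 11 sample records, the first group covers the first 10 items read and the second group covers the remaining one, so in this model the writer is called twice.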

 

Step 10: Create Main application to finally run the job

 

package com.websystique.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-context.xml");

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("examResultJob");

        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Exit Status : " + execution.getStatus());
        } catch (JobExecutionException e) {
            System.out.println("Job ExamResult failed");
            e.printStackTrace();
        }
    }
}