Showing posts with label Spring Batch. Show all posts

Monday, May 19, 2014

Loading Appnexus Reports using Spring batch

Introduction to Appnexus reporting API

Unlike most APIs, which follow a simple request-response model, the Appnexus reporting API does not let you download a report directly; the report has to be scheduled first. The sequence of steps is as follows:

  • Step 1. Create a JSON-formatted report request
  • Step 2. POST the request to the Report Service
  • Step 3. GET the report status from the Report Service
  • Step 4. GET the report data from the Report Download Service

The following sequence diagram shows the steps involved in requesting and downloading a report from Appnexus.

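Sequence diagram (MyServer and Appnexus):

  • Step 1. MyServer -> Appnexus: "I need a report" (spec); Appnexus -> MyServer: report_id
  • Step 2. MyServer -> Appnexus: "Is report ready?"; Appnexus -> MyServer: "nope!"
  • Step 2 (repeat). MyServer -> Appnexus: "Is report ready?"; Appnexus -> MyServer: "yes"
  • Step 3. MyServer -> Appnexus: /download?report_id=xx; Appnexus -> MyServer: csv file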
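Before moving to Spring batch, the request, poll and download sequence can be sketched in plain Java. This is only a minimal sketch: the ReportApi interface and its method names are hypothetical stand-ins for whatever HTTP client you use, not the actual Appnexus client, and the poll limit and interval are assumed parameters.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical client abstraction; the method names are illustrative only. */
interface ReportApi {
    String requestReport(String reportSpecJson); // POST the report spec, returns the report ID
    boolean isReady(String reportId);            // GET the report status
    String download(String reportId);            // GET the report data (CSV)
}

class ReportFlow {
    /**
     * Schedules a report, polls until it is ready, then downloads it.
     * Throws IllegalStateException if the report is still not ready after maxPolls checks.
     */
    static String fetchReport(ReportApi api, String reportSpecJson,
                              int maxPolls, long pollIntervalMillis) {
        String reportId = api.requestReport(reportSpecJson);
        for (int attempt = 1; attempt <= maxPolls; attempt++) {
            if (api.isReady(reportId)) {
                return api.download(reportId);
            }
            if (attempt < maxPolls) {
                sleep(pollIntervalMillis); // wait before asking again
            }
        }
        throw new IllegalStateException(
                "Report " + reportId + " not ready after " + maxPolls + " polls");
    }

    private static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for report", e);
        }
    }
}
```

Everything here lives in one method, which is exactly what the Spring batch version below splits into separate, restartable steps.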

Modelling the above sequence using Spring batch

Configuration XML

The following listing shows the sample configuration file that is required for creating the batch job.


<batch:job id="reportLoadJob">
 <batch:step id="reportStep1" next="reportStep2">
  <batch:tasklet ref="reportRequestTasklet" />
 </batch:step>
 <batch:step id="reportStep2" next="reportStep3">
  <batch:tasklet ref="reportStatusCheckTasklet" />
 </batch:step>
 <batch:step id="reportStep3" next="reportStep4">
  <batch:tasklet ref="reportDownloadTasklet" />
 </batch:step>
 <batch:step id="reportStep4">
  <batch:tasklet>
   <batch:chunk reader="cvsFileItemReader"
    writer="mysqlItemWriter" commit-interval="2">
   </batch:chunk>
  </batch:tasklet>
 </batch:step>
</batch:job>


Step 1: Report Request Tasklet

This Tasklet calls the Appnexus reporting API with the required parameters and receives the ID of the report that Appnexus will eventually generate.


Step 2: Report Status Check Tasklet

This Tasklet calls the Appnexus reporting API with the report ID obtained in the previous step to check whether the requested report has been generated. If it has not, the step is repeated after a short interval until the report is ready. Once the report is ready, the response has the status okay and includes the download URL, which the subsequent step uses to download the report.
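One way to get this repeat behaviour is to rely on the Tasklet contract itself: the step keeps calling execute as long as it returns RepeatStatus.CONTINUABLE and stops on RepeatStatus.FINISHED. The sketch below models that contract without the framework, so the PollStatus enum stands in for RepeatStatus, and the status lookup and the "ok" value are assumptions for illustration rather than the actual implementation used here.

```java
import java.util.function.Supplier;

/** Stand-in for Spring Batch's RepeatStatus (CONTINUABLE / FINISHED). */
enum PollStatus { CONTINUABLE, FINISHED }

/** Framework-free model of a status-check tasklet. */
class StatusCheckTaskletSketch {
    private final Supplier<String> statusLookup; // hypothetical: yields "ok" once the report is ready
    private final long waitMillis;               // pause between two checks

    StatusCheckTaskletSketch(Supplier<String> statusLookup, long waitMillis) {
        this.statusLookup = statusLookup;
        this.waitMillis = waitMillis;
    }

    /** One invocation: check once; if not ready, wait and ask to be called again. */
    PollStatus execute() {
        if ("ok".equals(statusLookup.get())) {
            return PollStatus.FINISHED;
        }
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return PollStatus.CONTINUABLE;
    }
}

/** Models what the step does: call execute() until it returns FINISHED. */
class StepRunnerSketch {
    static int run(StatusCheckTaskletSketch tasklet) {
        int calls = 0;
        PollStatus status;
        do {
            status = tasklet.execute();
            calls++;
        } while (status == PollStatus.CONTINUABLE);
        return calls; // number of status checks that were needed
    }
}
```

In a real job you would probably also cap the number of checks, so that a report that never becomes ready fails the step instead of looping forever.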


Step 3: Report Download Tasklet

In this Tasklet, the download URL received in the previous step is used to download the generated CSV file. The file is stored in the local file system and its name is saved in the job execution context so that the next step can read it from there.
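The hand-over between the download step and the next step can be sketched as follows. This is a framework-free illustration: a plain Map stands in for the job execution context, the key name is made up for this example, and the CSV content is passed in as a string rather than streamed from the download URL.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

class ReportDownloadSketch {
    /** Hypothetical context key under which the file name is published. */
    static final String REPORT_FILE_KEY = "report.file.name";

    /**
     * Writes the CSV content to a temporary file and records the file's path
     * in the given context map so that a later step can look it up.
     */
    static Path storeReport(String csvContent, Map<String, Object> jobContext) {
        try {
            Path file = Files.createTempFile("appnexus-report-", ".csv");
            Files.write(file, csvContent.getBytes(StandardCharsets.UTF_8));
            jobContext.put(REPORT_FILE_KEY, file.toString());
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside an actual Tasklet the same value would go into the job's ExecutionContext (for example with putString) instead of a Map.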


Step 4: Report Persist Tasklet

This step reads the data from the CSV file and persists it to the database. To do this, we first create an entity that represents one line of the file. We can then use chunk-oriented processing to save the data to the database. This Tasklet differs from the other Tasklets we have seen so far, because here the Tasklet consists of a reader, a writer and a commit interval.

The reader reads the CSV file one line at a time. The items are passed to the writer in chunks whose size is given by the commit interval, and the writer saves each chunk to the database.
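To make the role of the commit interval concrete, here is a minimal, framework-free model of the read and write loop. It is only a sketch of the idea: an Iterator plays the reader, a Consumer plays the writer, and the class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class ChunkLoopSketch {
    /**
     * Reads items one at a time and hands them to the writer in chunks of at
     * most commitInterval items. Returns the number of chunks written.
     */
    static <T> int process(Iterator<T> reader, Consumer<List<T>> writer, int commitInterval) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be at least 1");
        }
        int chunks = 0;
        List<T> chunk = new ArrayList<>(commitInterval);
        while (reader.hasNext()) {
            chunk.add(reader.next());              // read a single item
            if (chunk.size() == commitInterval) {  // chunk is full: write it out
                writer.accept(new ArrayList<>(chunk));
                chunk.clear();
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {                    // write the final, partial chunk
            writer.accept(new ArrayList<>(chunk));
            chunks++;
        }
        return chunks;
    }
}
```

With five lines and a commit interval of 2, as in the configuration above, the writer is called three times with 2, 2 and 1 items.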

Thus it is quite easy to model a multistep job using Spring batch. Kindly post a comment if you like it or if you feel the content is inappropriate in some way. If you need help modelling any of your jobs using Spring batch, kindly post a comment, and it will be taken up as a topic for a subsequent post.

Sunday, May 18, 2014

Load Appnexus Country List using Spring Batch: Part 2

In the previous post we saw the configuration required for loading countries from Appnexus using Spring batch. In this post we will look at the implementation of the countryLoadTasklet and pagingAwareStepExecutionListener.

Country Load Tasklet

This class extends the superclass AbstractPageLoadTasklet, which is designed to accommodate any Tasklet that deals with a pageable response.

@Scope("step")
public abstract class AbstractPageLoadTasklet
 <TYPE, REPO extends JpaRepository<TYPE, ?>, 
  RESPONSE extends AbstractPagableResponse>
 implements Tasklet {

 @Autowired
 private AppnexusRestClient appnexusClient;

 @Value("#{stepExecutionContext['" + StepAttribute.START_ELEMENT + "']}")
 private int startElement;

 @Override
 public RepeatStatus execute(StepContribution stepContribution,
   ChunkContext chunkContext) throws Exception {
  // ...
 }

 public abstract List<TYPE>
   getItemsFromResponse(RESPONSE response);

 public abstract ResponseContainer<RESPONSE>
   getPageableResponse(
     AppnexusRestClient appnexusClient, int startElement);

 public abstract REPO getRepository();

 protected void save(List<TYPE> items) {
  getRepository().save(items);
 }
}

Kindly note that this is an abstract class, and the few methods that are specific to each Tasklet have been made abstract. The implementation of the class itself is very simple:

Detail 1: Implementing Tasklet interface

public abstract class AbstractPageLoadTasklet implements Tasklet {

It implements the Tasklet interface, which is provided by Spring batch.

Detail 2: Injection of AppnexusRestClient

@Autowired
private AppnexusRestClient appnexusClient;

The AppnexusRestClient, the creation of which was discussed in one of the previous posts, is autowired here.

Detail 3: Injection of startElement

@Value("#{stepExecutionContext['" + StepAttribute.START_ELEMENT + "']}")
private int startElement;

An interesting observation to make here is the injection of startElement, which is actually read from the step execution context. As we saw in the previous post, each time the step is executed, the start element is updated depending upon the page that has to be fetched. This ensures that the pages being fetched progress until we reach the end. The job of updating the start element at the end of each step is performed by pagingAwareStepExecutionListener, the details of which are given below.

Detail 4: Implementation of the execute method

This method has three parts:

1. Reading the list of countries by using the appnexusClient. 

ResponseContainer<RESPONSE> responseContainer = 
 getPageableResponse(appnexusClient, startElement);
RESPONSE response = responseContainer.getResponse();
// fetch items from response
List<TYPE> items = getItemsFromResponse(response);

2. Updating the step execution context with the variables: (1) start element, (2) number of elements and (3) count (total number of elements).

// Part 2: update execution context
ExecutionContext executionContext = chunkContext.getStepContext()
 .getStepExecution().getExecutionContext();
executionContext.put(StepAttribute.START_ELEMENT, response.getStartElement());
executionContext.put(StepAttribute.NUM_ELEMENTS, response.getNumElements());
executionContext.put(StepAttribute.COUNT, response.getCount());

3. Saving the fetched items into the database using the repository.

// Part 3: save items
save(items);
return RepeatStatus.FINISHED;


CountryLoadTasklet 

The following listing shows the implementation of the CountryLoadTasklet, which extends AbstractPageLoadTasklet.

public class CountryLoadTasklet extends
  AbstractPageLoadTasklet<Country, CountryRepo, CountryListResponse> {

 @Autowired
 private CountryRepo countryRepo;

 @Override
 public List<Country> getItemsFromResponse(CountryListResponse response) {
  return response.getCountries();
 }

 @Override
 public ResponseContainer<CountryListResponse> getPageableResponse(
   AppnexusRestClient appnexusClient, int startElement) {
  return appnexusClient.getCountryList(startElement);
 }

 @Override
 public CountryRepo getRepository() {
  return countryRepo;
 }
}

Kindly note the implementation of all the abstract methods defined in the superclass. These methods are simple and contain no logic, as the superclass has all the intelligence required to fetch the pages from Appnexus.

PagingAwareStepExecutionListener

The following listing shows the implementation of this class.

Detail 1: Implementing StepExecutionListener interface


@Component
public class PagingAwareStepExecutionListener implements StepExecutionListener {

 @Override
 public void beforeStep(StepExecution stepExecution) {
    // ...
 }

 @Override
 public ExitStatus afterStep(StepExecution stepExecution) {
    // ...
 }
}

StepExecutionListener is an interface provided by Spring batch, and step listeners such as this one implement it. It consists of two methods, beforeStep and afterStep.

Detail 2: Implementation of beforeStep method:

This method is invoked before the execution of each step. The implementation is quite simple: it reads the start element from the job execution context, where it was supplied by the user before invoking the job, and copies the value into the step execution context so that the step can read it to fetch the right page.

Detail 3: Implementation of afterStep method:

This method performs the following:

1. Reads the paging information from the step execution context, which was populated by the step itself.

ExecutionContext context = stepExecution.getExecutionContext();
int startElement = context.getInt(StepAttribute.START_ELEMENT);
int count = context.getInt(StepAttribute.COUNT);
int numElements = context.getInt(StepAttribute.NUM_ELEMENTS);
int remainingElements = count - (startElement + numElements);

2. Checks if more items are to be read from Appnexus. If there are more items, it updates the start element in the job execution context accordingly and returns an exit status beginning with CONTINUE; otherwise it returns END as the exit status.

if (remainingElements <= 0) {
 System.out.println("Task finished.");
 return new ExitStatus("END");
}

stepExecution.getJobExecution()
 .getExecutionContext()
 .putInt(StepAttribute.START_ELEMENT, startElement + numElements);

return new ExitStatus("CONTINUE (Items Remaining: " 
 + remainingElements + ")");
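The arithmetic behind remainingElements is easy to check by hand. The small sketch below repeats the same calculation for an assumed example of 250 countries served in pages of 100, 100 and 50; the class and method names are made up for illustration.

```java
class PagingMath {
    /** Elements still to be fetched after the current page. */
    static int remaining(int startElement, int numElements, int count) {
        return count - (startElement + numElements);
    }

    /** Start element of the next page. */
    static int nextStart(int startElement, int numElements) {
        return startElement + numElements;
    }

    public static void main(String[] args) {
        int count = 250;                  // total number of elements
        int[] pageSizes = {100, 100, 50}; // num_elements of each page
        int start = 0;
        for (int num : pageSizes) {
            System.out.println("start=" + start + " num=" + num
                    + " remaining=" + remaining(start, num, count));
            start = nextStart(start, num);
        }
        // prints:
        // start=0 num=100 remaining=150
        // start=100 num=100 remaining=50
        // start=200 num=50 remaining=0
    }
}
```

The first two pages leave a positive remainder, so the listener asks for another run; the third page leaves zero, so it returns END.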

Conclusion

We have seen the implementation details of the two most important components, the Tasklet and the Execution Listener. We have also seen the abstract class AbstractPageLoadTasklet, which contains the core logic, hence reducing the implementation of CountryLoadTasklet to the minimum. In the next post we will design a service layer for the batch jobs.

Saturday, May 17, 2014

Load Appnexus Country List using Spring Batch: Part 1

In this post we will discuss the details of loading the list of countries from Appnexus using Spring Batch.

Sequence of steps

The following diagram shows the sequence of steps involved in fetching the list of countries from Appnexus.


Sequence diagram (MyServer and Appnexus):

  • Authentication: MyServer -> Appnexus: Authentication Request; Appnexus -> MyServer: Authentication Token (Authentication Complete)
  • Page 1: GET /country?start_element=0 returns country_list (num_element=100, start_element=0, count=250)
  • Page 2: GET /country?start_element=100 returns country_list (num_element=100, start_element=100, count=250)
  • Page 3: GET /country?start_element=200 returns country_list (num_element=50, start_element=200, count=250)

Summary of steps

  1. Fetch the authentication token from Appnexus using the user name and password.
  2. Request the country endpoint with start element zero.
  3. Work out the remaining elements from the three values start_element, num_elements and count.
  4. Request the subsequent elements using the same endpoint.
  5. Continue the process until all the pages are read.


Using for loop 

The above interaction could be achieved simply by using a for loop or a while loop. But the objective of this post is to present an alternative mechanism for achieving the same result, along with its advantages. I have chosen Spring Batch for this purpose. In the rest of this post we will look at a brief introduction to Spring batch, followed by the advantages of choosing it.
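For comparison, the plain loop version might look like the sketch below. The CountryApi interface and the CountryPage class are hypothetical and only mirror the three paging values from the diagram; authentication is assumed to have happened already.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical page of results carrying the three paging values. */
class CountryPage {
    final List<String> countries;
    final int startElement;
    final int numElements;
    final int count;

    CountryPage(List<String> countries, int startElement, int count) {
        this.countries = countries;
        this.startElement = startElement;
        this.numElements = countries.size();
        this.count = count;
    }
}

/** Hypothetical client: returns the page that begins at startElement. */
interface CountryApi {
    CountryPage getCountries(int startElement);
}

class CountryLoopLoader {
    /** Fetches page after page until no elements remain. */
    static List<String> loadAll(CountryApi api) {
        List<String> all = new ArrayList<>();
        int start = 0;
        while (true) {
            CountryPage page = api.getCountries(start);
            all.addAll(page.countries);
            int remaining = page.count - (page.startElement + page.numElements);
            // the second check guards against looping forever on an empty page
            if (remaining <= 0 || page.numElements == 0) {
                break;
            }
            start = page.startElement + page.numElements;
        }
        return all;
    }
}
```

The loop is short, but it keeps no record of what was loaded and cannot resume from the page where it failed.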

Introduction to Spring Batch 

Spring batch is a framework designed to provide the necessary infrastructure for the execution of batch jobs. For more information about Spring batch, refer to the Wikipedia entry or the official homepage.

Introduction to batch jobs 

For a job to be classified as a batch job it should possess a few characteristics. For instance,

  • it should be long-running
  • it should not need human interaction
  • most often it will be run on a periodic basis. 

These are just a few characteristics that are vital for a batch job. For more information on batch jobs and their characteristics, kindly refer to the Wikipedia entry on batch jobs.

Why batch infrastructure? 

As I suggested in one of the above paragraphs, the specific problem of loading the list of countries from Appnexus could be solved with just a for loop, so why should we use Spring batch to do it? There are several disadvantages in doing it with plain Java.

  • No history of the jobs run. 
  • No restart capability. 
  • No protection against the job being run again.
  • Retry and failure handling have to be implemented by us.
  • In case of complex jobs with multiple steps, the flow has to be managed manually.

Spring batch provides a lot of functionality, such as job history, restart capability, protection against unwanted repetition of jobs, error handling and custom step flow. Getting into more details about Spring batch is not the purpose of this post; kindly refer to the official documentation for more information.

Spring Batch Terminology

Job

A Job is an entity that encapsulates an entire batch process.

Step

A Step is a domain object that encapsulates an independent, sequential phase of a batch job. Therefore, every Job is composed entirely of one or more steps. A Step contains all of the information necessary to define and control the actual batch processing.

Tasklet

The Tasklet is a simple interface that has one method, execute, which will be called repeatedly by the TaskletStep until it either returns RepeatStatus.FINISHED or throws an exception to signal a failure.

All the above definitions are taken from the official documentation. Kindly refer to it for more information on Spring batch architecture and terminology.

Required Components

Going by the above definitions, we could adapt the sequence shown in the above diagram as follows:

Appnexus country load Job - Overall flow of loading all the countries from Appnexus.

Appnexus country load Step - As you can see, the job makes 3 calls to the Appnexus country endpoint, which naturally form the steps involved.

To summarise, Appnexus country load has one job, which has three steps, but then all the steps are similar (except for the start element which is different). So this leaves us with a single job with a single step which is repeatedly called until all the pages are read.

Spring Batch Configuration

Please find below the configuration file for the job we just discussed: country-load-job.xml

<batch:job id="countryLoadJob">
 <batch:step id="step1">
  <batch:tasklet ref="countryLoadTasklet" />
  <batch:next on="CONTINUE*" to="step1" />
  <batch:end on="END" />
  <batch:listeners>
   <batch:listener ref="pagingAwareStepExecutionListener" />
  </batch:listeners>
 </batch:step>
</batch:job>

As you can see, the configuration is actually simple. There is only one step, "step1", which is called repeatedly as long as the exit status starts with "CONTINUE", and the job is terminated once the status is "END". Kindly pay attention to the listener "pagingAwareStepExecutionListener", which determines the status of the step from the values start element, number of elements and total number of elements.
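The trailing * in on="CONTINUE*" matters because the exit code may carry extra text after the word CONTINUE. In Spring batch transition patterns, * matches zero or more characters and ? matches exactly one. The sketch below only illustrates that matching rule with a small helper of my own; it is not the framework's matcher, and the sample exit code is an assumed example.

```java
import java.util.regex.Pattern;

class ExitCodePatternSketch {
    /**
     * Matches an exit code against a pattern in which '*' stands for any run
     * of characters (including none) and '?' stands for exactly one character.
     */
    static boolean matches(String pattern, String exitCode) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL)
                .matcher(exitCode)
                .matches();
    }
}
```

For example, matches("CONTINUE*", "CONTINUE (Items Remaining: 150)") is true, while matches("END", "CONTINUE (Items Remaining: 150)") is false, so only the first transition fires and the step runs again.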

This brings us to the end of this post on loading Appnexus countries using Spring batch. In the following post we shall get into the details of the implementation of countryLoadTasklet and pagingAwareStepExecutionListener.

Kindly post a comment if you either like it or if you feel the content is inappropriate in some way.