Sunday, May 18, 2014

Load Appnexus Country List using Spring Batch: Part 2

In the previous post we saw the configuration required for loading countries from Appnexus using Spring Batch. In this post we will look at the implementation of the countryLoadTasklet and the pagingAwareStepExecutionListener.

Country Load Tasklet

This class extends the superclass AbstractPageLoadTasklet, which is designed to support any Tasklet that deals with a pageable response.

@Scope("step")
public abstract class AbstractPageLoadTasklet
 <TYPE, REPO extends JpaRepository<TYPE, ?>, 
  RESPONSE extends AbstractPagableResponse>
 implements Tasklet {

 @Autowired
 private AppnexusRestClient appnexusClient;

 @Value("#{stepExecutionContext['" + StepAttribute.START_ELEMENT + "']}")
 private int startElement;

 @Override
 public RepeatStatus execute(StepContribution stepContribution,
 ChunkContext chunkContext) throws Exception {
 // ...
 }

 public abstract List<TYPE> 
 getItemsFromResponse(RESPONSE response);

 public abstract ResponseContainer<RESPONSE> 
 getPageableResponse(
  AppnexusRestClient appnexusClient, int startElement);

 public abstract REPO getRepository();

 protected void save(List<TYPE> items) {
 getRepository().save(items);
 }
}

Note that this is an abstract class: the methods that are specific to a particular Tasklet have been made abstract. The implementation of the class itself is very simple:

Detail 1: Implementing Tasklet interface

public abstract class AbstractPageLoadTasklet implements Tasklet {

It implements the Tasklet interface, which is provided by Spring Batch.

Detail 2: Injection of AppnexusRestClient

@Autowired
private AppnexusRestClient appnexusClient;

The AppnexusRestClient is autowired; its creation was discussed in one of the previous posts.

Detail 3: Injection of startElement

@Value("#{stepExecutionContext['" + StepAttribute.START_ELEMENT + "']}")
private int startElement;

An interesting observation to make here is the injection of startElement, which is read from the step execution context. As we saw in the previous post, each time the step is executed, the start element is updated depending on the page that has to be fetched. This ensures that each subsequent fetch moves on to the next page until we reach the end. The job of updating the start element at the end of each step is performed by the pagingAwareStepExecutionListener, the details of which are given below.

Detail 4: Implementation of the execute method

This method has three parts:

1. Reading the list of countries by using the appnexusClient. 

ResponseContainer<RESPONSE> responseContainer = 
 getPageableResponse(appnexusClient, startElement);
RESPONSE response = responseContainer.getResponse();
// fetch items from response
List<TYPE> items = getItemsFromResponse(response);

2. Updating the step execution context with three values: the start element, the number of elements in the current page, and the count (total number of elements).

// Part 2: update execution context
ExecutionContext executionContext = chunkContext.getStepContext()
 .getStepExecution().getExecutionContext();
executionContext.put(StepAttribute.START_ELEMENT, response.getStartElement());
executionContext.put(StepAttribute.NUM_ELEMENTS, response.getNumElements());
executionContext.put(StepAttribute.COUNT, response.getCount());

3. Saving the fetched items into the database using the repository.

// Part 3: save items
save(items);
return RepeatStatus.FINISHED;
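
The three parts above can be tried out without Spring or Appnexus. The following is only a minimal plain-Java sketch of the same flow, not the actual tasklet: the Page and PageLoadSketch classes, the string keys, and the in-memory list and map that stand in for the REST client, the repository and the step execution context are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one paged response (not an Appnexus class).
final class Page {
    final int startElement;   // index of the first item in this page
    final int numElements;    // number of items in this page
    final int count;          // total number of items available remotely
    final List<String> items;

    Page(int startElement, int count, List<String> items) {
        this.startElement = startElement;
        this.numElements = items.size();
        this.count = count;
        this.items = items;
    }
}

// Plain-Java sketch of the execute() flow: fetch, record paging info, save.
final class PageLoadSketch {
    private final List<String> remote;  // stand-in for the remote data set
    private final int pageSize;
    final Map<String, Integer> stepContext = new HashMap<>(); // stand-in for the step execution context
    final List<String> saved = new ArrayList<>();             // stand-in for the repository

    PageLoadSketch(List<String> remote, int pageSize) {
        this.remote = remote;
        this.pageSize = pageSize;
    }

    // Stand-in for getPageableResponse(): returns the page starting at startElement.
    Page fetch(int startElement) {
        int from = Math.min(startElement, remote.size());
        int to = Math.min(from + pageSize, remote.size());
        return new Page(from, remote.size(), new ArrayList<>(remote.subList(from, to)));
    }

    void execute(int startElement) {
        // Part 1: read one page
        Page page = fetch(startElement);
        // Part 2: publish the paging information for the listener
        stepContext.put("startElement", page.startElement);
        stepContext.put("numElements", page.numElements);
        stepContext.put("count", page.count);
        // Part 3: save the fetched items
        saved.addAll(page.items);
    }
}
```

Calling execute(2) on a sketch built over five items with a page size of 2 saves the third and fourth items and leaves startElement=2, numElements=2 and count=5 in the map, which is exactly the information the listener needs to decide whether another page is required.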


CountryLoadTasklet 

The following listing shows the implementation of the CountryLoadTasklet, which extends AbstractPageLoadTasklet.

public class CountryLoadTasklet extends
  AbstractPageLoadTasklet<Country, CountryRepo, CountryListResponse> {

 @Autowired
 private CountryRepo countryRepo;

 @Override
 public List<Country> getItemsFromResponse(CountryListResponse response) {
  return response.getCountries();
 }

 @Override
 public ResponseContainer<CountryListResponse> getPageableResponse(
   AppnexusRestClient appnexusClient, int startElement) {
  return appnexusClient.getCountryList(startElement);
 }

 @Override
 public CountryRepo getRepository() {
  return countryRepo;
 }
}

Note that the class implements all the abstract methods defined in the superclass. These implementations are simple and contain no logic of their own, because the superclass has all the intelligence required to fetch the pages from Appnexus.

PagingAwareStepExecutionListener

The following listing shows the implementation of this class.

Detail 1: Implementing StepExecutionListener interface


@Component
public class PagingAwareStepExecutionListener implements StepExecutionListener {

 @Override
 public void beforeStep(StepExecution stepExecution) {
    // ...
 }

 @Override
 public ExitStatus afterStep(StepExecution stepExecution) {
    // ...
 }
}

StepExecutionListener is an interface provided by Spring Batch, and step listeners must implement it. The interface consists of two methods, beforeStep and afterStep.

Detail 2: Implementation of beforeStep method:

This method is invoked before the execution of each step. The implementation is quite simple: it reads the start element from the job execution context, where it was supplied by the user before invoking the job, and copies the value into the step execution context so that the step can read it to fetch the right page.
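
The body of beforeStep is not shown in this post, so the following is only a sketch of the hand-off it describes, written against plain maps rather than the Spring Batch classes. In the real listener the two contexts would be obtained with stepExecution.getJobExecution().getExecutionContext() and stepExecution.getExecutionContext(). The class name BeforeStepSketch, the key string, and the fallback to 0 when no start element has been supplied are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the beforeStep hand-off using maps as stand-ins for the
// job execution context and the step execution context.
final class BeforeStepSketch {
    // Hypothetical key; the post uses StepAttribute.START_ELEMENT.
    static final String START_ELEMENT = "startElement";

    static void beforeStep(Map<String, Integer> jobContext,
                           Map<String, Integer> stepContext) {
        // Assumption: start from the first element if nothing was supplied.
        int startElement = jobContext.getOrDefault(START_ELEMENT, 0);
        // Copy the value so that the step can read it.
        stepContext.put(START_ELEMENT, startElement);
    }
}
```

Keeping the running value in the job execution context and copying it on every run gives each step execution its own copy, while the value carried from one execution to the next lives in a single place.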

Detail 3: Implementation of afterStep method:

This method performs the following:

1. Reads the paging information from the step execution context, which was populated by the step itself.

ExecutionContext context = stepExecution.getExecutionContext();
int startElement = context.getInt(StepAttribute.START_ELEMENT);
int count = context.getInt(StepAttribute.COUNT);
int numElements = context.getInt(StepAttribute.NUM_ELEMENTS);
int remainingElements = count - (startElement + numElements);

2. Checks if more items are to be read from Appnexus. If there are more items, it updates the start element in the job execution context accordingly and returns CONTINUE as the exit status; otherwise it returns END as the exit status.

if (remainingElements <= 0) {
 System.out.println("Task finished.");
 return new ExitStatus("END");
}

stepExecution.getJobExecution()
 .getExecutionContext()
 .putInt(StepAttribute.START_ELEMENT, startElement + numElements);

// Keep the exit code exactly "CONTINUE"; the remaining count goes into the description
return new ExitStatus("CONTINUE", "Items remaining: " + remainingElements);
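
To see how the start element moves from page to page, the decision made in afterStep can be replayed in a loop. The sketch below is a standalone simulation under stated assumptions, not part of the batch job: the class PagingWalkthrough and its method are hypothetical, the first page is assumed to start at element 0, and every page is assumed to be full except possibly the last.

```java
import java.util.ArrayList;
import java.util.List;

// Replays the afterStep arithmetic to list the start element of every page requested.
final class PagingWalkthrough {

    static List<Integer> startElements(int count, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Integer> requested = new ArrayList<>();
        int startElement = 0; // assumption: paging starts at the first element
        while (true) {
            requested.add(startElement);
            // Items returned for this page (the last page may be shorter)
            int numElements = Math.max(0, Math.min(pageSize, count - startElement));
            int remainingElements = count - (startElement + numElements);
            if (remainingElements <= 0) {
                return requested; // corresponds to the END exit status
            }
            startElement = startElement + numElements; // corresponds to CONTINUE
        }
    }
}
```

For example, with 250 items and a page size of 100 the requested start elements are 0, 100 and 200: after the third page nothing remains, so the loop stops where the listener would return END.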

Conclusion

We have seen the implementation details of the two most important components: the Tasklet and the step execution listener. We have also seen the abstract class AbstractPageLoadTasklet, which contains the paging logic and therefore reduces the implementation of CountryLoadTasklet to a minimum. In the next post we will design a service layer for the batch jobs.
