Merging Spring Batch jobs to reduce data lookup time

When designing a batch job, a common requirement is to take a list of records and enhance them with data from another source.

There are a couple of ways to achieve this. One way would be to read a record from the main list and then gather the additional information for that record before reading the next record. This gathering would typically be done in an ItemProcessor class.
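As a sketch of that first approach, here is the per-record pattern in plain Java (a HashMap stands in for the database DAO, and the key and value types are hypothetical – in a real job this logic would sit inside a Spring Batch ItemProcessor implementation):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of per-record enrichment: one lookup for every record read.
 * A map stands in for the database; in a real job this logic would
 * live in a Spring Batch ItemProcessor implementation.
 */
public class PerRecordEnricher {

	// Stand-in for a DAO that performs one database lookup per record.
	private final Map<Long, String> lookupSource = new HashMap<Long, String>();

	public PerRecordEnricher() {
		lookupSource.put(1L, "extra-1");
		lookupSource.put(2L, "extra-2");
	}

	/** Enrich a single record - simple, but costly for millions of records. */
	public String lookup(long itemKey) {
		return lookupSource.get(itemKey);
	}
}
```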

This is fine when there is a relatively small number of records to process, but for a larger dataset, it may be more efficient to retrieve the data as two (or more) lists and merge them together.

In one example I had recently, a job needed to read 15 million records from a CSV file and enhance each one with some values from corresponding records in a database. The database lookup for each record would have taken about 1 millisecond, so processing the 15 million records one at a time on a single thread would have taken over 4 hours (15,000,000 × 1 ms ≈ 250 minutes). Running a single query to retrieve all the additional values in one go and merging the results with the CSV file records reduced that time to about 20 minutes – a reduction of more than 90%. Using multiple threads would have reduced the overall time for either solution, but you get the idea.

Let’s see how I performed the merging. This relies on the different lists being in the same order according to whatever key you will be using to match records in the different lists.
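In essence this is a sort-merge join: because both inputs arrive sorted by the same key, a single forward pass over each is enough. A tiny plain-Java illustration of the idea (the Long keys are hypothetical, not the classes used below):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Tiny illustration of a sort-merge join: both inputs must be sorted
 * ascending by the same key.
 */
public class SortMergeDemo {

	/**
	 * For each key in the primary list, collect the matching entries
	 * from the secondary list in a single forward pass over each list.
	 */
	public static Map<Long, List<Long>> merge(List<Long> primary, List<Long> secondary) {
		Map<Long, List<Long>> result = new LinkedHashMap<Long, List<Long>>();
		int j = 0;
		for (Long key : primary) {
			// Skip secondary records whose keys come before the current key.
			while (j < secondary.size() && secondary.get(j) < key) {
				j++;
			}
			// Accumulate every secondary record that shares the current key.
			List<Long> matches = new ArrayList<Long>();
			while (j < secondary.size() && secondary.get(j).equals(key)) {
				matches.add(secondary.get(j));
				j++;
			}
			result.put(key, matches);
		}
		return result;
	}
}
```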

The merging is performed by creating a custom reader that uses a helper class to read items from an ItemReader, either searching forward for the records for a particular key or accumulating all the records that share a key. The helper class looks like this:

package my.package;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

/**
 * Methods for finding and reading items from an ItemReader and accumulating them according to a key value.
 * 
 * @param <T> The class of the items to be processed
 * @param <K> The class of the key value of the items being processed. Used for positioning the reader to a particular place in the input 
 */
public abstract class ItemAccumulator<T, K> implements ItemStream {
		
	private ItemReader<T> reader;
	
	private T lastItem;
	private List<T> lastItemList;
	
	/**
	 * Extract the key of an item.
	 * 
	 * @param item
	 * @return the key value for the item
	 */
	public abstract K getKey(T item);
		
	/**
	 * Determine whether an item's key comes before (is less than) the specified key,
	 * or the item has no key at all.
	 * <p/>
	 * This is used when reading forward to find the records for a particular key, if any exist.
	 * 
	 * @param item item to be checked
	 * @param key key value
	 * @return true if the item's key is null or less than the key, otherwise false.
	 */
	@SuppressWarnings("unchecked")
	public boolean checkPositionKey(T item, K key) {
		return getKey(item) == null || ((Comparable<K>) getKey(item)).compareTo(key) < 0;
	}

	/**
	 * Determine whether an item matches the key.
	 * 
	 * @param item item to be checked
	 * @param key key value
	 * @return true if the item matches the key, otherwise false.
	 */
	public boolean checkEqualKey(T item, K key) {
		
		return getKey(item).equals(key);
	}

	/**
	 * Read forward through the Reader looking for the first record with 
	 * a key that is equal to or greater than the key passed in.
	 * 
	 * @param key Key to search for. 
	 * @return First item matching search criteria, or null if no suitable item found.
	 * @throws Exception
	 */
	public T findEventRecord(K key) throws Exception {
		
		T secondRecord = lastItem;
		
		while (secondRecord != null && checkPositionKey(secondRecord, key)) {
			secondRecord = reader.read();
		}
		
		return secondRecord;
	}
	
	/**
	 * Read the items for a particular key, if any exist.
	 * <p/>
	 * The reader will be positioned on the first record for the next key.
	 * 
	 * @param key
	 * @return List of items
	 * @throws Exception
	 */
	public List<T> readNextItems(K key) throws Exception {

		// If we haven't read the first record yet, do that now.
		if (lastItem == null) {
			lastItem = reader.read();
		}
		
		// Does the last list read match the requested key?
		if (lastItem != null && lastItemList != null && !lastItemList.isEmpty() 
			&& checkEqualKey(lastItemList.get(0), key)) {
			return lastItemList;
		}
		
		// Look for the first item for the key. readNextItem() updates lastItem
		// itself; it returns null when there is no record for the key, in which
		// case lastItem may still hold a record for a later key, so we must not
		// overwrite lastItem with the return value here.
		T firstItem = readNextItem(key);

		if (firstItem != null) {

			lastItemList = new ArrayList<T>();

			while (lastItem != null && checkEqualKey(lastItem, key)) {
				lastItemList.add(lastItem);
				// Read the next item. On EOF, lastItem will be null.
				lastItem = reader.read();
			}
		} else {
			lastItemList = new ArrayList<T>(0);
		}

		return lastItemList;
	}

	/**
	 * Read the first item for a particular key, if one exists.
	 * <p/>
	 * If no item exists for the key, the reader is left positioned on the
	 * first record with a greater key.
	 * 
	 * @param key key value to search for
	 * @return first item for the key, or null if no such record
	 * @throws Exception
	 */
	public T readNextItem(K key) throws Exception {

		// If haven't read first record yet, then do that now.
		if (lastItem == null) {
			lastItem = reader.read();
		}
		
		if (lastItem != null) {
			// Haven't reached EOF for the data,
			// so look for more data for the current key
			if (!checkEqualKey(lastItem, key)) {
				lastItem = findEventRecord(key);
			}

			if (lastItem != null && !checkEqualKey(lastItem, key)) {
				// Positioned on a record for a later key: report no match,
				// but leave lastItem holding that record so it is not lost.
				return null;
			}
		}

		return lastItem;
	}

	/**
	 * Read the items for the next key.
	 * <p/>
	 * The reader will be positioned on the first record for the following key.
	 * 
	 * @return List of items
	 * @throws Exception
	 */
	public List<T> readNextItems() throws Exception {
		
		// If haven't read first record yet, then do that now.
		if (lastItem == null) {
			lastItem = reader.read();
		}

		K lastKey = lastItem == null ? null : getKey(lastItem);
		
		lastItemList = new ArrayList<T>();

		// Haven't reached EOF for the data,
		// so look for more data for the current key	
		while (lastItem != null && checkEqualKey(lastItem, lastKey)) {
			lastItemList.add(lastItem);
			// Read next record. On EOF, data will be null.
			lastItem = reader.read();
		}

		return lastItemList;
	}

	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		if (reader instanceof ItemStream)
			((ItemStream) reader).open(executionContext);
	}

	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {		
		if (reader instanceof ItemStream)
			((ItemStream) reader).update(executionContext);
	}

	@Override
	public void close() throws ItemStreamException {		
		if (reader instanceof ItemStream)
			((ItemStream) reader).close();
	}

	public void setReader(ItemReader<T> reader) {
		this.reader = reader;
	}
}

Okay, there’s quite a bit of code here. The class gives you methods to read the items for the next key in the data, to read the first item for a particular key, or to read all the items for a particular key. I have found that this covers all of my requirements.

Let’s look at how you would use it. Imagine a situation where there is a list of PrimaryObj objects that need to be merged with a list of SecondaryObj objects.

To create the concrete classes you’ll need an accumulator class for each model class – like this for PrimaryObj, and something similar for SecondaryObj:

package my.package;

import my.package.ItemAccumulator;
import my.package.model.PrimaryObj;

/**
 * Accumulate PrimaryObj objects.
 */
public class PrimaryRecordAccumulator extends ItemAccumulator<PrimaryObj, Long> {

	@Override
	public Long getKey(PrimaryObj item) {
		return item.getItemKey();
	}
}

In this case the key used to match the records is a Long value, but it could be any class that implements Comparable.
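For instance, if records had to be matched on two fields, a small composite key class would work, provided its Comparable ordering is consistent with equals. This class is purely illustrative and not part of the job above:

```java
/** Hypothetical composite key: matches records on branch, then account. */
public class CompositeKey implements Comparable<CompositeKey> {

	private final int branch;
	private final long account;

	public CompositeKey(int branch, long account) {
		this.branch = branch;
		this.account = account;
	}

	@Override
	public int compareTo(CompositeKey other) {
		// Order by branch first, then by account within a branch.
		int cmp = Integer.compare(branch, other.branch);
		return cmp != 0 ? cmp : Long.compare(account, other.account);
	}

	@Override
	public boolean equals(Object o) {
		if (!(o instanceof CompositeKey)) return false;
		CompositeKey k = (CompositeKey) o;
		return branch == k.branch && account == k.account;
	}

	@Override
	public int hashCode() {
		return 31 * branch + Long.hashCode(account);
	}
}
```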

Here’s an example of a reader class that reads a list of PrimaryObj records for the next key value, and also reads any SecondaryObj records for that key. Both lists are added to a MergeWrapper object that is returned by the reader:
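The MergeWrapper class itself is just a holder for the two lists. A minimal sketch might look like this (written generically here for illustration – the real class would presumably declare List&lt;PrimaryObj&gt; and List&lt;SecondaryObj&gt; fields directly):

```java
import java.util.Arrays;
import java.util.List;

/**
 * Minimal sketch of a wrapper holding the two lists read for one key.
 * Generic for illustration; a concrete MergeWrapper would use the
 * actual model types instead of type parameters.
 */
public class MergeWrapper<P, S> {

	private List<P> primaryItems;
	private List<S> secondaryItems;

	public List<P> getPrimaryItems() {
		return primaryItems;
	}

	public void setPrimaryItems(List<P> primaryItems) {
		this.primaryItems = primaryItems;
	}

	public List<S> getSecondaryItems() {
		return secondaryItems;
	}

	public void setSecondaryItems(List<S> secondaryItems) {
		this.secondaryItems = secondaryItems;
	}
}
```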

package my.package;

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import my.package.PrimaryRecordAccumulator ;
import my.package.SecondaryRecordAccumulator ;
import my.package.model.MergeWrapper;
import my.package.model.PrimaryObj;

/**
 * Read the PrimaryObj and SecondaryObj records for a key 
 * and return them in a wrapper object. 
 */
public class MergeDataReader implements ItemStreamReader<MergeWrapper> {
	
	private PrimaryRecordAccumulator primaryAccumulator;
	private SecondaryRecordAccumulator secondaryAccumulator;
	
	@Override
	public MergeWrapper read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {		
		List<PrimaryObj> items = primaryAccumulator.readNextItems();
		if (items == null || items.size() == 0) {
			return null;
		}
		
		MergeWrapper wrapper = new MergeWrapper();
		wrapper.setPrimaryItems(items);
		Long key = primaryAccumulator.getKey(items.get(0));
		wrapper.setSecondaryItems(secondaryAccumulator.readNextItems(key));
		
		return wrapper;
	}
	
	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		primaryAccumulator.open(executionContext);
		secondaryAccumulator.open(executionContext);
	}

	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		primaryAccumulator.update(executionContext);
		secondaryAccumulator.update(executionContext);
	}

	@Override
	public void close() throws ItemStreamException {
		primaryAccumulator.close();
		secondaryAccumulator.close();
	}

	public void setPrimaryAccumulator(PrimaryRecordAccumulator primaryAccumulator) {
		this.primaryAccumulator = primaryAccumulator;
	}

	public void setSecondaryAccumulator(SecondaryRecordAccumulator secondaryAccumulator) {
		this.secondaryAccumulator = secondaryAccumulator;
	}
}

The reader just gathers the matching records from both sources into a wrapper object. It would be possible to combine the lists of records in the reader, but I prefer to do that in a processor – in my view the reader should just read the data.
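As an illustration of what such a processor might do, here is a plain-Java sketch of the combining logic (the string stand-ins are hypothetical; in the real job this would be an ItemProcessor taking the MergeWrapper and producing enriched output records):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Sketch of the combining step a processor might perform: pair each
 * primary record with every secondary record read for the same key.
 * Strings stand in for the real model classes.
 */
public class CombineSketch {

	public static List<String> combine(List<String> primaryItems, List<String> secondaryItems) {
		List<String> combined = new ArrayList<String>();
		for (String p : primaryItems) {
			if (secondaryItems.isEmpty()) {
				combined.add(p);               // no extra data for this key
			} else {
				for (String s : secondaryItems) {
					combined.add(p + ":" + s); // enriched record
				}
			}
		}
		return combined;
	}
}
```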

It is possible to handle three or more data sources that need to be merged. You just need to change the reader to gather the records from however many sources you have and store them in the wrapper object.

To conclude…

This approach works best if the lists of items to be processed are of a similar size.

If the secondary list is many times the size of the primary list, you will end up reading and discarding a lot of data. If that data comes from a database, you may be able to tweak the query to avoid reading unnecessary rows, but if it comes from a flat file there is little you can do except read through the entire file – flat files don’t support random access, so unless you can load the file into a database in a separate step of the job, reading it sequentially is the only option.

Using this approach has been beneficial for me with both databases and flat files. But it can only work if all the data sources are read in the same order and share a common key that can be used to match items. If that is not the case, you will have to fall back to looking up the additional data record by record.


Jeremy Yearron January 7, 2019
