Creating a Java 8 Stream from unbounded data using Spliterator

Posted by {"name"=>"Palash Ray", "email"=>"paawak@gmail.com", "url"=>"https://www.linkedin.com/in/palash-ray/"} on July 16, 2016 · 6 mins read

Problem Statement

I have a large XML file. I would like to read it, then group and aggregate the rows in it using Java 8. A DOM parser with JAXB cannot handle this, as it would load the entire document into memory, and this is a really large file. Instead, I would like to create a Stream from the unbounded data contained in the XML file.

Solution

I read the XML by streaming it with Stax. Since I do not load the entire file into memory, I am good. I go a step further and use JAXB to unmarshal small portions of this file, each of which I will call a row. I then use a Spliterator backed by a BlockingQueue to create a Stream out of these rows. Once I have the Stream, I apply the famous grouping-by function and aggregate the rows.
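To make the moving parts concrete upfront, here is a minimal sketch of the intended usage, assuming the XmlParser class developed below (xmlInputStream is a placeholder for any InputStream over the XML):

// A sketch of the intended usage, assuming the XmlParser developed below.
// xmlInputStream is a placeholder for any InputStream over the large XML.
CountDownLatch latch = new CountDownLatch(1);
XmlParser<LineItemRow> parser = new XmlParser<>("T", LineItemRow.class, latch);
Stream<LineItemRow> rows = parser.parse(xmlInputStream); // parsing starts on a background thread
Map<Integer, List<LineItemRow>> byOrder =
        rows.collect(Collectors.groupingBy(LineItemRow::getOrderKey));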

The XML

The sample XML looks like this:


<T>
	<L_ORDERKEY>1</L_ORDERKEY>
	<L_PARTKEY>1552</L_PARTKEY>
	<L_SUPPKEY>93</L_SUPPKEY>
	<L_LINENUMBER>1</L_LINENUMBER>
	<L_QUANTITY>17</L_QUANTITY>
	<L_EXTENDEDPRICE>24710.35</L_EXTENDEDPRICE>
	<L_DISCOUNT>0.04</L_DISCOUNT>
	<L_TAX>0.02</L_TAX>
	<L_RETURNFLAG>N</L_RETURNFLAG>
	<L_LINESTATUS>O</L_LINESTATUS>
	<L_SHIPDATE>1996-03-13</L_SHIPDATE>
	<L_COMMITDATE>1996-02-12</L_COMMITDATE>
	<L_RECEIPTDATE>1996-03-22</L_RECEIPTDATE>
	<L_SHIPINSTRUCT>DELIVER IN PERSON</L_SHIPINSTRUCT>
	<L_SHIPMODE>TRUCK</L_SHIPMODE>
	<L_COMMENT>blithely regular ideas caj</L_COMMENT>
</T>

There are thousands of "T" elements in the file. I have modeled my POJO on the element "T". I use Stax to read the XML. Each time a complete "T" element has been read, I use JAXB to unmarshal it into a Java object and then publish it to the Stream.

The POJO

I have modeled the POJO as below:

@XmlRootElement(name = "T")
@XmlAccessorType(XmlAccessType.FIELD)
public class LineItemRow {
    @XmlElement(name = "L_ORDERKEY")
    private int orderKey;
    @XmlElement(name = "L_PARTKEY")
    private int partKey;
    @XmlElement(name = "L_SUPPKEY")
    private int supplierKey;
    @XmlElement(name = "L_LINENUMBER")
    private int lineNumber;
    @XmlElement(name = "L_QUANTITY")
    private int quantity;
    @XmlElement(name = "L_EXTENDEDPRICE")
    private float extendedPrice;
    @XmlElement(name = "L_DISCOUNT")
    private float discount;
    @XmlElement(name = "L_TAX")
    private float tax;
    @XmlElement(name = "L_RETURNFLAG")
    private String returnFlag;
    @XmlElement(name = "L_LINESTATUS")
    private String lineStatus;
    @XmlElement(name = "L_SHIPDATE")
    @XmlJavaTypeAdapter(LocalDateAdapter.class)
    private LocalDate shippingDate;
    @XmlElement(name = "L_COMMITDATE")
    @XmlJavaTypeAdapter(LocalDateAdapter.class)
    private LocalDate commitDate;
    @XmlElement(name = "L_RECEIPTDATE")
    @XmlJavaTypeAdapter(LocalDateAdapter.class)
    private LocalDate receiptDate;
    @XmlElement(name = "L_SHIPINSTRUCT")
    private String shippingInstructions;
    @XmlElement(name = "L_SHIPMODE")
    private String shipMode;
    @XmlElement(name = "L_COMMENT")
    private String comment;
...
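The date fields above refer to a LocalDateAdapter, which is not listed in this post. A minimal sketch of such an adapter, assuming the ISO-8601 dates seen in the sample XML (the actual implementation lives in the source repository linked at the end):

import java.time.LocalDate;

import javax.xml.bind.annotation.adapters.XmlAdapter;

public class LocalDateAdapter extends XmlAdapter<String, LocalDate> {
    @Override
    public LocalDate unmarshal(String value) {
        // the sample XML carries ISO-8601 dates, e.g. 1996-03-13
        return value == null ? null : LocalDate.parse(value);
    }

    @Override
    public String marshal(LocalDate value) {
        return value == null ? null : value.toString();
    }
}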

The Stax Parser

The heart of this is the Stax parser:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XmlParser<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(XmlParser.class);

    private final String xmlElementName;
    private final Class<T> classToUnmarshall;
    private final CountDownLatch countDownLatch;
    private final JaxbUnmarshaller jaxbUnmarshaller;
    private final BlockingQueue<T> blockingQueue;
    private final StaxSpliterator<T> staxSpliterator;
    private final Stream<T> stream;

    public XmlParser(String xmlElementName, Class<T> classToUnmarshall, CountDownLatch countDownLatch) {
        this.xmlElementName = xmlElementName;
        this.classToUnmarshall = classToUnmarshall;
        this.countDownLatch = countDownLatch;
        jaxbUnmarshaller = new JaxbUnmarshaller();
        blockingQueue = new ArrayBlockingQueue<>(10_000);
        staxSpliterator = new StaxSpliterator<>(blockingQueue);
        // the second argument requests a parallel stream
        stream = StreamSupport.stream(staxSpliterator, true);
    }

    public Stream<T> parse(InputStream inputStream) {
        // parse on a separate thread, so that the returned Stream can be
        // consumed while the document is still being read
        Runnable doParse = () -> {
            try {
                doParse(inputStream);
            } catch (XMLStreamException e) {
                throw new RuntimeException(e);
            }
        };
        new Thread(doParse).start();
        return stream;
    }

    private void doParse(InputStream inputStream) throws XMLStreamException {
        XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
        XMLStreamReader xmlStreamReader = xmlInputFactory.createXMLStreamReader(inputStream);
        StringBuilder buffer = newStringBuilder();
        while (xmlStreamReader.hasNext()) {
            int eventType = xmlStreamReader.next();
            if (eventType == XMLStreamConstants.START_ELEMENT) {
                String element = xmlStreamReader.getLocalName();
                if (xmlElementName.equals(element)) {
                    // a new row begins: start with a fresh buffer
                    buffer = newStringBuilder();
                }
                buffer.append("<").append(element).append(">");
            } else if (eventType == XMLStreamConstants.END_ELEMENT) {
                String element = xmlStreamReader.getLocalName();
                buffer.append("</").append(element).append(">");
                if (xmlElementName.equals(element)) {
                    // a complete row has been read: unmarshal it and publish it
                    T newElement = jaxbUnmarshaller.unmarshall(new ByteArrayInputStream(buffer.toString().getBytes(StandardCharsets.UTF_8)), classToUnmarshall);
                    LOGGER.trace("publishing: {}", newElement);
                    blockingQueue.add(newElement);
                    buffer.setLength(0);
                }
            } else if (eventType == XMLStreamConstants.CHARACTERS) {
                buffer.append(xmlStreamReader.getText().trim());
            } else if (eventType == XMLStreamConstants.END_DOCUMENT) {
                staxSpliterator.endOfDocument();
                LOGGER.info("end of xml document");
                countDownLatch.countDown();
            }
        }
    }

    private StringBuilder newStringBuilder() {
        return new StringBuilder(600);
    }
}

I use the CountDownLatch only because I need my JUnit test to stay alive until the document has been read fully. It would not be needed in an actual server environment. Note the use of the BlockingQueue: the parser thread publishes rows into it, and the Spliterator below consumes from it.
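XmlParser also delegates to a JaxbUnmarshaller helper that is not listed in this post. A minimal sketch of what it could look like, assuming a plain JAXB unmarshal (the real class is in the source repository and might, for instance, cache the JAXBContext):

import java.io.InputStream;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;

public class JaxbUnmarshaller {
    @SuppressWarnings("unchecked")
    public <T> T unmarshall(InputStream inputStream, Class<T> classToUnmarshall) {
        try {
            // note: JAXBContext.newInstance() is expensive; a production
            // version would cache one context per class
            JAXBContext jaxbContext = JAXBContext.newInstance(classToUnmarshall);
            return (T) jaxbContext.createUnmarshaller().unmarshal(inputStream);
        } catch (JAXBException e) {
            throw new RuntimeException(e);
        }
    }
}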

Spliterator implementation

import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StaxSpliterator<T> implements Spliterator<T>, EndOfDocumentListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(StaxSpliterator.class);

    private static final long TIMEOUT = 100;

    private final BlockingQueue<T> buffer;
    private final AtomicBoolean endOfDocument = new AtomicBoolean(false);

    StaxSpliterator(BlockingQueue<T> buffer) {
        this.buffer = buffer;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        // keep draining the queue even after the end-of-document event, so
        // that rows published just before the event are not dropped
        if (endOfDocument.get() && buffer.isEmpty()) {
            return false;
        }
        T nonNullElement = null;
        LOGGER.trace("polling the buffer, waiting a little if it is empty...");
        try {
            nonNullElement = buffer.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if (nonNullElement == null) {
            LOGGER.trace("terminating as received null after waiting");
            return false;
        }
        action.accept(nonNullElement);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        // the source cannot be partitioned
        return null;
    }

    @Override
    public long estimateSize() {
        // the number of rows is not known upfront
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return Spliterator.NONNULL | Spliterator.CONCURRENT;
    }

    @Override
    public void endOfDocument() {
        LOGGER.info("end of document event received");
        endOfDocument.set(true);
    }
}
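StaxSpliterator also implements EndOfDocumentListener, a callback interface that is not listed in the post. Going by how XmlParser invokes it, it is presumably just a single method (a sketch; the actual definition is in the source repository):

public interface EndOfDocumentListener {

    // invoked by XmlParser when the Stax reader hits END_DOCUMENT
    void endOfDocument();
}

Note that trySplit() returning null means this source can never actually be split: requesting a parallel stream still works, but the elements are handed out one at a time from the queue.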


The grouping logic

This part is very simple. Note that we stream a GZipped XML file directly, by wrapping the resource in a GZIPInputStream:

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

import javax.xml.stream.XMLStreamException;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XmlParserIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(XmlParserIT.class);

    @Test
    public void testParse() throws XMLStreamException, IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        XmlParser<LineItemRow> xmlParser = new XmlParser<>("T", LineItemRow.class, countDownLatch);
        Stream<LineItemRow> stream = xmlParser.parse(new GZIPInputStream(XmlParserIT.class.getResourceAsStream("/datasets/xml/www.cs.washington.edu/lineitem.xml.gz")));
        Map<Integer, List<LineItemRow>> groupedByData = stream.collect(Collectors.groupingBy((LineItemRow row) -> {
            LOGGER.trace("grouping by: {}", row);
            return row.getOrderKey();
        }));
        groupedByData.entrySet().parallelStream().map((Entry<Integer, List<LineItemRow>> entry) -> {
            int orderKey = entry.getKey();
            DoubleStream doubleStream = entry.getValue().parallelStream().mapToDouble((LineItemRow row) -> {
                return row.getExtendedPrice();
            });
            LineItemRow aggregatedRow = new LineItemRow();
            aggregatedRow.setOrderKey(orderKey);
            aggregatedRow.setExtendedPrice((float) doubleStream.sum());
            return aggregatedRow;
        }).forEach((LineItemRow row) -> {
            LOGGER.info("new aggregated event: {}", row);
        });
        countDownLatch.await();
    }
}
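As an aside, the same group-and-aggregate could be expressed in a single pass with a downstream collector, avoiding the intermediate Map of Lists (a sketch, reusing the stream variable from the test above):

// Single-pass alternative: group by order key and sum the extended price
// in one collect, instead of grouping into lists first.
Map<Integer, Double> totalPriceByOrder = stream.collect(
        Collectors.groupingBy(LineItemRow::getOrderKey,
                Collectors.summingDouble(LineItemRow::getExtendedPrice)));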


Sources

https://github.com/paawak/blog/tree/master/code/reactive-streams/spliterator-demo
I found some large XMLs at the location below:
http://www.cs.washington.edu/research/xmldatasets/www/repository.html