I have a large XML file that I would like to read, then group and aggregate its rows using Java 8. A DOM parser with JAXB cannot handle this, as it's a really large file. Instead, I would like to create a Stream from the unbounded data contained in the XML file.
I read the XML by streaming it with StAX, so I never load the entire file into memory. I go a step further and use JAXB to unmarshall small portions of this file, each of which I will call a row. I use a Spliterator backed by a BlockingQueue to create a Stream out of it. Once I have the stream, I apply the famous grouping-by function and aggregate the rows.
The sample XML looks like this:
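(The original sample is not reproduced here; the sketch below is reconstructed from the element names in the POJO mapping further down. The root element name and the values shown are illustrative assumptions, and most child elements are elided.)

```xml
<table>
  <T>
    <L_ORDERKEY>1</L_ORDERKEY>
    <L_PARTKEY>1552</L_PARTKEY>
    <L_EXTENDEDPRICE>24710.35</L_EXTENDEDPRICE>
    <L_SHIPDATE>1996-03-13</L_SHIPDATE>
    <L_COMMENT>some comment</L_COMMENT>
    <!-- ...remaining L_* elements... -->
  </T>
  <!-- ...thousands more T elements... -->
</table>
```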
There would be thousands of "T" elements. I have modeled my POJO on the element "T". I use StAX to read the XML; each time I read one "T" element, I use JAXB to unmarshall it into a Java object and then publish it to the Stream.
I have modeled the POJO as below:
```java
@XmlRootElement(name = "T")
@XmlAccessorType(XmlAccessType.FIELD)
public class LineItemRow {

    @XmlElement(name = "L_ORDERKEY")
    private int orderKey;

    @XmlElement(name = "L_PARTKEY")
    private int partKey;

    @XmlElement(name = "L_SUPPKEY")
    private int supplierKey;

    @XmlElement(name = "L_LINENUMBER")
    private int lineNumber;

    @XmlElement(name = "L_QUANTITY")
    private int quantity;

    @XmlElement(name = "L_EXTENDEDPRICE")
    private float extendedPrice;

    @XmlElement(name = "L_DISCOUNT")
    private float discount;

    @XmlElement(name = "L_TAX")
    private float tax;

    @XmlElement(name = "L_RETURNFLAG")
    private String returnFlag;

    @XmlElement(name = "L_LINESTATUS")
    private String lineStatus;

    @XmlElement(name = "L_SHIPDATE")
    @XmlJavaTypeAdapter(LocalDateAdapter.class)
    private LocalDate shippingDate;

    @XmlElement(name = "L_COMMITDATE")
    @XmlJavaTypeAdapter(LocalDateAdapter.class)
    private LocalDate commitDate;

    @XmlElement(name = "L_RECEIPTDATE")
    @XmlJavaTypeAdapter(LocalDateAdapter.class)
    private LocalDate receiptDate;

    @XmlElement(name = "L_SHIPINSTRUCT")
    private String shippingInstructions;

    @XmlElement(name = "L_SHIPMODE")
    private String shipMode;

    @XmlElement(name = "L_COMMENT")
    private String comment;

    ...
}
```
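The POJO references a LocalDateAdapter, which JAXB needs because it has no built-in support for java.time.LocalDate. The actual adapter lives in the repository; a minimal version could look like this, assuming the dates in the XML are ISO-formatted (yyyy-MM-dd):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import javax.xml.bind.annotation.adapters.XmlAdapter;

// Sketch of an XmlAdapter converting between the date strings in the XML
// and java.time.LocalDate; assumes ISO yyyy-MM-dd formatting.
public class LocalDateAdapter extends XmlAdapter<String, LocalDate> {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ISO_LOCAL_DATE;

    @Override
    public LocalDate unmarshal(String value) {
        // Called by JAXB when reading: parse the element text into a LocalDate.
        return value == null ? null : LocalDate.parse(value, FORMAT);
    }

    @Override
    public String marshal(LocalDate value) {
        // Called by JAXB when writing: format the LocalDate back to text.
        return value == null ? null : value.format(FORMAT);
    }
}
```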
The heart of this is the StAX parser:
```java
public class XmlParser<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(XmlParser.class);

    private final String xmlElementName;
    private final Class<T> classToUnmarshall;
    private final CountDownLatch countDownLatch;
    private final JaxbUnmarshaller jaxbUnmarshaller;
    private final BlockingQueue<T> blockingQueue;
    private final StaxSpliterator<T> staxSpliterator;
    private final Stream<T> stream;

    public XmlParser(String xmlElementName, Class<T> classToUnmarshall, CountDownLatch countDownLatch) {
        this.xmlElementName = xmlElementName;
        this.classToUnmarshall = classToUnmarshall;
        this.countDownLatch = countDownLatch;
        jaxbUnmarshaller = new JaxbUnmarshaller();
        blockingQueue = new ArrayBlockingQueue<>(10_000);
        staxSpliterator = new StaxSpliterator<>(blockingQueue);
        stream = StreamSupport.stream(staxSpliterator, true);
    }

    public Stream<T> parse(InputStream inputStream) {
        Runnable doParse = () -> {
            try {
                doParse(inputStream);
            } catch (XMLStreamException e) {
                throw new RuntimeException(e);
            }
        };
        new Thread(doParse).start();
        return stream;
    }

    private void doParse(InputStream inputStream) throws XMLStreamException {
        XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
        XMLStreamReader xmlStreamReader = xmlInputFactory.createXMLStreamReader(inputStream);
        StringBuilder buffer = newStringBuilder();
        while (xmlStreamReader.hasNext()) {
            int eventType = xmlStreamReader.next();
            if (eventType == XMLStreamConstants.START_ELEMENT) {
                String element = xmlStreamReader.getLocalName();
                if (xmlElementName.equals(element)) {
                    buffer = newStringBuilder();
                }
                buffer.append("<").append(element).append(">");
            } else if (eventType == XMLStreamConstants.END_ELEMENT) {
                String element = xmlStreamReader.getLocalName();
                buffer.append("</").append(element).append(">");
                if (xmlElementName.equals(element)) {
                    T newElement = jaxbUnmarshaller.unmarshall(
                            new ByteArrayInputStream(buffer.toString().getBytes(StandardCharsets.UTF_8)),
                            classToUnmarshall);
                    LOGGER.trace("publishing: {}", newElement);
                    blockingQueue.add(newElement);
                    buffer.setLength(0);
                }
            } else if (eventType == XMLStreamConstants.CHARACTERS) {
                buffer.append(xmlStreamReader.getText().trim());
            } else if (eventType == XMLStreamConstants.END_DOCUMENT) {
                staxSpliterator.endOfDocument();
                LOGGER.info("end of xml document");
                countDownLatch.countDown();
            }
        }
    }

    private StringBuilder newStringBuilder() {
        return new StringBuilder(600);
    }
}
```
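The JaxbUnmarshaller helper used above is not shown in the listing. Since creating a JAXBContext is expensive, a reasonable sketch (an assumption on my part, not necessarily the repository's implementation) caches one context per class and creates a fresh Unmarshaller per call:

```java
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

// Hypothetical helper: caches the expensive JAXBContext per class and
// creates a fresh Unmarshaller per call (Unmarshaller is not thread-safe).
public class JaxbUnmarshaller {

    private final Map<Class<?>, JAXBContext> contextCache = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> T unmarshall(InputStream inputStream, Class<T> classToUnmarshall) {
        try {
            JAXBContext context = contextCache.computeIfAbsent(classToUnmarshall, clazz -> {
                try {
                    return JAXBContext.newInstance(clazz);
                } catch (JAXBException e) {
                    throw new RuntimeException(e);
                }
            });
            return (T) context.createUnmarshaller().unmarshal(inputStream);
        } catch (JAXBException e) {
            throw new RuntimeException(e);
        }
    }
}

// A tiny root element used only to demonstrate the helper.
@XmlRootElement(name = "item")
@XmlAccessorType(XmlAccessType.FIELD)
class Item {
    @XmlElement(name = "id")
    int id;
}
```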
I use the CountDownLatch only because I need my JUnit test to stay alive until the document is read fully; it would not be needed in an actual server environment. Note the usage of the BlockingQueue:
```java
class StaxSpliterator<T> implements Spliterator<T>, EndOfDocumentListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(StaxSpliterator.class);

    private static final long TIMEOUT = 100;

    private final BlockingQueue<T> buffer;
    private final AtomicBoolean endOfDocument = new AtomicBoolean(false);

    StaxSpliterator(BlockingQueue<T> buffer) {
        this.buffer = buffer;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (endOfDocument.get()) {
            return false;
        }
        T nonNullElement = null;
        LOGGER.trace("the buffer is empty, waiting for some time...");
        try {
            nonNullElement = buffer.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if (nonNullElement == null) {
            LOGGER.trace("terminating as received null after waiting");
            return false;
        }
        action.accept(nonNullElement);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return Spliterator.NONNULL | Spliterator.CONCURRENT;
    }

    @Override
    public void endOfDocument() {
        LOGGER.info("end of document event received");
        endOfDocument.set(true);
    }
}
```
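The EndOfDocumentListener interface is not shown in the listing; inferred from its usage, it is presumably a one-method callback that the parser thread invokes when it reaches END_DOCUMENT:

```java
// Hypothetical callback interface (inferred from its usage above): lets the
// parsing thread signal the Spliterator that the XML document has ended.
public interface EndOfDocumentListener {

    // Invoked once, when the StAX reader hits the END_DOCUMENT event.
    void endOfDocument();
}
```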
This part is very simple. We stream a GZipped file by wrapping it in a GZIPInputStream:
```java
public class XmlParserIT {

    private static final Logger LOGGER = LoggerFactory.getLogger(XmlParserIT.class);

    @Test
    public void testParse() throws XMLStreamException, IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        XmlParser<LineItemRow> xmlParser = new XmlParser<>("T", LineItemRow.class, countDownLatch);
        Stream<LineItemRow> stream = xmlParser.parse(new GZIPInputStream(
                XmlParserIT.class.getResourceAsStream("/datasets/xml/www.cs.washington.edu/lineitem.xml.gz")));
        Map<Integer, List<LineItemRow>> groupedByData = stream.collect(Collectors.groupingBy((LineItemRow row) -> {
            LOGGER.trace("grouping by: {}", row);
            return row.getOrderKey();
        }));
        groupedByData.entrySet().parallelStream().map((Entry<Integer, List<LineItemRow>> entry) -> {
            int orderKey = entry.getKey();
            DoubleStream doubleStream = entry.getValue().parallelStream().mapToDouble((LineItemRow row) -> {
                return row.getExtendedPrice();
            });
            LineItemRow aggregatedRow = new LineItemRow();
            aggregatedRow.setOrderKey(orderKey);
            aggregatedRow.setExtendedPrice((float) doubleStream.sum());
            return aggregatedRow;
        }).forEach((LineItemRow row) -> {
            LOGGER.info("new aggregated event: {}", row);
        });
        countDownLatch.await();
    }
}
```
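As an aside, the two-phase group-then-aggregate above could also be collapsed into a single pass by giving groupingBy a downstream summingDouble collector, avoiding the intermediate Map of full row lists. A minimal sketch (using a hypothetical stand-in Row class with just the two fields being aggregated):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SinglePassAggregation {

    // Minimal stand-in for LineItemRow with just the two fields we aggregate.
    static class Row {
        final int orderKey;
        final float extendedPrice;

        Row(int orderKey, float extendedPrice) {
            this.orderKey = orderKey;
            this.extendedPrice = extendedPrice;
        }
    }

    // Groups rows by order key and sums the extended price in one pass,
    // so no intermediate List<Row> per key is materialized.
    static Map<Integer, Double> totalPriceByOrder(Stream<Row> rows) {
        return rows.collect(Collectors.groupingBy(
                row -> row.orderKey,
                Collectors.summingDouble(row -> row.extendedPrice)));
    }
}
```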
The complete source code is available on GitHub:

https://github.com/paawak/blog/tree/master/code/reactive-streams/spliterator-demo
I found some large XML files at the location below:
http://www.cs.washington.edu/research/xmldatasets/www/repository.html