Leo Chashnikov

One Billion Row Challenge - a view from the sidelines

Over the last couple of days I’ve been hearing about, reading about and poking around the 1 Billion Row Challenge (1BRC) - a “contest” for Java / JVM developers to write code that reads a file with 1 billion rows and aggregates it in the shortest time possible. I’ve never been much of a JVM performance expert, so, as a late arrival, I decided to cheer from the sidelines and figure out the simplest and most common optimization techniques used to improve performance. Bonus points if they are generally applicable and do not overcomplicate the solution.


DISCLAIMER: this post is based on a solution by arjenw (Arjen Wisse). I’ve copy-pasted it and slightly simplified it for this article (sacrificing some performance), but all credit for the implementation goes to Arjen. His code is very easy to read and understand, and it makes the main performance ideas I was interested in obvious and easy to explain.

In this post I’ll provide a brief high-level explanation of the ideas behind the solution. All performance measurements were done on my MacBook; they’re not comparable with the contest leaderboard and are provided only to show which change brings which improvement.


To start with, let’s look into the dataset and what we need to do with it.

Our file has 1B rows of temperature measurements in the format

StationName;42.0

The goal is to get a sorted list of all stations, along with the min, max and average temperatures that were observed there.
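To make the task concrete before looking at the real implementations, here is a minimal sketch on three made-up lines. It leans on the JDK's built-in DoubleSummaryStatistics rather than anything from the challenge code, and the class and method names (TinyExample, summarize) are hypothetical.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class TinyExample {
    // Groups "Station;temperature" lines by station name and keeps
    // min, max and average per station. TreeMap keeps the stations sorted.
    static Map<String, DoubleSummaryStatistics> summarize(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(";"))
                .collect(Collectors.groupingBy(
                        parts -> parts[0],
                        TreeMap::new,
                        Collectors.summarizingDouble(parts -> Double.parseDouble(parts[1]))));
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from the challenge file.
        var stats = summarize(List.of("Hamburg;12.0", "Oslo;-3.5", "Hamburg;8.0"));
        stats.forEach((name, s) ->
                System.out.println(name + ": min=" + s.getMin()
                        + " avg=" + s.getAverage() + " max=" + s.getMax()));
    }
}
```

This is fine for a handful of lines; the rest of the post is about what changes when there are a billion of them.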


Baseline

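import static java.util.stream.Collectors.groupingBy;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;

Collector<Measurement, MeasurementAggregator, ResultRow> collector =
	Collector.of(
		MeasurementAggregator::new,
		// accumulator: fold one measurement into the running aggregate
		(a, m) -> {
			a.min = Math.min(a.min, m.value);
			a.max = Math.max(a.max, m.value);
			a.sum += m.value;
			a.count++;
		},
		// combiner: merge two partial aggregates
		(agg1, agg2) -> {
			var res = new MeasurementAggregator();
			res.min = Math.min(agg1.min, agg2.min);
			res.max = Math.max(agg1.max, agg2.max);
			res.sum = agg1.sum + agg2.sum;
			res.count = agg1.count + agg2.count;
			return res;
		},
		// finisher: turn the aggregate into the final row (min, mean, max)
		agg -> {
			return new ResultRow(agg.min, (Math.round(agg.sum * 10.0) / 10.0) / agg.count, agg.max);
		});

Map<String, ResultRow> measurements = new TreeMap<>(Files.lines(Paths.get(FILE))
	.map(l -> new Measurement(l.split(";")))
	.collect(groupingBy(m -> m.station(), collector)));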

The baseline implementation is as straightforward as possible: read the file line by line, create an instance of the Measurement class for each entry, and merge the results by combining values of stations with the same name. We update min and max right away on each merge, along with the overall sum and count, which give us the average temperature once all entries are processed.
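The snippet above relies on three small types that are not shown in this post: Measurement, MeasurementAggregator and ResultRow. A minimal sketch of what they might look like, run on an in-memory list instead of the real file, is below. The shapes of the types are assumptions inferred from how the snippet uses them, the wrapper class BaselineSketch is hypothetical, and the rounding from the original finisher is left out to keep the sketch short.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class BaselineSketch {
    // Assumed shape: one parsed line, built from the result of line.split(";").
    record Measurement(String station, double value) {
        Measurement(String[] parts) {
            this(parts[0], Double.parseDouble(parts[1]));
        }
    }

    // Assumed shape: the final per-station result.
    record ResultRow(double min, double mean, double max) {}

    // Assumed shape: mutable running totals for one station.
    static final class MeasurementAggregator {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum;
        long count;
    }

    static Map<String, ResultRow> aggregate(List<String> lines) {
        Collector<Measurement, MeasurementAggregator, ResultRow> collector = Collector.of(
                MeasurementAggregator::new,
                (a, m) -> {
                    a.min = Math.min(a.min, m.value());
                    a.max = Math.max(a.max, m.value());
                    a.sum += m.value();
                    a.count++;
                },
                (a1, a2) -> {
                    a1.min = Math.min(a1.min, a2.min);
                    a1.max = Math.max(a1.max, a2.max);
                    a1.sum += a2.sum;
                    a1.count += a2.count;
                    return a1;
                },
                // plain average; the rounding step is omitted in this sketch
                agg -> new ResultRow(agg.min, agg.sum / agg.count, agg.max));

        Map<String, ResultRow> grouped = lines.stream()
                .map(l -> new Measurement(l.split(";")))
                .collect(Collectors.groupingBy(Measurement::station, collector));
        return new TreeMap<>(grouped); // TreeMap sorts by station name
    }

    public static void main(String[] args) {
        // Made-up sample data.
        System.out.println(aggregate(List.of("Hamburg;12.0", "Oslo;-3.5", "Hamburg;8.0")));
    }
}
```

Note that every input line becomes a Measurement object plus a String array and two Strings from split, which is exactly the allocation cost the later sections try to avoid.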

This implementation takes 192 seconds (3 minutes 12 seconds) to run.


Parallelize

Since most machines today have more than one core, we can easily take advantage of that. How easy is it? With Java Streams, you only need to change 2 lines of code.


Map<String, ResultRow> measurements = new TreeMap<>(Files.lines(Paths.get(FILE))
	.map(l -> new Measurement(l.split(";")))
	.parallel()
	.collect(groupingByConcurrent(m -> m.station(), collector)));

Measurement creation stays the same, our Collector also doesn’t need any changes - we just run our stream on all the cores we have.

With just this change - that, I’d argue, didn’t add any complexity to the code - we went from 192 seconds to just 81 seconds (1 minute 21 seconds).

Quite good for 2 lines. 
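To see that the parallel version computes the same thing, here is a small sketch that runs the same grouping both ways on synthetic in-memory data. It uses the JDK's summarizingDouble collector instead of the custom collector from this post, and the names ParallelSketch, sampleLines and summarize are hypothetical.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ParallelSketch {
    // Generates n synthetic "Station;value" lines (made-up data, for illustration only).
    static List<String> sampleLines(int n) {
        String[] stations = {"Hamburg", "Oslo", "Lima"};
        return IntStream.range(0, n)
                .mapToObj(i -> stations[i % stations.length] + ";" + (i % 50) / 10.0)
                .collect(Collectors.toList());
    }

    static Map<String, DoubleSummaryStatistics> summarize(List<String> lines, boolean parallel) {
        Stream<String[]> parts = lines.stream().map(l -> l.split(";"));
        Map<String, DoubleSummaryStatistics> grouped;
        if (parallel) {
            // All worker threads accumulate into one shared ConcurrentMap.
            grouped = parts.parallel().collect(Collectors.groupingByConcurrent(
                    p -> p[0], Collectors.summarizingDouble(p -> Double.parseDouble(p[1]))));
        } else {
            grouped = parts.collect(Collectors.groupingBy(
                    p -> p[0], Collectors.summarizingDouble(p -> Double.parseDouble(p[1]))));
        }
        return new TreeMap<>(grouped); // sort by station name in both cases
    }

    public static void main(String[] args) {
        List<String> lines = sampleLines(1_000_000);
        var sequential = summarize(lines, false);
        var parallel = summarize(lines, true);
        System.out.println(sequential.keySet().equals(parallel.keySet()));
        System.out.println(sequential.get("Oslo").getMax() == parallel.get("Oslo").getMax());
    }
}
```

Min, max and count are identical in both modes. Floating-point sums can differ in the last digits, because parallel execution adds the values in a different order.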


Interesting part - HashMap and chunks

Now let’s take a step back and look at what we can improve about our implementation.

We read the file line by line and create a separate object for each line we read. So, 1B objects - that’s a lot. In the second step we need to merge stations anyway, until we have just 1 entry per station - so why not do it right away? We’d also like to distribute the work across processor cores, so let’s split our file into chunks according to the number of cores available.


var results = IntStream.range(0, segmentCount)
	.mapToObj(i -> parseSegment(file, fileSize, segmentSize, i))
	.reduce(StationList::merge)
	.orElseThrow(); // reduce returns an Optional; it is empty only if segmentCount is 0

In this code, each call to the parseSegment function produces a separate StationList object that has a simple array-backed hash map inside of it, where the keys are hashes of station names. As a further improvement we can tune the size of that array up front to avoid collisions, but even without that we’ll get a great performance boost.

StationList::merge in reduce function just adds together two HashMaps, one element at a time.
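One detail the snippet hides is that a chunk boundary computed as i * segmentSize will usually land in the middle of a line, so each boundary has to be moved to the start of the next line. The internals of parseSegment are not shown in this post, so the following is only a sketch of one way to compute line-aligned boundaries. The class SegmentSketch, its methods and the MAX_LINE limit are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SegmentSketch {
    // Assumption: no line (name + ';' + value + '\n') is longer than this many bytes.
    static final int MAX_LINE = 128;

    // Returns the first offset >= pos at which a line starts.
    static long nextLineStart(FileChannel file, long pos, long fileSize) throws IOException {
        if (pos <= 0) return 0;
        if (pos >= fileSize) return fileSize;
        long from = pos - 1; // if the previous byte is '\n', pos already starts a line
        int window = (int) Math.min(MAX_LINE, fileSize - from);
        MappedByteBuffer bb = file.map(FileChannel.MapMode.READ_ONLY, from, window);
        while (bb.hasRemaining()) {
            if (bb.get() == '\n') return from + bb.position();
        }
        if (from + window < fileSize) {
            throw new IllegalStateException("line longer than MAX_LINE bytes");
        }
        return fileSize; // last line has no trailing newline
    }

    // Returns {start, end} byte offsets for each segment; end is exclusive.
    static long[][] split(Path path, int segmentCount) {
        try (FileChannel file = FileChannel.open(path, StandardOpenOption.READ)) {
            long fileSize = file.size();
            long segmentSize = (fileSize + segmentCount - 1) / segmentCount; // round up
            long[][] bounds = new long[segmentCount][];
            for (int i = 0; i < segmentCount; i++) {
                bounds[i] = new long[] {
                        nextLineStart(file, i * segmentSize, fileSize),
                        nextLineStart(file, (i + 1) * segmentSize, fileSize)};
            }
            return bounds;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        for (long[] b : split(Path.of(args[0]), cores)) {
            System.out.println(b[0] + " .. " + b[1]);
        }
    }
}
```

Because every boundary is a line start and the end of one segment is the start of the next, each line is parsed by exactly one segment. Each segment can then be memory-mapped separately, which also keeps every mapping below the 2 GB limit of a single MappedByteBuffer.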

Let’s take a brief look at the add function to see what’s happening inside:


private static class StationList implements Iterable<Station> {
    // Choose a value that _eliminates_ collisions on the test set.
    // It should be big enough to avoid collisions, but we also don't want
    // to waste memory, so an arbitrarily huge value (1B) won't work.
    private final static int MAX_ENTRY = 100_000;
    private final Station[] array = new Station[MAX_ENTRY];
    private int size = 0;

    // iterator() and merge(...) are not shown here

    public void add(byte[] data, int stationNameLength, int stationHash, int value) {
        add(
                stationHash, // just a hash of the station name
                () -> new Station(data, stationNameLength, stationHash, value), // create a new Station if we need one
                existing -> existing.append(value) // if this Station already exists in StationList, we append its measurement
        );
    }

    private void add(int hash, Supplier<Station> create, Consumer<Station> update) {
        var position = hash % MAX_ENTRY;
        Station existing;
        // handle possible collisions by shifting the position (linear probing)
        while ((existing = array[position]) != null && existing.hash != hash) {
            position = (position + 1) % MAX_ENTRY;
        }
        if (existing == null) {
            // there was no such station, create one
            array[position] = create.get();
            size++;
        }
        else {
            // a station with this hash is already in StationList, merge values
            update.accept(existing);
        }
    }
}
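The Station class and the merge step are not shown in this post. As a rough sketch of how they could fit together with the add logic above, here is a self-contained miniature. The field names, the Table class, its small capacity and the convention of storing temperatures as integer tenths of a degree are all assumptions for illustration, not Arjen's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class StationSketch {
    static final class Station {
        final byte[] name;
        final int hash;
        int min, max, count;
        long sum;

        Station(byte[] data, int nameLength, int hash, int value) {
            this.name = Arrays.copyOf(data, nameLength); // copy, the caller may reuse its buffer
            this.hash = hash;
            this.min = value;
            this.max = value;
            this.sum = value;
            this.count = 1;
        }

        void append(int value) {
            min = Math.min(min, value);
            max = Math.max(max, value);
            sum += value;
            count++;
        }

        void merge(Station other) {
            min = Math.min(min, other.min);
            max = Math.max(max, other.max);
            sum += other.sum;
            count += other.count;
        }

        double mean() {
            return sum / 10.0 / count; // values are stored as tenths of a degree
        }
    }

    // Open-addressing table keyed by hash only, like StationList in the post.
    // Assumption: far fewer stations than CAPACITY, otherwise probing never ends.
    static final class Table {
        static final int CAPACITY = 1 << 10;
        final Station[] slots = new Station[CAPACITY];

        void add(byte[] data, int nameLength, int hash, int value) {
            int pos = find(hash);
            if (slots[pos] == null) slots[pos] = new Station(data, nameLength, hash, value);
            else slots[pos].append(value);
        }

        // Adds every station of the other table into this one, one element at a time.
        Table merge(Table other) {
            for (Station s : other.slots) {
                if (s == null) continue;
                int pos = find(s.hash);
                if (slots[pos] == null) slots[pos] = s;
                else slots[pos].merge(s);
            }
            return this;
        }

        Station get(int hash) {
            return slots[find(hash)];
        }

        private int find(int hash) {
            int pos = Math.floorMod(hash, CAPACITY);
            while (slots[pos] != null && slots[pos].hash != hash) {
                pos = (pos + 1) % CAPACITY;
            }
            return pos;
        }
    }

    public static void main(String[] args) {
        byte[] oslo = "Oslo".getBytes(StandardCharsets.UTF_8);
        int hash = "Oslo".hashCode();
        Table a = new Table();
        a.add(oslo, oslo.length, hash, 15);  // 1.5 degrees
        a.add(oslo, oslo.length, hash, -30); // -3.0 degrees
        Table b = new Table();
        b.add(oslo, oslo.length, hash, 100); // 10.0 degrees
        System.out.println(a.merge(b).get(hash).mean());
    }
}
```

Like the original, this treats two names with the same hash as the same station. That is only safe when the hash is known to be collision-free for the data set, which is what the comment on MAX_ENTRY hints at.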

 

Even without any parallelization, this version executes in just 45 seconds. When we make the Stream in the main function parallel, we get to 3.5 seconds. So this option is ~55 times faster than the baseline (192 / 3.5 ≈ 55).


Miscellaneous


Here I’ll mention other small improvements that make the code more performant but much less readable. Unless you really need the couple of extra seconds they provide, I’d advise against doing this in production.

Let’s look at the implementation of the actual segment parsing. We could have read the file line by line, but that would mean first converting each line into a String, then splitting new Strings out of it, and then parsing the double values separately. Remember that our HashMap works on hashes? We’re perfectly fine calculating them directly from bytes. We can also store the name of the Station as bytes, until we need to print it out once we’re done with all the aggregations.

All in all, that comes down to a function like this:


private static void parseAndAddStation(MappedByteBuffer bb, StationList stationList) {
    var buffer = new byte[100];
    byte b;
    var i = 0;
    int hash = 0;

    while ((b = bb.get()) != ';') {
        // we're calculating the hash and recording the name at the same time
        hash = hash * 31 + b;
        buffer[i++] = b;
    }

    int measurement = getMeasurement(bb);
    stationList.add(buffer, i, Math.abs(hash), measurement);
}
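Two properties of this hash are worth spelling out. For plain ASCII names, hash * 31 + b gives exactly the same result as String.hashCode(), just without ever creating a String. Also, Math.abs(hash) is still negative when hash is Integer.MIN_VALUE, which would produce a negative array index. The sketch below demonstrates both; the class HashSketch and the slot helper based on Math.floorMod are hypothetical additions, not part of the original solution.

```java
import java.nio.charset.StandardCharsets;

final class HashSketch {
    // Same rolling hash as in parseAndAddStation: hash = hash * 31 + b per name byte.
    static int rollingHash(byte[] name, int length) {
        int hash = 0;
        for (int i = 0; i < length; i++) {
            hash = hash * 31 + name[i];
        }
        return hash;
    }

    // Maps any int hash, including Integer.MIN_VALUE, to an index in [0, capacity).
    static int slot(int hash, int capacity) {
        return Math.floorMod(hash, capacity);
    }

    public static void main(String[] args) {
        byte[] name = "Hamburg".getBytes(StandardCharsets.US_ASCII);
        // Matches String.hashCode() for ASCII names.
        System.out.println(rollingHash(name, name.length) == "Hamburg".hashCode());
        // Math.abs overflows for Integer.MIN_VALUE and stays negative.
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0);
        // floorMod always lands inside the table.
        System.out.println(slot(Integer.MIN_VALUE, 100_000));
    }
}
```

For names with non-ASCII characters the two hashes differ, since UTF-8 bytes above 127 are negative as Java bytes. That does not matter for the table, as long as the same function is used everywhere.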

I won’t be demonstrating the getMeasurement method here - suffice it to say that we read bytes one by one and check their values to parse the measurement in the most efficient way, with some bit gymnastics.
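For a rough idea of what such a method does, here is a deliberately plain version without any bit tricks. It is not the implementation from Arjen's solution. It assumes values look like -12.3 with exactly one fractional digit, and it returns the temperature in tenths of a degree so that everything downstream can stay in integer arithmetic. The class name MeasurementSketch is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MeasurementSketch {
    // Reads e.g. "-12.3\n" from the buffer's current position and returns -123.
    // Works on any ByteBuffer, so a MappedByteBuffer can be passed in as well.
    static int getMeasurement(ByteBuffer bb) {
        int value = 0;
        boolean negative = false;
        byte b;
        while (bb.hasRemaining() && (b = bb.get()) != '\n') {
            if (b == '-') {
                negative = true;
            } else if (b >= '0' && b <= '9') {
                value = value * 10 + (b - '0');
            }
            // the decimal point (and a possible '\r') is simply skipped
        }
        return negative ? -value : value;
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.wrap("-12.3\n4.5\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(getMeasurement(bb)); // first value, in tenths
        System.out.println(getMeasurement(bb)); // second value, in tenths
    }
}
```

No String and no double is created along the way; the division by 10 only has to happen once per station, when the final result is printed.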


Conclusion


I wrote this post first and foremost for myself, to make sure that I understand the solution in full and can explain it to an imaginary audience (let’s be honest, no real audience is reading this blog). The idea of using a HashMap and reducing the number of allocations is great exactly because it’s not new or unique - it’s a technique that can work in many different cases and for different tasks.

Simple parallelization is also very easily achievable, so if your task requires some CPU time - there’s little reason not to utilize it. Word of caution, as usual - measure before you optimise, and make sure you’re optimising the right part of your code. Profiler is your friend.

There are many awesome solutions to the challenge, very inventive and interesting, but I’m not nearly qualified enough to give any commentary or overview on them. I’m only in a position to read and learn from the awesome community - and that’s what I’ll be doing now.
