Online sorting of incoming streams of data


The other day I came across an interesting problem.  The idea goes something like this:

There are a number of input streams that produce data. Each data point carries a timestamp, and every stream always reports its data in timestamp order. The task is to design an efficient algorithm that aggregates the values from all streams and records, or displays, them in the correct overall order.

One possible real-life scenario where this might be required is a system of sensors that record data points and submit them to a central server for processing. Each sensor records its data points regularly and in order, but once the transmissions from the different sensors arrive at the server, the timestamps of events from different sources are interleaved in no particular order.

The first idea that comes to mind might be to read the first value of each stream into an array, sort the array, record the values in the array, then proceed to the next set of data points and repeat the process. Unfortunately, this will not work: sorting just the first value of each stream guarantees only that we have identified the overall first value. It says nothing about the second, third, and so forth. Before we could record the second value, we would have to add the next value from the stream that supplied the first one and sort again. To illustrate, here's an example:

      A     B     C     D     E
    ------------------------------
      14    10    16    13    17
      16    11    17    15    19
      19    16    18    17    20

As is easy to see, after sorting the first set of values we get the ordering 10, 13, 14, 16, 17. However, the second value we should report is 11, not 13. So each time we pop a value from the sorted array, we have to know which stream it came from and insert the next value from that stream before sorting the array again to get the next value. This makes for a terribly inefficient algorithm: each sort takes O(k log k), where k is the number of streams, and we have to sort again for every value, so the total running time is on the order of O(n k log k), where n is the total number of values.
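For comparison, here is a minimal Python sketch of that re-sort-after-every-value approach. It assumes the timestamps are plain integers and that each stream is any Python iterable already in ascending order; the name `naive_merge` is hypothetical, and `None` is used only as an illustrative end-of-stream marker.

```python
from typing import Iterable, Iterator, List, Tuple


def naive_merge(streams: List[Iterable[int]]) -> Iterator[int]:
    """Emit values in order by re-sorting the stream heads after every value."""
    iters = [iter(s) for s in streams]
    heads: List[Tuple[int, int]] = []  # (value, index of the stream it came from)
    for idx, it in enumerate(iters):
        first = next(it, None)           # None marks an empty stream
        if first is not None:
            heads.append((first, idx))
    while heads:
        heads.sort()                     # full O(k log k) sort on every iteration
        value, idx = heads.pop(0)        # smallest head value
        yield value
        nxt = next(iters[idx], None)     # refill from the same stream
        if nxt is not None:
            heads.append((nxt, idx))


streams = [
    [14, 16, 19],  # A
    [10, 11, 16],  # B
    [16, 17, 18],  # C
    [13, 15, 17],  # D
    [17, 19, 20],  # E
]
print(list(naive_merge(streams)))
# [10, 11, 13, 14, 15, 16, 16, 16, 17, 17, 17, 18, 19, 19, 20]
```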

We could improve the algorithm if we could maintain a sorted data structure of the top values of each stream and efficiently insert the next value. In fact, we don't really care about the top values being sorted at all. We only want to retrieve the smallest of them easily, and to be able to remove it and insert another value without much penalty.

The perfect data structure for the job seems to be a min-heap. We build the heap from the top values of the streams. The root of the heap always holds the smallest value, and replacing it is an O(log k) operation. We only need to keep track of which stream the value came from, record it, and replace it with the next value from that stream. This way the total running time is only O(n log k) for all the values.
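Written out, the two cost estimates compare as follows, assuming the heap is built bottom-up in O(k) time and that every stream holds at least one value (so n is at least k):

```latex
\begin{aligned}
T_{\text{re-sort}}(n, k) &= n \cdot O(k \log k) = O(n\,k \log k) \\
T_{\text{heap}}(n, k)    &= \underbrace{O(k)}_{\text{build heap}}
                          + n \cdot \underbrace{O(\log k)}_{\text{replace or remove root}}
                          = O(n \log k)
\end{aligned}
```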

In pseudocode, applied to the example above, the algorithm looks like this:

 build min-heap from the first value of each stream      // (14, 10, 16, 13, 17)
 while (heap not empty)
     record min value                                    // ==> 10
     if (the stream the min value came from is not empty)
         replace the min value with that stream's next value, then sift it down   // 11
     else
         remove the min value from the heap
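The pseudocode translates fairly directly into Python using the standard `heapq` module. This is a sketch under the same assumptions as before (integer timestamps, each stream an already-sorted iterable); `heap_merge` is a hypothetical name. Storing `(value, stream_index)` pairs in the heap is one way of remembering which stream each value came from.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple


def heap_merge(streams: List[Iterable[int]]) -> Iterator[int]:
    """Merge individually sorted streams into one sorted output in O(n log k)."""
    iters = [iter(s) for s in streams]
    heap: List[Tuple[int, int]] = []     # (value, index of the stream it came from)
    for idx, it in enumerate(iters):
        first = next(it, None)           # None marks an empty stream
        if first is not None:
            heap.append((first, idx))
    heapq.heapify(heap)                  # build min-heap: O(k)
    while heap:
        value, idx = heap[0]             # smallest value sits at the root
        yield value                      # "record min value"
        nxt = next(iters[idx], None)
        if nxt is not None:
            heapq.heapreplace(heap, (nxt, idx))  # replace root and sift down: O(log k)
        else:
            heapq.heappop(heap)          # stream exhausted: shrink the heap


streams = [[14, 16, 19], [10, 11, 16], [16, 17, 18], [13, 15, 17], [17, 19, 20]]
print(list(heap_merge(streams)))
# [10, 11, 13, 14, 15, 16, 16, 16, 17, 17, 17, 18, 19, 19, 20]
```

Python's standard library also provides `heapq.merge(*iterables)`, which performs this kind of lazy k-way merge of sorted inputs, so in practice you may not need to write the loop yourself.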

The heap can be stored in an array like so (slot 0 is left unused, so the children of the node at index i sit at indices 2i and 2i+1):

       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 10][ 13][ 16][ 14][ 17][   ][   ][...]
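A minimal sketch of that 1-based array layout, building the heap bottom-up from the first values of the example streams. `sift_down` and `build_heap` are illustrative helpers, not taken from any library, and `h[0]` holds a `None` placeholder.

```python
def sift_down(h: list, i: int) -> None:
    """Move h[i] down until no child is smaller (children of i are 2i and 2i+1)."""
    n = len(h) - 1                      # h[0] is an unused placeholder
    while 2 * i <= n:
        child = 2 * i
        if child + 1 <= n and h[child + 1] < h[child]:
            child += 1                  # pick the smaller of the two children
        if h[i] <= h[child]:
            break                       # heap property holds; stop
        h[i], h[child] = h[child], h[i]
        i = child


def build_heap(values: list) -> list:
    """Bottom-up construction: sift down every internal node, last to first."""
    h = [None] + list(values)
    for i in range((len(h) - 1) // 2, 0, -1):
        sift_down(h, i)
    return h


print(build_heap([14, 10, 16, 13, 17]))  # [None, 10, 13, 16, 14, 17]
```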

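Replacing 10 with the next value from stream B (11), and then sifting it down, would look like this. Since 11 is already smaller than both of its children (13 and 16), it stays at the root:

       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 11][ 13][ 16][ 14][ 17][   ][   ][...]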

After recording 11, the next value to insert is 16 (also from stream B). It takes the root's place and is sifted down, swapping first with 13 and then with 14:

       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 16][ 13][ 16][ 14][ 17][   ][   ][...]
       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 13][ 16][ 16][ 14][ 17][   ][   ][...]
       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 13][ 14][ 16][ 16][ 17][   ][   ][...]

At that point, the next value to be reported is 13, from stream D, so the next value to be inserted into the min-heap is 15. It replaces 13 at the root and sinks one level, swapping with 14:

       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 13][ 14][ 16][ 16][ 17][   ][   ][...]
       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 15][ 14][ 16][ 16][ 17][   ][   ][...]
       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ 14][ 15][ 16][ 16][ 17][   ][   ][...]
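The root-replacement steps traced above can be reproduced with a small helper. Again this is only a sketch: `replace_top` is a hypothetical name, it uses the same 1-based layout with `h[0]` unused, and it plays the role that `heapq.heapreplace` plays for Python's 0-based heaps.

```python
def sift_down(h: list, i: int) -> None:
    """Move h[i] down until no child is smaller (children of i are 2i and 2i+1)."""
    n = len(h) - 1                      # h[0] is an unused placeholder
    while 2 * i <= n:
        child = 2 * i
        if child + 1 <= n and h[child + 1] < h[child]:
            child += 1                  # pick the smaller of the two children
        if h[i] <= h[child]:
            break
        h[i], h[child] = h[child], h[i]
        i = child


def replace_top(h: list, value: int) -> int:
    """Return the current minimum, put `value` at the root, and restore the heap."""
    top = h[1]
    h[1] = value
    sift_down(h, 1)
    return top


h = [None, 10, 13, 16, 14, 17]          # heap built from (14, 10, 16, 13, 17)
for nxt in (11, 16, 15):                # next values from streams B, B and D
    popped = replace_top(h, nxt)
    print(popped, h[1:])
# 10 [11, 13, 16, 14, 17]
# 11 [13, 14, 16, 16, 17]
# 13 [14, 15, 16, 16, 17]
```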

The only detail left to settle is how to determine the source stream of each value in the heap. This is needed in order to know which stream the next value inserted into the heap should come from. One potential solution is to wrap the timestamp and the stream index in an object and store references to those objects in the heap. Another is to store only the index of each stream in the heap, and perform all comparisons on the current top value of the referenced streams.

For example, with the second approach the initial contents of the heap would be:

       H:  [ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ][ 6 ][ 7 ][...]
           [   ][ B ][ D ][ C ][ A ][ E ][   ][   ][...]
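A sketch of that second approach, in which the heap array holds only stream labels and every comparison looks up the current head value of the referenced stream. `IndexHeap`, `advance_top`, and the `heads` dictionary are assumptions made for illustration, and removing an exhausted stream from the heap is left out for brevity.

```python
from typing import Dict, List


class IndexHeap:
    """1-based min-heap of stream labels, ordered by each stream's head value."""

    def __init__(self, heads: Dict[str, int]) -> None:
        self.heads = heads                      # label -> value at the stream's front
        self.h: List[str] = [""] + list(heads)  # slot 0 is an unused placeholder
        for i in range((len(self.h) - 1) // 2, 0, -1):
            self._sift_down(i)                  # bottom-up build

    def _less(self, a: int, b: int) -> bool:
        # Compare the head values of the streams referenced at positions a and b.
        return self.heads[self.h[a]] < self.heads[self.h[b]]

    def _sift_down(self, i: int) -> None:
        n = len(self.h) - 1
        while 2 * i <= n:
            child = 2 * i
            if child + 1 <= n and self._less(child + 1, child):
                child += 1                      # pick the child with the smaller head
            if not self._less(child, i):
                break
            self.h[i], self.h[child] = self.h[child], self.h[i]
            i = child

    def advance_top(self, next_value: int) -> str:
        """The top stream's front moves to `next_value`; return its label and re-heapify."""
        top = self.h[1]
        self.heads[top] = next_value
        self._sift_down(1)
        return top


heap = IndexHeap({"A": 14, "B": 10, "C": 16, "D": 13, "E": 17})
print(heap.h[1:])                         # ['B', 'D', 'C', 'A', 'E']
print(heap.advance_top(11), heap.h[1:])   # B ['B', 'D', 'C', 'A', 'E']
print(heap.advance_top(16), heap.h[1:])   # B ['D', 'A', 'C', 'B', 'E']
```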

 

is the founder of Donaq, a software development consulting company with a focus on mobility. You can find Mike on Google+ and on LinkedIn.
Design copyright (c) Miky Dinescu