Showing posts with label algorithms. Show all posts
Showing posts with label algorithms. Show all posts

Monday, October 7, 2013

Optimization Tips & Tricks used by MimeKit: Part 2

In my previous blog post, I talked about optimizing the most critical loop in MimeKit's MimeParser by:

  • Extending our read buffer by an extra byte (which later became 4 extra bytes) that I could set to '\n', allowing me to do the bounds check after the loop as opposed to in the loop, saving us roughly half the instructions.
  • Unrolling the loop in order to check for 4 bytes at a time for that '\n' by using some bit twiddling hacks (for 64-bit systems, we might gain a little more performance by checking 8 bytes at a time).

After implementing both of those optimizations, the time taken for MimeKit's parser to parse nearly 15,000 messages in a ~1.2 gigabyte mbox file dropped from around 10s to about 6s on my iMac with Mono 3.2.3 (32-bit). That is a massive increase in performance.

Even after both of those optimizations, that loop is still the most critical loop in the parser and the MimeParser.ScanContent() method, which contains it, is still the most critical method of the parser.

While the loop itself was a huge chunk of the time spent in that method, the next largest offender was writing the content of the MIME part into a System.IO.MemoryStream.

MemoryStream, for those that aren't familiar with C#, is just what it sounds like it is: a stream backed by a memory buffer (in C#, this happens to be a byte array). By default, a new MemoryStream starts with a buffer of about 256 bytes. As you write more to the MemoryStream, it resizes its internal memory buffer to either the minimum size needed to hold the its existing content plus whatever number of bytes your latest Write() was called with or double the current internal buffer size, whichever is larger.

The performance problem here is that for MIME parts with large amounts of content, that buffer will be resized numerous times. Each time that buffer is resized, due to the way C# works, it will allocate a new buffer, zero the memory, and then copy the old content over to the new buffer. That's a lot of copying and creates a situation where the write operation can become exponentially worse as the internal buffer gets larger. Since MemoryStream contains a GetBuffer() method, its internal buffer really has to be a single contiguous block of memory. This means that there's little we could do to reduce overhead of zeroing the new buffer every time it resizes beyond trying to come up with a different formula for calculating the next optimal buffer size.

At first I decided to try the simple approach of using the MemoryStream constructor that allows specifying an initial capacity. By bumping up the initial capacity to 2048 bytes, things did improve, but only by a very disappointing amount. Larger initial capacities such as 4096 and 8192 bytes also made very little difference.

After brainstorming with my coworker and Mono runtime hacker, Rodrigo Kumpera, we decided that one way to solve this performance problem would be to write a custom memory-backed stream that didn't use a single contiguous block of memory, but instead used a list of non-contiguous memory blocks. When this stream needed to grow its internal memory storage, all it would need to do is allocate a new block of memory and append it to its internal list of blocks. This would allow for minimal overhead because only the new block would need to be zeroed and no data would need to be re-copied, ever. As it turns out, this approach would also allow me to limit the amount of unused memory used by the stream.

I dubbed this new memory-backed stream MimeKit.IO.MemoryBlockStream. As you can see, the implementation is pretty trivial (doesn't even require scary looking bit twiddling hacks like my previous optimization), but it made quite a difference in performance. By using this new memory stream, I was able to shave a full second off of the time needed to parse that mbox file I mentioned earlier, getting the total time spent down to 5s. That's starting to get pretty respectable, performance-wise.

As a comparison, let's compare the performance of MimeKit with what seems to be the 2 most popular C# MIME parsers out there (OpenPOP.NET and SharpMimeTools) and see how we do. I've been hyping up the performance of MimeKit a lot, so it had better live up to expectations, right? Let's see if it does.

Now, since none of the other C# MIME parsers I could find support parsing the Unix mbox file format, we'll write some test programs to parse the same message stream over and over (say, 20 thousand times) to compare MimeKit to OpenPOP.NET.

Here's the test program I wrote for OpenPOP.NET:

using System;
using System.IO;
using System.Diagnostics;
using OpenPop.Mime;

namespace OpenPopParser {
    class Program
    {
        public static void Main (string[] args)
        {
            var stream = File.OpenRead (args[0]);
            var stopwatch = new Stopwatch ();

            stopwatch.Start ();
            for (int i = 0; i < 20000; i++) {
                var message = Message.Load (stream);
                stream.Position = 0;
            }
            stopwatch.Stop ();

            Console.WriteLine ("Parsed 20,000 messages in {0}", stopwatch.Elapsed);
        }
    }
}

Here's the SharpMimeTools parser I wrote for testing:

using System;
using System.IO;
using System.Diagnostics;
using anmar.SharpMimeTools;

namespace SharpMimeParser {
    class Program
    {
        public static void Main (string[] args)
        {
            var stream = File.OpenRead (args[0]);
            var stopwatch = new Stopwatch ();

            stopwatch.Start ();
            for (int i = 0; i < 20000; i++) {
                var message = new SharpMessage (stream);
                stream.Position = 0;
            }
            stopwatch.Stop ();

            Console.WriteLine ("Parsed 20,000 messages in {0}", stopwatch.Elapsed);
        }
    }
}

And here is the test program I used for MimeKit:

using System;
using System.IO;
using System.Diagnostics;
using MimeKit;

namespace MimeKitParser {
    class Program
    {
        public static void Main (string[] args)
        {
            var stream = File.OpenRead (args[0]);
            var stopwatch = new Stopwatch ();

            stopwatch.Start ();
            for (int i = 0; i < 20000; i++) {
                var parser = new MimeParser (stream, MimeFormat.Default);
                var message = parser.ParseMessage ();
                stream.Position = 0;
            }
            stopwatch.Stop ();

            Console.WriteLine ("Parsed 20,000 messages in {0}", stopwatch.Elapsed);
        }
    }
}

Note: Unfortunately, OpenPOP.NET's message parser completely failed to parse the Star Trek message I pulled out of my test suite at random (first message in the jwz.mbox.txt file included in MimeKit's UnitTests project) due to the Base64 decoder not liking some byte or another in the stream, so I had to patch OpenPOP.NET to no-op its base64 decoder (which, if anything, should make it faster).

And here are the results running on my 2011 MacBook Air:

[fejj@localhost OpenPopParser]$ mono ./OpenPopParser.exe ~/Projects/MimeKit/startrek.msg
Parsed 20,000 messages in 00:06:26.6825190

[fejj@localhost SharpMimeParser]$ mono ./SharpMimeParser.exe ~/Projects/MimeKit/startrek.msg
Parsed 20,000 messages in 00:19:30.0402064

[fejj@localhost MimeKit]$ mono ./MimeKitParser.exe ~/Projects/MimeKit/startrek.msg
Parsed 20,000 messages in 00:00:15.6159326

Whooooosh!

Not. Even. Close.

MimeKit is nearly 25x faster than OpenPOP.NET even after making its base64 decoder a no-op and 75x faster than SharpMimeTools.

Since I've been ranting against C# MIME parsers that made heavy use of regex, let me show you just how horrible regex is for parsing messages (performance-wise). There's a C# MIME parser called MIMER that is nearly pure regex, so what better library to illustrate my point? I wrote a very similar loop to the other 2 that I listed above, so I'm not going to bother repeating it again. Instead, I'll just skip to the results:

[fejj@localhost MimerParser]$ mono ./MimerParser.exe ~/Projects/MimeKit/startrek.msg
Parsed 20,000 messages in 00:16:51.4839129

Ouch. MimeKit is roughly 65x faster than a fully regex-based MIME parser. It's actually rather pathetic that this regex parser beats SharpMimeTools.

This is why, as a developer, it's important to understand the limitations of the tools you decide to use. Regex is great for some things but it is a terrible choice for others. As Jamie Zawinski might say,

Some people, when confronted with a problem, think “I know, I'll use regular expressions.” Now they have two problems.

Monday, September 30, 2013

Optimization Tips & Tricks used by MimeKit: Part 1

One of the goals of MimeKit, other than being the most robust MIME parser, is to be the fastest C# MIME parser this side of the Mississippi. Scratch that, fastest C# MIME parser in the World.

Seriously, though, I want to get MimeKit to be as fast and efficient as my C parser, GMime, which is one of the fastest (if not the fastest) MIME parsers out there right now, and I don't expect that any parser is likely to smoke GMime anytime soon, so using it as a baseline to compare against means that I have a realistic goal to set for MimeKit.

Now that you know the why, let's examine the how.

First, I'm using one of those rarely used features of C#: unsafe pointers. While that alone is not all that interesting, it's a corner stone for one of the main techniques I've used. In C#, the fixed statement (which is how you get a pointer to a managed object) pins the object to a fixed location in memory to prevent the GC from moving that memory around while you operate on that buffer. Keep in mind, though, that telling the GC to pin a block of memory is not free, so you should not use this feature without careful consideration. If you're not careful, using pointers could actually make your code slower. Now that we've got that out of the way...

MIME is line-based, so a large part of every MIME parser is going to be searching for the next line of input. One of the reasons most MIME parsers (especially C# MIME parsers) are so slow is because they use a ReadLine() approach and most TextReaders likely use a naive algorithm for finding the end of the current line (as well as all of the extra allocating and copying into a string buffer):

    // scan for the end of the line
    while (inptr < inend && *inptr != (byte) '\n')
        inptr++;

The trick I used in GMime was to make sure that my read buffer was 1 byte larger than the max number of bytes I'd ever read from the underlying stream at a given time. This allowed me to set the first byte in the buffer beyond the bytes I just read from the stream to '\n', thus allowing for the ability to remove the inptr < inend check, opting to do the bounds check after the loop has completed instead. This nearly halves the number of instructions used per loop, making it much, much faster. So, now we have:

    // scan for the end of the line
    while (*inptr != (byte) '\n')
        inptr++;

But is that the best we can do?

Even after using this trick, it was still the hottest loop in my parser:

We've got no choice but to use a linear scan, but that doesn't mean that we can't do it faster. If we could somehow reduce the number of loops and likewise reduce the number of pointer increments, we could eliminate a bunch of the overhead of the loop. This technique is referred to as loop unrolling. Here's what brianonymous (from the ##csharp irc channel on freenode) and I came up with (with a little help from Sean Eron Anderson's bit twiddling hacks):

    uint* dword = (uint*) inptr;
    uint mask;

    do {
        mask = *dword++ ^ 0x0A0A0A0A;
        mask = ((mask - 0x01010101) & (~mask & 0x80808080));
    } while (mask == 0);

And here are the results of that optimization:

Now, keep in mind that on many architectures other than x86, in order to employ the trick above, inptr must first be 4-byte aligned (uint is 32bit) or it could cause a SIGBUS or worse, a crash. This is fairly easy to solve, though. All you need to do is increment inptr until you know that it is 4 byte aligned and then you can switch over to reading 4 bytes at a time as in the above loop. We'll also need to figure out which of those 4 bytes contained the '\n'. An easy way to solve that problem is to just linearly scan those 4 bytes using our previous single-byte-per-loop implementation starting at dword - 1. Here it is, your moment of Zen:

    // Note: we can always depend on byte[] arrays being
    // 4-byte aligned on 32bit and 64bit architectures
    int alignment = (inputIndex + 3) & ~3;
    byte* aligned = inptr + alignment;
    byte* start = inptr;
    uint mask;

    while (inptr < aligned && *inptr != (byte) '\n')
        inptr++;

    if (inptr == aligned) {
        // -funroll-loops
        uint* dword = (uint*) inptr;

        do {
            mask = *dword++ ^ 0x0A0A0A0A;
            mask = ((mask - 0x01010101) & (~mask & 0x80808080));
        } while (mask == 0);

        inptr = (byte*) (dword - 1);
        while (*inptr != (byte) '\n')
            inptr++;
    }

Note: In this above code snippet, 'inputIndex' is the byte offset of 'inptr' into the byte array. Since we can safely assume that index 0 is 4-byte aligned, we can do a simple calculation to get the next multiple of 4 and add that to our 'inptr' to get the next 4-byte aligned pointer.

That's great, but what does all that hex mumbo jumbo do? And why does it work?

Let's go over this 1 step at a time...

    mask = *dword++ ^ 0x0A0A0A0A;

This xor's the value of dword with 0x0A0A0A0A (0x0A0A0A0A is just 4 bytes of '\n'). The xor sets every byte that is equal to 0x0A to 0 in mask. Every other byte will be non-zero.

    mask - 0x01010101

When we subtract 0x01010101 from mask, the result will be that only bytes greater than 0x80 will contain any high-order bits (and any byte that was originally 0x0A in our input will now be 0xFF).

    ~mask & 0x80808080

This inverts the value of mask resulting in no bytes having the highest bit set except for those that had a 0 in that slot before (including the byte we're looking for). By then bitewise-and'ing it with 0x80808080, we get 0x80 for each byte that was originally 0x0A in our input or otherwise had the highest bit set after the bit inversion.

Because there's no way for any byte to have the highest bit set in both sides of the encompassing bitwise-and except for the character we're looking for (0x0A), the mask will always be 0 unless any of the bytes within were originally 0x0A, which would then break us out of the loop.

Well, that concludes part 1 as it is time for me to go to bed so I can wake up at a reasonable time tomorrow morning.

Good night!

Sunday, September 15, 2013

Time for a rant on mime parsers...

Warning: Viewer discretion is advised.

Where should I begin?

I guess I should start by saying that I am obsessed with MIME and, in particular, MIME parsers. No, really. I am obsessed. Don't believe me? I've written and/or worked on several MIME parsers at this point. It started off in my college days working on Spruce which had a horrendously bad MIME parser, and so as you read farther along in my rant about shitty MIME parsers, keep in mind: I've been there, I've written a shitty MIME parser.

As a handful of people are aware, I've recently started implementing a C# MIME parser called MimeKit. As I work on this, I've been searching around on GitHub and Google to see what other MIME parsers exist out there to find out what sort of APIs they provide. I thought perhaps I'll find one that offers a well-designed API that will inspire me. Perhaps, by some miracle, I'd find one that was actually pretty good that I could just contribute to instead of writing my own from scratch (yea, wishful thinking). Instead, all I have found are poorly designed and implemented MIME parsers, many probably belong on the front page of the Daily WTF.

I guess I'll start with some softballs.

First, there's the fact that every single one of them were written as System.String parsers. Don't be fooled by the ones claiming to be "stream parsers", because all any of those did was to slap a TextReader on top of the byte stream and start using reader.ReadLine(). What's so bad about that, you ask? For those not familiar with MIME, I'd like for you to take a look at the raw email sources in your inboxes particularly if you have correspondence with anyone outside of the US. Hopefully most of your friends and colleagues are using more-or-less MIME compliant email clients, but I guarantee you'll find at least a few emails with raw 8bit text.

Now, if the language they were using was C or C++, they might be able to get away with doing this because they'd technically be operating on byte arrays, but with Java and C#, a 'string' is a unicode string. Tell me: how does one get a unicode string from a raw byte array?

Bingo. You need to know the charset before you can convert those bytes into unicode characters.

To be fair, there's really no good way of handling raw 8bit text in message headers, but by using a TextReader approach, you are really limiting the possibilities.

Next up is the ReadLine() approach. One of the 2 early parsers in GMime (pan-mime-parser.c back in the version 0.7 days) used a ReadLine() approach, so I understand the thinking behind this. And really, there's nothing wrong with this approach as far as correctness goes, it's more of a "this can never be fast" complaint. Of the two early parsers in GMime, the pan-mime-parser.c backend was horribly slow compared to the in-memory parser. Of course, that's not very surprising. More surprising to me at the time was that when I wrote GMime's current generation of parser (sometime between v0.7 and v1.0), it was just as fast as the in-memory parser ever was and only ever had up to 4k in a read buffer at any given time. My point is, there are far better approaches than ReadLine() if you want your parser to be reasonably performant... and why wouldn't you want that? Your users definitely want that.

Okay, now come the more serious problems that I encountered in nearly all of the mime parser libraries I found.

I think that every single mime parser I've found so far uses the "String.Split()" approach for parsing address headers and/or for parsing parameter lists on headers such as Content-Type and Content-Disposition.

Here's an example from one C# MIME parser:

string[] emails = addressHeader.Split(',');

Here's how this same parser decodes encoded-word tokens:

private static void DecodeHeaders(NameValueCollection headers)
{
    ArrayList tmpKeys = new ArrayList(headers.Keys);

    foreach (string key in headers.AllKeys)
    {
        //strip qp encoding information from the header if present
        headers[key] = Regex.Replace(headers[key].ToString(), @"=\?.*?\?Q\?(.*?)\?=",
            new MatchEvaluator(MyMatchEvaluator), RegexOptions.IgnoreCase | RegexOptions.Multiline);
        headers[key] = Regex.Replace(headers[key].ToString(), @"=\?.*?\?B\?(.*?)\?=",
            new MatchEvaluator(MyMatchEvaluatorBase64), RegexOptions.IgnoreCase | RegexOptions.Multiline);
    }
}

private static string MyMatchEvaluator(Match m)
{
    return DecodeQP(m.Groups[1].Value);
}

private static string MyMatchEvaluatorBase64(Match m)
{
    System.Text.Encoding enc = System.Text.Encoding.UTF7;
    return enc.GetString(Convert.FromBase64String(m.Groups[1].Value));
}

Excuse my language, but what the fuck? It completely throws away the charset in each of those encoded-word tokens. In the case of quoted-printable tokens, it assumes they are all ASCII (actually, latin1 may work as well?) and in the case of base64 encoded-word tokens, it assumes they are all in UTF-7!?!? Where in the world did he get that idea? I can't begin to imagine his code working on any base64 encoded-word tokens in the real world. If anything is deserving of a double facepalm, this is it.

I'd just like to point out that this is what this project's description states:

A small, efficient, and working mime parser library written in c#.
...
I've used several open source mime parsers before, but they all either
fail on one kind of encoding or the other, or miss some crucial
information. That's why I decided to finally have a go at the problem
myself.

I'll grant you that his MIME parser is small, but I'd have to take issue with the "efficient" and "working" adjectives. With the heavy use of string allocations and regex matching, it could hardly be considered "efficient". And as the code pointed out above illustrates, "working" is a bit of an overstatement.

Folks... this is what you get when you opt for a "lightweight" MIME parser because you think that parsers like GMime are "bloated".

On to parser #2... I like to call this the "Humpty Dumpty" approach:

public static StringDictionary parseHeaderFieldBody ( String field, String fieldbody ) {
    if ( fieldbody==null )
        return null;
    // FIXME: rewrite parseHeaderFieldBody to being regexp based.
    fieldbody = SharpMimeTools.uncommentString (fieldbody);
    StringDictionary fieldbodycol = new StringDictionary ();
    String[] words = fieldbody.Split(new Char[]{';'});
    if ( words.Length>0 ) {
        fieldbodycol.Add (field.ToLower(), words[0].ToLower().Trim());
        for (int i=1; i<words.Length; i++ ) {
            String[] param = words[i].Trim(new Char[]{' ', '\t'}).Split(new Char[]{'='}, 2);
            if ( param.Length==2 ) {
                param[0] = param[0].Trim(new Char[]{' ', '\t'});
                param[1] = param[1].Trim(new Char[]{' ', '\t'});
                if ( param[1].StartsWith("\"") && !param[1].EndsWith("\"")) {
                    do {
                        param[1] += ";" + words[++i];
                    } while ( !words[i].EndsWith("\"") && i<words.Length);
                }
                fieldbodycol.Add ( param[0], SharpMimeTools.parserfc2047Header (param[1].TrimEnd(';').Trim('\"', ' ')) );
            }
        }
    }
    return fieldbodycol;
}

I'll give this guy some credit, at least he saw that his String.Split() approach was flawed and so tried to compensate by piecing Humpty Dumpty back together again. Of course, with his String.Trim()ing, he just won't be able to put him back together again with any level of certainty. The white space in those quoted tokens may have significant meaning.

Many of the C# MIME parsers out there like to use Regex all over the place. Here's a snippet from one parser that is entirely written in Regex (yea, have fun maintaining that...):

if (m_EncodedWordPattern.RegularExpression.IsMatch(field.Body))
{
    string charset = m_CharsetPattern.RegularExpression.Match(field.Body).Value;
    string text = m_EncodedTextPattern.RegularExpression.Match(field.Body).Value;
    string encoding = m_EncodingPattern.RegularExpression.Match(field.Body).Value;

    Encoding enc = Encoding.GetEncoding(charset);

    byte[] bar;

    if (encoding.ToLower().Equals("q"))
    {
        bar = m_QPDecoder.Decode(ref text);
    }
    else
    {
        bar = m_B64decoder.Decode(ref text);
    }                    
    text = enc.GetString(bar);

    field.Body = Regex.Replace(field.Body,
        m_EncodedWordPattern.TextPattern, text);
    field.Body = field.Body.Replace('_', ' ');
}

Let's pretend that the regex pattern strings are correct in their definitions (because they are god-awful to read and I can't be bothered to double-check them), the replacing of '_' with a space is wrong (it should only be done in the "q" case) and the Regex.Replace() is just evil. Not to mention that there could be multiple encoded-words per field.Body which this code utterly fails to handle.

Guys. I know you love regular expressions and that they are very very useful, but they are no substitute for writing a real tokenizer. This is especially true if you want to be lenient in what you accept (and in the case of MIME, you really need to be).

Thursday, April 7, 2011

Optimizing Merge Sort

A number of years ago I wrote about the Merge Sort algorithm. One of the advantages of Merge Sort is that it is a stable sort, meaning that elements that compare as being equal remain in their original order after being sorted.

Well, today I had need of employing a stable sorting routine for sorting elements by a ZIndex in Moonlight. Up until today, we had been using qsort() which, while not guaranteed to be a stable sort on any platform, happens to be implemented in glibc as a stable sort except in out-of-memory conditions. Since we'd like Moonlight to work on platforms other than Linux+glibc (such as Mac OS or BSD), it has become important enough to implement properly.

To start, I dusted off my generic MergeSort() implementation from years ago when I was writing articles about various sorting algorithms. This is what I had to start with:

#define MID(lo, hi) (lo + ((hi - lo) >> 1))

static void
msort (void *array, void *buf, size_t low, size_t high, size_t size,
       int (* compare) (const void *, const void *))
{
    register char *lo, *hi, *b;
    char *al, *am, *ah;
    size_t mid;
    
    mid = MID (low, high);
    
    if (mid + 1 < high)
        msort (array, buf, mid + 1, high, size, compare);
    
    if (mid > low)
        msort (array, buf, low, mid, size, compare);
    
    ah = ((char *) array) + ((high + 1) * size);
    am = ((char *) array) + ((mid + 1) * size);
    al = ((char *) array) + (low * size);
    
    b = (char *) buf;
    lo = al;
    hi = am;
    
    while (lo < am && hi < ah) {
        if (compare (lo, hi) <= 0) {
            memcpy (b, lo, size);
            lo += size;
        } else {
            memcpy (b, hi, size);
            hi += size;
        }
        
        b += size;
    }
    
    if (lo < am)
        memcpy (b, lo, am - lo);
    else if (hi < ah)
        memcpy (b, hi, (ah + size) - hi);
    
    memcpy (al, buf, ah - al);
}

int
MergeSort (void *base, size_t nmemb, size_t size,
           int (* compare) (const void *, const void *))
{
    void *tmp;
    
    if (nmemb < 2)
        return 0;
    
    if (!(tmp = malloc (nmemb * size))) {
        errno = ENOMEM;
        return -1;
    }
    
    msort (base, tmp, 0, nmemb - 1, size, compare);
    
    free (tmp);
    
    return 0;
}

Since performance is very important, I clocked this implementation against qsort() and got the following results on my Intel Core2 Quad Q6600 2.4 GHz machine using arrays of 10 million ints:

  • Randomized input: 14.13s vs qsort()'s 6.77s
  • Sorted input: 4.41s vs qsort()'s 1.54s
  • Reversed input: 4.26s vs qsort()'s 1.90s

Clearly the above MergeSort() implementation did not fare well against glibc's qsort() on my system, so it was time to look at what I could do to improve the performance.

The most obvious optimization I could see was to try and batch my memcpy() calls. In other words, instead of calling memcpy() to copy each and every element into our temporary buffer, it'd be more efficient to copy blocks of elements at a time:

static void
msort (void *array, void *buf, size_t low, size_t high, size_t size,
       int (* compare) (const void *, const void *))
{
    char *al, *am, *ah, *ls, *hs, *lo, *hi, *b;
    size_t mid;
    
    mid = MID (low, high);
    
    if (mid + 1 < high)
        msort (array, buf, mid + 1, high, size, compare);
    
    if (mid > low)
        msort (array, buf, low, mid, size, compare);
    
    ah = ((char *) array) + ((high + 1) * size);
    am = ((char *) array) + ((mid + 1) * size);
    al = ((char *) array) + (low * size);
    
    b = (char *) buf;
    lo = al;
    hi = am;
    
    do {
        ls = lo;
        hs = hi;
        
        if (lo > al || hi > am) {
            /* our last loop already compared lo & hi and found lo <= hi */
            lo += size;
        }
        
        while (lo < am && compare (lo, hi) <= 0)
            lo += size;
 
        if (lo > ls) {
            memcpy (b, ls, lo - ls);
            b += (lo - ls);
        }
 
        if (lo < am) {
            /* our last compare tells us hi < lo */
            hi += size;
            
            while (hi < ah && compare (hi, lo) < 0)
                hi += size;
            
            memcpy (b, hs, hi - hs);
            b += (hi - hs);
        }
    } while (lo < am && hi < ah);
    
    if (lo < am)
        memcpy (b, lo, am - lo);
    else if (hi < ah)
        memcpy (b, hi, ah - hi);
    
    memcpy (al, buf, ah - al);
}

The results were promising. For the exact same inputs (including the exact same random array), we now get:

  • Randomized input: 10.45s
  • Sorted input: 2.08s
  • Reversed input: 2.03s

The only other way that we can reduce the number of memcpy() calls we make is to avoid copying leading and trailing elements into our temporary buffer if it's not necessary to merge them. Here's the solution I came up with:

static void
msort (void *array, void *buf, size_t low, size_t high, size_t size,
       int (* compare) (const void *, const void *))
{
    char *a1, *al, *am, *ah, *ls, *hs, *lo, *hi, *b;
    size_t copied = 0;
    size_t mid;
    
    mid = MID (low, high);
    
    if (mid + 1 < high)
        msort (array, buf, mid + 1, high, size, compare);
    
    if (mid > low)
        msort (array, buf, low, mid, size, compare);
    
    ah = ((char *) array) + ((high + 1) * size);
    am = ((char *) array) + ((mid + 1) * size);
    a1 = al = ((char *) array) + (low * size);
    
    b = (char *) buf;
    lo = al;
    hi = am;
    
    do {
        ls = lo;
        hs = hi;
        
        if (lo > al || hi > am) {
            /* our last loop already compared lo & hi and found lo <= hi */
            lo += size;
        }
        
        while (lo < am && compare (lo, hi) <= 0)
            lo += size;
        
        if (lo < am) {
            if (copied == 0) {
                /* avoid copying the leading items */
                a1 = lo;
                ls = lo;
            }
            
            /* our last compare tells us hi < lo */
            hi += size;
            
            while (hi < ah && compare (hi, lo) < 0)
                hi += size;
            
            if (lo > ls) {
                memcpy (b, ls, lo - ls);
                copied += (lo - ls);
                b += (lo - ls);
            }
            
            memcpy (b, hs, hi - hs);
            copied += (hi - hs);
            b += (hi - hs);
        } else if (copied) {
            memcpy (b, ls, lo - ls);
            copied += (lo - ls);
            b += (lo - ls);
            
            /* copy everything we needed to re-order back into array */
            memcpy (a1, buf, copied);
            return;
        } else {
            /* everything already in order */
            return;
        }
    } while (hi < ah);
    
    if (lo < am) {
        memcpy (b, lo, am - lo);
        copied += (am - lo);
    }
    
    memcpy (a1, buf, copied);
}

Once again, reducing the amount of copying paid off:

  • Randomized input: 9.80s
  • Sorted input: 0.95s
  • Reversed input: 2.05s

Update 2011-05-18: One final optimization that can be tried is pre-calculating the optimum way to copy elements between buffers. This calculation, while not terribly expensive itself, adds up with every call to memcpy(). Let's start off by writing some handy macros:

#define COPYBY(TYPE, a, b, n) {         \
    long __n = (n) / sizeof (TYPE);     \
    register TYPE *__a = (TYPE *) (a);  \
    register TYPE *__b = (TYPE *) (b);  \
                                        \
    do {                                \
        *__a++ = *__b++;                \
    } while (--__n > 0);                \
}

#define MEMCOPY(dest, src, n) {                 \
    switch (copy_mode) {                        \
    case 1: COPYBY (long, dest, src, n); break; \
    case 2: COPYBY (int, dest, src, n); break;  \
    default: memcpy (dest, src, n);             \
    }                                           \
}

Now that these handy macros are written, we can plug them into our Merge Sort implementation:

static void
msort (void *array, void *buf, size_t low, size_t high, size_t size,
       int copy_mode, int (* compare) (const void *, const void *))
{
    char *a1, *al, *am, *ah, *ls, *hs, *lo, *hi, *b;
    size_t copied = 0;
    size_t mid;
    
    mid = MID (low, high);
    
    if (mid + 1 < high)
        msort (array, buf, mid + 1, high, size, compare);
    
    if (mid > low)
        msort (array, buf, low, mid, size, compare);
    
    ah = ((char *) array) + ((high + 1) * size);
    am = ((char *) array) + ((mid + 1) * size);
    a1 = al = ((char *) array) + (low * size);
    
    b = (char *) buf;
    lo = al;
    hi = am;
    
    do {
        ls = lo;
        hs = hi;
        
        if (lo > al || hi > am) {
            /* our last loop already compared lo & hi and found lo <= hi */
            lo += size;
        }
        
        while (lo < am && compare (lo, hi) <= 0)
            lo += size;
        
        if (lo < am) {
            if (copied == 0) {
                /* avoid copying the leading items */
                a1 = lo;
                ls = lo;
            }
            
            /* our last compare tells us hi < lo */
            hi += size;
            
            while (hi < ah && compare (hi, lo) < 0)
                hi += size;
            
            if (lo > ls) {
                MEMCOPY (b, ls, lo - ls);
                copied += (lo - ls);
                b += (lo - ls);
            }
            
            MEMCOPY (b, hs, hi - hs);
            copied += (hi - hs);
            b += (hi - hs);
        } else if (copied) {
            MEMCOPY (b, ls, lo - ls);
            copied += (lo - ls);
            b += (lo - ls);
            
            /* copy everything we needed to re-order back into array */
            MEMCOPY (a1, buf, copied);
            return;
        } else {
            /* everything already in order */
            return;
        }
    } while (hi < ah);
    
    if (lo < am) {
        MEMCOPY (b, lo, am - lo);
        copied += (am - lo);
    }
    
    MEMCOPY (a1, buf, copied);
}

int
MergeSort (void *base, size_t nmemb, size_t size,
           int (* compare) (const void *, const void *))
{
    int copy_mode;
    void *tmp;
    
    if (nmemb < 2)
        return 0;
    
    if (!(tmp = malloc (nmemb * size))) {
        errno = ENOMEM;
        return -1;
    }
    
    if ((((char *) base) - ((char *) 0)) % sizeof (long) == 0 && (size % sizeof (long)) == 0)
        copy_mode = 1;
    else if ((((char *) base) - ((char *) 0)) % sizeof (int) == 0 && (size % sizeof (int)) == 0)
        copy_mode = 2;
    else
        copy_mode = 0;
    
    msort (base, tmp, 0, nmemb - 1, size, copy_mode, compare);
    
    free (tmp);
    
    return 0;
}

This handy trick seems to have worked out rather well:

  • Randomized input: 7.79s
  • Sorted input: 0.99s
  • Reversed input: 1.69s

At this point, I can't think of any other obvious optimizations so I'm going to call it a day.

For a recap, here are the results of all 4 implementations compared side-by-side with the results from qsort():

qsort()msort() v1msort() v2msort() v3msort() v4
random:6.7714.1310.459.807.79
sorted:1.544.412.080.950.99
reversed:1.904.262.032.051.69

Saturday, February 28, 2009

Text Layout Engines

As many of my loyal followers know, I wrote a really fast text layout engine for Moonlight 1.0 which was able to layout text in more-or-less a single pass over the string. Hard to do better than that, especially with my superbly (I'm allowed to stroke my own ego, right?) designed font/glyph caches.

That said, the code had also been superbly disgusting and unmaintainable. Made worse when I had to add hacks to render text selection (Silverlight 1.0's TextBlock is like a GtkLabel in that it just renders text, but Silverlight 2.0's TextBox supports editing and selection and so is therefor more akin to a multi-line GtkEntry widget).

Well, Thursday night, as I was watching House on Hulu, I had one of those "House moments" where he suddenly realizes what the patient is suffering from and how to solve the problem (usually when his friend, Wilson, is talking to him about something random).

I spent all day yesterday (and I mean all day, until 11pm last night) putting together my thoughts for a new design and working out the details and I think I now have a vastly improved solution that not only uses less memory in all but the pathological cases (I now use a UTF-8 string instead of a UCS4 string), but also doesn't require:

  • a pass over the text to break on CR/LF to construct a list of text runs which were what the old TextLayout engine I wrote used instead of a char* (because the layout engine now handles CR/LFs)
  • a whole new set of text runs every time selection boundaries change in a TextBox (because selection is no longer represented by text runs)

Of course, the same brilliance of the old design still apply: no need to re-layout when most text properties (underline, foreground, background, etc) change (obviously we still have to re-layout if font properties change because they change the metrics).

With my new design, my TextLayout class has a Select() method which allows the consumer to change the selected region of text. When you change the selection, my new logic can simply clear the cached glyph clusters for the affected area(s).

A "glyph cluster" is a cached (sub)run of glyphs in a particular text run. A "text run" is a substring of text that share all the same text attributes which does not span across line boundaries.

To break it down, a layout contains a list of lines. Each line contains a list of runs. Each run contains a list of glyph clusters.

Normally, a run will consist of only a single glyph cluster unless it overlaps the selection.

For example, if the first half of the run is within the selection, then the run will contain 2 glyph clusters (one for the selected portion and one for the non-selected portion). However, if the selection is fully contained within a single run but doesn't span the entire run, then it's possible to have up to 3 glyph clusters for that run: pre-selection, selection, post-selection.

The brilliance of doing it this way is that it simplifies keeping track of kerning between selected regions, so that as you drag your selection across some text, the text following your mouse cursor doesn't appear to "jump" to the left or right as you move between characters that are kerned.

Sunday, December 14, 2008

Fun with Tries

A number of years ago, Michael Zucchi introduced me to the Aho-Corasick algorithm which he implemented in Evolution for the "Find in Message" feature. Later, I ended up extracting that logic out and refactoring it a bit into the ETrie class which I ended up using to replace the regex logic (which is common among most GNOME apps that do this) to scan for urls in the message body (in order to insert hyperlinks, etc). This turned out to be an order of magnitude faster.

For those who aren't in the know, the Aho-Corasick algorithm is for searching for multiple strings in a given input. In other words, it searches for one of multiple needles in a single haystack.

The pseudocode for the search itself would look something like this:

q = root
FOR i = 1 TO n
  WHILE q != fail AND g(q, text[i]) == fail
    q = h(q)
  ENDWHILE
  IF q == fail
    q = root
  ELSE
    q = g(q, text[i])
  ENDIF
  IF isElement(q, final)
    RETURN TRUE
  ENDIF
ENDFOR
RETURN FALSE

Of course, ETrie has a slight modification to the above algorithm, which is that instead of returning TRUE or FALSE, it returns a pointer to the beginning of the match or NULL if it find no matches.

The problem with this particular implementation, though, is that it would simply return a pointer to the offset into the string of the first match found. But what if you had multiple pattern strings that started with identical characters?

Like the ETrie implementation, the traditional Aho-Corasick algorithm (if I am remembering correctly) stops searching as soon as it finds any match. The difference, of course, is that given access to the current to the state / tree structure, a programmer could choose to continue matching to see if there is a more greedy match. With ETrie, however, in the interest of simplicity, it just returned the first / shortest match it got to.

In case I'm explaining this badly, say our search patterns are "the", "there", and "therefor". In the case of ETrie, it would always return that it matched "the" even when the "the" that it matched was the starting substring of the larger string, "therefor".

In something I was working on more recently (in c++), I wanted a greedier match (e.g. I wanted it to match "therefor" instead of "the").

In order to achieve this, I modified the implementation a slight bit:

const char *
Trie::Search (const char *buf, size_t buflen, int *matched_id)
{
    const char *inptr, *inend, *prev, *pat;
    size_t matched = 0;
    TrieMatch *m = 0;
    TrieState *q;
    char c;
    
    inend = buf + buflen;
    inptr = buf;
    
    q = &root;
    pat = prev = inptr;
    while (inptr < inend) {
        c = *inptr++;
        
        if (icase && (c >= 'A' && c <= 'Z'))
            c += 0x20;
        
        while (q != 0 && (m = g (q, c)) == 0 && matched == 0)
            q = q->fail;
        
        if (q == &root) {
            if (matched)
                return pat;
            
            pat = prev;
        }
        
        if (q == 0) {
            if (matched)
                return pat;
            
            q = &root;
            pat = inptr;
        } else if (m != 0) {
            q = m->state;
            
            if (q->final > matched) {
                if (matched_id)
                    *matched_id = q->id;
                
                matched = q->final;
            }
        }
        
        prev = inptr;
    }
    
    return matched ? pat : 0;
}

I have published the full source code for trie.cpp and trie.h on my website for your perusal.

Friday, October 31, 2008

Optimizing Moonlight's InkPresenter

This past week I started out staring at endless amounts of javascript trying to figure out what it was supposed to do so that I could figure out what Moonlight was breaking on in order to fix some bugs. As you can imagine, this is a slow and boring process where one's eyes go dry and it feels like you are getting nowhere fast.

As occasionally happens, I give up and move onto the next bug hoping that the next bug will be easier to fix and/or give me some insight into the previous bug. As it turned out, I moved onto bug #409793 which was a performance bug on sites like Ink Journal and Ink Tattoo Studio.

What was happening on these sites was that as more points got added to the stroke, rendering would get slower and slower (thus causing the line to lag behind the mouse cursor) because Moonlight was invalidating the entire InkPresenter canvas on each frame and so having to render the entire thing even though it was unnecessary.

To optimize this, I added a 'dirty' rectangle that kept track of the actual regions we needed to redraw. As points got added to the collection between frames, I added the bounds of the new point plus the region between the new point and the previous/next points (the 'next' point is obviously only needed if the newly added point was an insertion). The result for sites like InkJournal and InkTattooStudio was that we only invalidated the newly appended points at each frame render, vastly improving our performance which now matches Microsoft's Silverlight performance for these sites afaict.

Saturday, June 14, 2008

Calculating the Nearest Power of 2

The typical implementation for finding the nearest power of 2 for a given value is as follows:

static uint32_t
nearest_pow (uint32_t num)
{
    uint32_t n = 1;

    while (n < num)
        n <<= 1;

    return n;
}

This implementation's performance, unfortunately, suffers as the value of num increases. Luckily there is another approach that takes a constant time no matter how large the value:

static uint32_t
nearest_pow (uint32_t num)
{
    uint32_t n = num > 0 ? num - 1 : 0;

    n |= n >> 1;
    n |= n >> 2;
    n |= n >> 4;
    n |= n >> 8;
    n |= n >> 16;
    n++;

    return n;
}

A simple performance test might be:

int main (int argc, char **argv)
{
    uint32_t i, n = 0;

    for (i = 0; i < INT_MAX / 10; i++)
        n += nearest_pow (i);

    return n > 0 ? 1 : 0;
}

The run-time difference between the two implementations on my AMD Athlon (/proc/cpuinfo reports AMD Athlon(TM) XP 3200+ @ 2200.141 MHz) is impressive. For performance testing, I compiled with gcc -O2 which I figure is the typical default for most packaged software on Linux distributions.

The brain-dead approach has the following results:

[fejj@serenity cvs]$ time ./pow

real 0m12.034s
user 0m11.809s
sys 0m0.032s

The bitwise implementation is insanely fast:

[fejj@serenity cvs]$ time ./pow2

real 0m1.361s
user 0m1.304s
sys 0m0.008s

Now... to be fair, the if you are using small values for num, then it's possible that the brain-dead approach might be faster. Let's try the same main() for-loop again, but this time let's request nearest_pow() with a value of 1 each time. Since it is likely that the results will be far too fast to really compare, let's also bump up the number of iterations to UINT_MAX.

[fejj@serenity cvs]$ time ./pow

real 0m0.003s
user 0m0.000s
sys 0m0.004s
[fejj@serenity cvs]$ time ./pow2

real 0m0.002s
user 0m0.000s
sys 0m0.000s

Unfortunately, both are still far too fast to really compare performance. Let's try bumping up the value of num to see if we can find the point at which the while-loop approach starts to fall behind the bitwise approach. To start, let's try passing the value of 2 as the num argument:

[fejj@serenity cvs]$ time ./pow

real 0m0.002s
user 0m0.000s
sys 0m0.004s
[fejj@serenity cvs]$ time ./pow2

real 0m0.002s
user 0m0.000s
sys 0m0.000s

It looks like the bitwise approach may be faster than the while-loop approach for the value of 2, but it's a bit hard to tell for sure with only UINT_MAX loops. We'd have to switch to using a 64bit i to know for sure and I'm not sure it's that important. Let's try 3 and see what we get:

[fejj@serenity cvs]$ time ./pow

real 0m6.053s
user 0m5.968s
sys 0m0.004s
[fejj@serenity cvs]$ time ./pow2

real 0m0.003s
user 0m0.000s
sys 0m0.004s

Well, hot diggity... I think we have ourselves a winner. This suggests that for all values of num larger than 2, the performance of the while-loop approach will be outmatched by the bitwise approach and that for values less-than-or-equal to 2, the performance is nearly identical.

Update: Thanks to the anonymous commenter that noticed that my original main() program was allowing the compiler to optimize out the call to nearest_pow() in the bitwise implementation. As suggested, I updated the for-loop to accumulate the output and then used it after the loop to avoid this problem. It only seemed to change the results for the bitwise implementation in the first test, however (before the change, it reported 0.002s). Still, on my machine it is approx. 10x faster for the first test case and seems to lose no performance even in the optimal conditions for the while-loop implementation.

Update2: I was just pointed to the Linux kernel's fls() implementation for x86. Here is a new implementation using inline assembler for x86:

static uint32_t
nearest_pow (uint32_t num)
{
    int bit;

    __asm__("bsrl %1,%0\n\t"
            "jnz 1f\n\t"
            "movl $-1,%0\n"
            "1:" : "=r" (bit) : "rm" (num));

    return (1 << (bit + 1));
}

The results for the original INT_MAX / 10 iterations using i as the num argument yields the following results:

[fejj@serenity cvs]$ time ./pow3

real 0m1.335s
user 0m1.296s
sys 0m0.004s

The results seem negligibly faster than the C bitwise implementation and obviously less portable :(

Update3: A friend of mine, Stephane Delcroix, has just pointed me at a solution to this problem that he came up the other day:

static uint32_t
nearest_pow (uint32_t num)
{
    uint32_t j, k;
    (j = num & 0xFFFF0000) || (j = num);
    (k = j & 0xFF00FF00) || (k = j);
    (j = k & 0xF0F0F0F0) || (j = k);
    (k = j & 0xCCCCCCCC) || (k = j);
    (j = k & 0xAAAAAAAA) || (j = k);
    return j << 1;
}

The results of this implementation are as follows:

[fejj@serenity cvs]$ time ./pow4

real 0m1.249s
user 0m1.204s
sys 0m0.004s

This is actually faster than both the bitwise and the assembler implementations above!

There are two things to be aware of, though:

  • When num is 0, the value of 0 is returned (which may not be desirable depending on what you are doing with it)
  • If num is a power of 2, then instead of returning num, this implementation will return the next higher power of 2

Saturday, March 3, 2007

Quick Sort

Like Merge Sort, Quick Sort promises O(n lg n) time in the average case, and, like Merge Sort, is also a recursive algorithm. However, it is important to be aware that Quick Sort is O(n2) worst case, where, ironically, the input is already mostly sorted in either direction.

The way Quick Sort works is that it first chooses a pivot point (in our case, we'll just choose the middle element) with which to partition the working sequence into two partitions. Next, it puts all items with values smaller than the pivot value into the first partition and all items with values greater than the pivot value into the second partition. Recursively repeat the process on each partition.

Now that we understand how things are supposed to work, lets implement our QuickSort function (keeping our same call signature from previous sorting posts):

#define MID(lo, hi) (lo + ((hi - lo) >> 1))

#define SWAP(a, b) tmp = array[a], array[a] = array[b], array[b] = tmp

static void
QuickSortR (int *array, size_t lo, size_t hi)
{
    register size_t i, k;
    register int pivot;
    int tmp;
    
    /* make sure we have at least 2 elements */
    if ((hi - lo) < 1)
        return;
    
    /* cache our pivot value */
    pivot = array[MID (lo, hi)];
    
    i = lo;
    k = hi;
    
    do {
        /* find the first element with a value >= pivot value */
        while (i < k && array[i] < pivot)
            i++;
        
        /* find the last element with a value <= pivot value */
        while (k > i && array[k] > pivot)
            k--;
        
        if (i <= k) {
            SWAP (i, k);
            i++;
            k--;
        } else {
            break;
        }
    } while (1);
    
    if (lo < k) {
        /* recursively sort the first partition */
        QuickSortR (array, lo, k);
    }
    
    if (i < hi) {
        /* recursively sort the second partition */
        QuickSortR (array, i, hi);
    }
}

void
QuickSort (int *array, int n)
{
    QuickSortR (array, 0, n - 1);
}

Before we go any further, lets see how this compares to our Merge Sort implementation from earlier.

Since I no longer have the same machine that I timed Merge Sort with, I had to re-time it on my laptop (which is where I've been doing this since I can hack while laying back on my comfortable couch). My laptop is an IBM T40 ThinkPad which sports an Intel Pentium M Processor rated at 1600 MHz.

Since I left off sorting 10,000,000 items in the Merge Sort post, I figured I'd use the same array size here.

For Merge Sort, I'm consistently getting around 6.96s. For our Quick Sort implementation, I'm getting about 5.40s.

It would be nice if we could get rid of the function call overhead of recursion like we did with Binary Insertion Sort.

We're going to have to get crafty in order to work around the need to make our function recursive. What we can do is keep our own stack, but growing it dynamically could potentially kill any performance gains we could hope for... so, lets see what Donald Knuth has to say about the mathematical properties of this sort algorithm.

An auxiliary stack with at most [lg N] entries is needed for temporary storage.

As it happens, log2 of the max unsigned value of any integer type is the number of bits in said integer type. This means that on my system, where integers are 32bit, I'll need a stack size of 32.

What do we store on our stack? Well, what variables do we need for QuickSortR()? We need array, high, and low. If we're going to make QuickSort() non-recursive, then that means we'll always have access to array which means the only variables we need to hold on our stack are high and low.

So here it is, your Moment of Zen:

typedef struct qstack {
    size_t lo, hi;
} qstack_t;

void
QuickSort (int *array, size_t n)
{
    qstack_t stack[32], *sp;
    register size_t i, k;
    register int pivot;
    size_t lo, hi;
    int tmp;
    
    /* initialize our stack */
    sp = stack;
    sp->hi = n - 1;
    sp->lo = 0;
    sp++;
    
    do {
        /* pop our lo and hi indexes off the stack */
        sp--;
        lo = sp->lo;
        hi = sp->hi;
        
        if ((hi - lo) < 1)
            continue;
        
        /* cache our pivot value */
        pivot = array[MID (lo, hi)];
        
        i = lo;
        k = hi;
        
        do {
            /* find the first element with a value >= pivot value */
            while (i < k && array[i] < pivot)
                i++;
            
            /* find the last element with a value <= pivot value */
            while (k > i && array[k] > pivot)
                k--;
            
            if (i <= k) {
                SWAP (i, k);
                i++;
                k--;
            } else {
                break;
            }
        } while (1);
        
        if (lo < k) {
            /* push the first partition onto our stack */
            sp->lo = lo;
            sp->hi = k;
            sp++;
        }
        
        if (i < hi) {
            /* push the second partition onto our stack */
            sp->lo = i;
            sp->hi = hi;
            sp++;
        }
    } while (sp > stack);
}

Once again, we see a slight improvement in execution time: 5.10s. Didn't seem to help all that much, but was worth giving it a shot. Might be more beneficial on non-x86 platforms, though...

As I mentioned in the opening paragraph, one of the major problems with Quick Sort is that for already sorted (or nearly sorted) inputs, Quick Sort hits its pathological corner case and becomes extremely inefficient compared to other sorts. One of the more popular solutions to this problem (as used by most libc qsort() implementations) is to fall back to Insertion Sort once an input segment is small enough. I'll leave this up to the reader to consider ways of implementing this approach.

To learn more about sorting, I would recommend reading Art of Computer Programming, Volume 3: Sorting and Searching (2nd Edition)

25 Byte Sort Routine

On the topic of sorting, I came across something cool the other day in Michael Abrash's Graphics Programming Black Book that I thought I'd share: a 25-byte long integer sort routine (that is to say, the routine itself is only 25 bytes long).

Here it is, your Moment of Zen:

;---------------------------------------------------------------
; Sorts an array of ints. C-callable (small model). 25 bytes.
; void sort (int n, int a[]);
;
; Courtesy of David Stafford.
;---------------------------------------------------------------

      .model small
      .code
        public _sort

top:    mov     dx,[bx]     ; swap two adjacent ints
        xchg    dx,[bx+2]
        xchg    dx,[bx]

        cmp     dx,[bx]     ; did we put them in the right order?
        jl      top         ; no, swap them back

        inc     bx          ; go to the next integer
        inc     bx
        loop    top

_sort:  pop     dx          ; get return address (entry point)
        pop     cx          ; get count
        pop     bx          ; get pointer
        push    bx          ; restore pointer
        dec     cx          ; decrement count
        push    cx          ; save count
        push    dx          ; save return address
        jg      top         ; if cx > 0

        ret

      end

Sunday, February 25, 2007

Merge Sort

Note: This is a repost from an old weblog.

Like Binary Insertion Sort, Merge Sort is a recursive algorithm, but Merge Sort is O(n lg n) whereas Binary Insertion Sort had been O(n2). It works by splitting up the set into halves until each set is down to 1 item. It then merges them back together again in sorted order. It's a very simple concept although a bit hard to explain in words. Allow me to illustrate instead:

Given: [6 8 1 3 7 2 5 4]

The first thing we do is split this set in half:

[6 8 1 3] [7 2 5 4]

We keep splitting in half until we can't split anymore:

[6 8] [1 3] [7 2] [5 4]

[6] [8] [1] [3] [7] [2] [5] [4]

We now merge the items back into their previous sets in sorted order:

[6 8] [1 3] [2 7] [4 5]

And again:

[1 3 6 8] [2 4 5 7]

And again:

[1 2 3 4 5 6 7 8]

Not so bad, right? So how do we implement this? Well, breaking the sets into halves is fairly straight forward so I won't bother explaining that. The “hard” part is the merging, and even that's not so hard. Let me illustrate step by step how we merged 2 sets back into a single set:

set #1: [2 7]
set #2: [4 5]

We start by comparing the first item in each set. Is 2 less than 4? Yes, so we put 2 into our “merged” set:

set #1: [7]
set #2: [4 5]
merged: [2]

Now compare 7 and 4. Since 4 is less than 7, we append 4 into our merged set:

set #1: [7]
set #2: [5]
merged: [2 4]

Now compare 7 and 5. Since 5 is less than 7, we append 5 into our merged set:

set #1: [7]
set #2: []
merged: [2 4 5]

Since set #2 is out of items, we just append the remainder of set #1 into our merged set:

merged: [2 4 5 7]

Tada!

As you can probably see, it looks like we'll need a temporary array to hold our merged set as we merge the 2 subsets back together. If we want to keep to the same API we've been using for the previous sorting routines, we'll need a second function that will do most of the work. Since we know we'll never need our temporary array to be larger than the original input array, we might as well create it in our main function and pass it off to our recursive worker function:

void
MergeSort (int a[], int n)
{
    int *b;

    if (!n || !(b = malloc (sizeof (int) * n)))
        return;

    msort (a, b, 0, n - 1);

    free (b);
}

As you can see here, I called my worker function msort(), which takes as input the original array, the temporary array (b[]), the offset of the first item, and the offset of the last item. Now let's take a look at msort()'s implementation:

static void
msort (int a[], int b[], int lo, int hi)
{
    register int h, i, l;
    int mid;

    if (lo >= hi)
        return;

    mid = lo + ((hi - lo) / 2);
    msort (a, b, lo, mid);
    msort (a, b, mid + 1, hi);

    for (i = 0, l = lo, h = mid + 1; l <= mid && h <= hi; i++) {
        if (a[l] <= a[h])
            b[i] = a[l++];
        else
            b[i] = a[h++];
    }

    while (l <= mid)
        b[i++] = a[l++];
    while (h <= hi)
        b[i++] = a[h++];

    memcpy (a + lo, b, sizeof (int) * i);
}

You'll notice that the first thing we do is find the midpoint of our set starting with lo and ending with hi. We then call msort() on each of the 2 resulting subsets (lo to mid, and mid + 1 to hi). After msort() is finished with each of those subsets, the 2 subsets will be in sorted order and will be stored back in array a[] at positions lo through mid and mid + 1 through hi.

We then need to merge the 2 subsets back together again into a sorted set starting at position lo and ending with position hi. We'll use i to represent the next position to add an item into our temporary array, b[]. We'll use l and h to represent the cursor for each of the subsets (l for low and h for high). Notice that I check a[l] <= a[h] rather than a[l] > a[h], this is so that items with the same value in the array are in the same order they were in originally. For integer arrays, this is no big deal – but it is often a preferred feature when sorting arrays of custom data. Sorting algorithms that have this property are called “stable sorts”.

The for-loop merges 1 item from one of the 2 subsets into the temporary array until one of the subsets becomes empty. At this point, one of the 2 while-loops will be executed, appending the remainder of the non-empty set to the merged set.

Now that we've merged the 2 subsets back together into our temporary array, b[], we need to copy the result back into a[] at the subrange we started with.

And there you have it.

Since I'm sure you're as anxious to find out how fast this is compared to our previous sorts for sorting 100,000 items as I am, let's give it a whirl and find out. Wow, that was fast – it averaged 0.044s for sorting that 100,000 item array of random integers. That's the fastest time yet! Now that we have such a fast algorithm for sorting, we'll need to bump up our array size in order to compare further sorting algorithms. Let's go for 10,000,000. This time I get 6.344s which is still faster than most of our previous sort algorithms at 100,000 items!

The first optimization I see is that we could replace those while-loops with memcpy()'s. Turns out, that doesn't really change the results for our particular input (it might make a difference if the majority of values of one set or the other was larger than those in the other, but this wouldn't be the case for our input).

To learn more about sorting, I would recommend reading Art of Computer Programming, Volume 3: Sorting and Searching (2nd Edition)

Update: I've written a follow-up article on Optimizing Merge Sort that may be of interest as well. This follow-up article deals with optimizing a general-purpose Merge Sort implementation to be on par with the libc qsort() function.

Shell Sort

Note: This is a repost from an old weblog.

The Shell Sort algorithm is designed to move items over large distances each iteration. The idea behind this is that it will get each item closer to its final destination quicker saving a lot of shuffling by comparing items farther apart.

The way it works is it subdivides the dataset into smaller groups where each item in the group is a set distance apart. For example, if we use h to represent our distance and R to represent an item, we might have groups: { R1, Rh+1, R2h+1, ... }, { R2, Rh+2, R2h+2, ... }, ...

We then sort each subgroup individually.

Keep repeating the above process, continually reducing h, until h becomes 1. After one last run-through where h is a value of 1, we stop.

At this point, I'm sure you are wondering where h comes from and what values to reduce it by each time though. If and when you ever figure it out, let me know - you might also want to publish a paper and submit it to ACM so that your name might go down in the history (and algorithm) books. That's right, as far as I'm aware, no one knows the answer to this question, except, perhaps, God Himself :-)

If you are interested, you might take a look at Donald Knuth's book, Art of Computer Programming, Volume 3: Sorting and Searching (starting on page 83), for some mathematical discussion on the matter.

As far as I understand from Sorting and Searching, it is theoretically possible to get the Shell Sort algorithm to approach O(n1.5) given an ideal increment table which is quite impressive.

Knuth gives us a couple of tables to start with: [8 4 2 1] and [7 5 3 1] which seem to work okay, but are far from being ideal for our 100,000 item array that we are trying to sort in this exercise, however, for the sake of keeping our first implementation simple, we'll use the [7 5 3 1] table since it has the charming property where each increment size is 2 smaller than the previous. Yay for simplicity.

void
ShellSort (int a[], int n)
{
    int h, i, j;
    int tmp;

    for (h = 7; h >= 1; h -= 2) {
        for (i = h; i < n; i++) {
            tmp = a[i];
            for (j = i; j >= h && a[j - h] > tmp; j -= h)
                a[j] = a[j - h];
            a[j] = tmp;
        }
    }
}

The nice thing about Shell Sort is that, while the increment table is a complete mystery, the algorithm itself is quite simple and well within our grasp.

Let's plug this into our sort program and see how well it does.

I seem to get a pretty consistent 6.3 seconds for 100,000 items on my AMD Athlon XP 2500 system which is almost as good as the results we were getting from our Binary Insertion Sort implementation.

Now for some optimizations. We know that an ideal set of increment sizes will get us down to close to O(n1.5) and that it is unlikely that the [7 5 3 1] set is ideal, so I suggest we start there.

On a hunch, I just started adding more and more primes to our [7 5 3 1] table and noticed that with each new prime added, it seemed to get a little faster. At some point I decided to experiment a bit and tried using a set of primes farther apart and noticed that with a much smaller set of increments, I was able to get about the same performance as my much larger set of primes. This spurred me on some more and I eventually came up with the following set:

{ 14057, 9371, 6247, 4177, 2777, 1861, 1237, 823, 557, 367, 251, 163, 109, 73, 37, 19, 11, 7, 5, 3, 1 }

In order to use this set, however, we need a slightly more complicated method for determining the next value for h than just h -= 2, so I used a lookup table instead:

static int htab[] = { 14057, 9371, 6247, 4177, 2777, 1861, 1237, 823, 557, 367, 251, 163, 109, 73, 37, 19, 11, 7, 5, 3, 1, 0 };

void
ShellSort (int a[], int n)
{
    int *h, i, j;
    int tmp;

    for (h = htab; *h; h++) {
        for (i = *h; i < n; i++) {
            tmp = a[i];
            for (j = i; j >= *h && a[j - *h] > tmp; j -= *h)
                a[j] = a[j - *h];
            a[j] = tmp;
        }
    }
}

With this new table of increments, I was able to achieve an average sort time of about 0.09 seconds. In fact, ShellSort() in combination with the above increment table will sort an array of 2 million items in about the same amount of time as our optimized BinaryInsertionSort() took to sort 100 thousand items. That's quite an improvement, wouldn't you say!?

To learn more about sorting, I would recommend reading Art of Computer Programming, Volume 3: Sorting and Searching (2nd Edition)

Binary Insertion Sort

Note: this is a repost from an old weblog.

The Binary variant of Insertion Sort uses a binary search to find the appropriate location to insert the new item into the output. Before we can implement this variant of the insertion sort, we first need to know and understand how the binary search algorithm works:

Binary Search

Binary searching, for those who don't know (no need to raise your hand in shame), is a method of searching (duh) in which you attempt to compare as few items in the dataset to the item you are looking for as possible by halving your dataset with each comparison you make. Hence the word binary (meaning two). Allow me to illustrate:

Given the following dataset of 9 items in sorted order, find the value 8:

[1 2 3 4 5 6 7 8 9]

The first step is to check the midpoint (there are 9 elements, so the midpoint would be (9+1)/2 which is the 5th element):

[1 2 3 4 5 6 7 8 9]

Well, 5 is not 8 so we're not done yet. Is 8 less than 5? No. Is 8 greater than 5? Yes. So we know that the value we seek is in the second half of the array (if it's there). We are now left with:

[6 7 8 9]

Oops. 4 does not have a midpoint, so we round off any way we want to (it really doesn't matter). (4+1)/2 = 2.5, so for the sake of argument lets just drop the decimal and check the 2nd element (this is how integer math works anyway, so it's simpler):

[6 7 8 9]

Well, 7 is not 8 so we're not done yet. Is 8 less than 7? No. Is 8 greater than 7? Yes. So we know that the value we seek is in the second half of the array (if it's there). We are now left with:

[8 9]

Again we find the midpoint, which in this case is 1.

[8 9]

Does 8 equal 8? Yes! We're done!

How many tries did that take us? Three. Three tries and we found what we were looking for. And, in the worst possible case for this particular array (searching for the 9), we would have been able to do it in 4 tries.

Not bad, huh?

To implement our BinarySearch() function, the easiest way will be to write it using recursion. Recursion is a technique that allows us to break the problem into smaller and smaller pieces (or, the array in this case - as illustrated above) until we arrive at the solution (or, in this case, the item we are looking for). Recursion is implemented by having a function call itself to process a subset of the data, which, may again call itself (repeatedly until the problem is solved).

Notice in my above explanation of how to search the 9-item array using the binary search algorithm, how I continually break the array into smaller and smaller subsets? That's why binary search is often described as a recursive algorithm - you keep repeating the the process on smaller and smaller subsets until your subset cannot get any smaller or until you find what you are looking for.

Hopefully I've explained that well enough, if not - perhaps taking a look at the following implementation will help?

int
BinarySearch (int a[], int low, int high, int key)
{
    int mid;

    mid = low + ((high - low) / 2);

    if (key > a[mid])
        return BinarySearch (a, mid + 1, high, key);
    else if (key < a[mid])
        return BinarySearch (a, low, mid, key);

    return mid;
}

Note: To get the midpoint, we use the formula low + ((high - low) / 2) instead of (high + low) / 2 because we want to avoid the possibility of an integer overflow.

In my above implementation, a[] is our integer array, low is the low-point of the subset in which we are looking, high is the high-point of the subset in which we are looking, and key is the item that we are looking for. It returns the integer index of the value we are looking for in the array. Here's how we would call this function from our program: BinarySearch (a, 0, 9, 8);

Remember that in C, array indexes start at 0. We pass 9 as the high-point because there are 9 elements in our array. I don't think I need to explain why we pass 8 as the key.

If we then plug this into our Insertion Sort algorithm from yesterday (instead of doing a linear search), we get a Binary Insertion Sort... almost. There's one minor change we have to make to our BinarySearch() function first - since we are not necessarily trying to find an already-existing item in our output, we need to adjust BinarySearch() to return the ideal location for our key even in the event that it doesn't yet exist. Don't worry, this is a very simple adjustment:

int
BinarySearch (int a[], int low, int high, int key)
{
    int mid;

    if (low == high)
        return low;

    mid = low + ((high - low) / 2);

    if (key > a[mid])
        return BinarySearch (a, mid + 1, high, key);
    else if (key < a[mid])
        return BinarySearch (a, low, mid, key);

    return mid;
}

Okay, now we're ready to plug it in. Here's the result:

void
BinaryInsertionSort (int a[], int n)
{
    int ins, i, j;
    int tmp;

    for (i = 1; i < n; i++) {
        ins = BinarySearch (a, 0, i, a[i]);
        tmp = a[i];
        for (j = i - 1; j >= ins; j--)
            a[j + 1] = a[j];
        a[ins] = tmp;
    }
}

There's a couple of optimizations we can make here. If ins is equal to i, then we don't need to shift any items nor do we need to set a[ins] to the value in a[i], so we can wrap those last 4 lines in an if-statement block:

void
BinaryInsertionSort (int a[], int n)
{
    int ins, i, j;
    int tmp;

    for (i = 1; i < n; i++) {
        ins = BinarySearch (a, 0, i, a[i]);
        if (ins < i) {
            tmp = a[i];
            for (j = i - 1; j >= ins; j--)
                a[j + 1] = a[j];
            a[ins] = tmp;
        }
    }
}

If you remember from our Insertion Sort optimizations, the memmove() function really helped speed up shifting a lot of items all at once. But before we do that, lets see what kind of times our current implementation takes to sort 100,000 items (just so we have something to compare against after we make that memmove() optimization).

On my AMD Athlon XP 2500, it takes roughly 25 seconds to sort 100,000 random items (using the main() function from our Bubble Sort analysis a few days ago). That's already on par with yesterday's optimized insertion sort implementation and we haven't even plugged in memmove() yet!

I'm pretty excited and can't wait to plug in memmove() to see what kind of results we get, so here we go:

void
BinaryInsertionSort (int a[], int n)
{
    int ins, i, j;
    int tmp;

    for (i = 1; i < n; i++) {
        ins = BinarySearch (a, 0, i, a[i]);
        if (ins < i) {
            tmp = a[i];
            memmove (a + ins + 1, a + ins, sizeof (int) * (i - ins));
            a[ins] = tmp;
        }
    }
}

After plugging in memmove(), our BinaryInsertionSort() sorts 100,000 items in about 5.5 seconds! Whoo!

Lets recap: We started off with the goal of sorting 100,000 items using Bubble Sort which took over over 103 seconds to accomplish. We then scratched our heads and optimized the Bubble Sort algorithm the best we could and obtained a result that was twice as fast. Then we traded our BubbleSort() implementation in for another O(n2) algorithm (generally not something I would suggest wasting time on, but for the sake of learning about a bunch of sorting algorithms, why not?). Using InsertionSort(), we got that down to 26.5 seconds which is 4 times faster than our original sort! But wait, then we took our InsertionSort() one step farther and used a binary search algorithm to help us find the ideal location in which to insert our item and we got it down to a whopping 5.5 seconds - 20 times faster than what we started with!

That's pretty awesome if you ask me...

There's still one more optimization we can try on our BinaryInsertionSort() implementation to attempt to get faster speeds, but it may be over your head if you're a beginner so feel free to ignore my following code dump:

void
BinaryInsertionSort (int a[], int n)
{
    register int i, m;
    int hi, lo, tmp;

    for (i = 1; i < n; i++) {
        lo = 0, hi = i;
        m = i / 2;

        do {
            if (a[i] > a[m]) {
                lo = m + 1;
            } else if (a[i] < a[m]) {
                hi = m;
            } else
                break;

            m = lo + ((hi - lo) / 2);
        } while (lo < hi);
 
        if (m < i) {
            tmp = a[i];
            memmove (a + m + 1, a + m, sizeof (int) * (i - m));
            a[m] = tmp;
        }
    }
}

The basic idea here was that I wanted to get rid of recursion because, unfortunately, as nice as recursion is for visualizing (and implementing) our binary search, there is a cost penalty for every time we recurse - there is both function call overhead (which slows us down) and a memory overhead (we need to grow the stack). If we're clever enough, we can sometimes bypass the need for recursion like I did above which can often lead to significant performance gains.

In this case, however, we didn't improve the performance all that much (down from 5.5 to 5.2 seconds) but on other architectures (such as SPARC), it may be a lot more noticeable. It may also be a lot more noticeable for larger datasets. In fact, lets try sorting 200,000 items and see what sort of times we get.

Ah, much more noticeable now. With recursion, it takes 46 seconds and without, it takes 37 seconds - so there is definitely an advantage to cleverly working around the need for recursion.

Okay, that's all I've got for Binary Insertion Sort, but there are still plenty more sorting algorithms to try before we call it a day - we've just barely scratched the surface - and so far we've only looked into the O(n2) algorithms - there are others that promise O(n lg n) performance!

To learn more about sorting, I would recommend reading Art of Computer Programming, Volume 3: Sorting and Searching (2nd Edition)

Insertion Sort

Note: this is a repost from an old weblog.

Insertion Sort is another O(n2) sorting algorithm (just like Bubble Sort was) but is a little bit faster. Now, ideally, when trying to optimize by picking a new algorithm - you don't want to go from one slow algorithm to another, but I figure while I'm explaining sorting techniques - I might as well explain a bunch of them. Anyways... it turns out that insertion sort works out to be faster than Bubble Sort in the general case.

The way Insertion Sort works, is you move one item from your input dataset into an output dataset one item at a time. Take an item from the input and place it in the output at the proper position such that the output dataset is always in sorted order. Allow me to demonstrate:

Given: [5 8 2 9 6 3 7 1 0 4]

That's our input dataset. Our output dataset is so far empty. The first thing we do, is to take the first item of our input dataset (or any item, really, but we might as well grab them out of the input dataset in order, right?) and place it in the output dataset. Since the output dataset will only have 1 item this first round, there's nothing to sort. The result is:

Input: [8 2 9 6 3 7 1 0 4] Output: [5]

Now take the next input item and place it in the output, but in the proper sorted order:

Input: [2 9 6 3 7 1 0 4] Output: [5 8]

Lather. Rinse. Repeat.

Input: [9 6 3 7 1 0 4] Output: [2 5 8]
Input: [6 3 7 1 0 4] Output: [2 5 8 9]
Input: [3 7 1 0 4] Output: [2 5 6 8 9]
Input: [7 1 0 4] Output: [2 3 5 6 8 9]
Input: [1 0 4] Output: [2 3 5 6 7 8 9]
Input: [0 4] Output: [1 2 3 5 6 7 8 9]
Input: [4] Output: [0 1 2 3 5 6 7 8 9]
Input: [] Output: [0 1 2 3 4 5 6 7 8 9]

Tada! We have a sorted dataset. This is the basis of all insertion sort variants, the difference between the various insertion sorts (I only know of 2, but wouldn't be surprised if there were more) is the way in which we find the ideal place to insert the new item from the input into the output.

Before I get into the different variants, however, let me address one of the concerns you likely have thus far: if we're sorting a large-ish array (note: if we are sorting a linked list, then this isn't an issue), we don't want to have two arrays because that doubles the amount of memory we're using!

Indeed, but if you'll notice: as the output size increases, the input size decreases - together, both the input and output sizes total the same number of elements and if we iterate through the input elements in order, then we can share the same array as both the input and output arrays. Let me re-illustrate:

Output | Input
[][5 8 2 9 6 3 7 1 0 4]
[5 ][8 2 9 6 3 7 1 0 4]
[5 8 ][2 9 6 3 7 1 0 4]
[2 5 8 ][9 6 3 7 1 0 4]
...

See how we can cheat now? Visualization is the key to solving so many problems. Don't be afraid to take a pencil and paper and "draw" the problem - many times the solution will present itself.

Linear Insertion Sort

Generally when someone refers to Insertion Sort, this is the variant that they are talking about. As I was saying above, the difference between the variants is how they find the proper position in which to insert the new item into the output. As the name suggests, Linear insertion sort uses a linear search in order to find this position. Basically, this comes down to scanning through the output array starting at the first position and comparing the values: if the new item is larger than the first output value, move to the next output item and so on until the new item's value is less than the item we are comparing against in the output - once we find that, we have found the position to insert the new item.

Note: we could actually get away with using <= to minimize the number of comparisons we have to do, but then if we have multiple items with the same values, they won't remain in the same order they were in in the input, but if that doesn't bother you, then definitely use <=. Sorts that guarantee that items in the output with the same comparison values are in the same order as they were in the input are called "stable sorts". This is usually a pretty desirable trait (unless performance is more important). The lucky thing for us, though, is that in this particular case, keeping duplicate items in the same order we found them in the input, would force us to work backwards starting at the end of the output array rather than working forwards starting at the beginning of the output array which just so happens to allows us a more promising optimization. Pretty crafty, eh? I thought so...

One thing I should remind you about is that when you insert an item into the output, you'll need to shift all the items after that position one place to the right.

Okay, on to the implementation...

The first optimization you should notice is that after the first "loop", the first item of the input is always the first item of the output, so we might as well start with the second item of the input array. So with that, go ahead and implement this algorithm - you should get something similar to this:

void
InsertionSort (int a[], int n)
{
    int i, j = 0;
    int key;

    for (i = 1; i < n; j = i, i++) {
        key = a[i];
        while (j >= 0 && a[j] > key) {
            a[j + 1] = a[j];
            j--;
        }
        a[j + 1] = key;
    }
}

You'll notice that I took the "work backwards starting at the end of the output array" approach that I noted above. As I started to mention, this approach has the added benefit of allowing us to shift the output items as we go which means we get to shift fewer items per loop on average than if we had worked in the forward direction, resulting in a performance gain (shifting a bunch of items is more expensive than an integer comparison).

If we feed this into our sort program and have main() call this new function rather than the BubbleSort() implementation we wrote yesterday, we find that we have a sort that is significantly faster - down to 36 seconds from the optimized BubbleSort()'s 51 seconds. Not bad...

See any obvious improvements we can make that might result in a significantly faster implementation of our InsertionSort() implementation? I don't see anything I would consider major, but we might try using memmove() instead of manually moving items one space to the right. We'll get the most bang for the buck if we wait until we find our optimal insertion point before moving anything around, so:

void
InsertionSort (int a[], int n)
{
    int i, j = 0;
    int key;

    for (i = 1; i < n; j = i, i++) {
        key = a[i];
        while (j >= 0 && a[j] > key)
            j--;
        memmove (a + (j + 2), a + (j + 1), sizeof (int) * ((i - 1) - j));
        a[j + 1] = key;
    }
}

Okay, so with that very simple adjustment, we got our total sort time from 36 seconds down to 26.5 seconds. Nothing to scoff at, surely, but at the same time not as drastic an improvement as changing algorithms had been (see why algorithms are so important for optimizations?).

Now just imagine if we used an algorithm that promised better than O(n2) performance!

Before we jump to something better than O(n2) though, I still have one more trick up my sleeve to improve our insertion sort algorithm (not implementation this time, but rather: algorithm). I dub thee: Binary Insertion Sort.

To learn more about sorting, I would recommend reading Art of Computer Programming, Volume 3: Sorting and Searching (2nd Edition)

Code Snippet Licensing

All code posted to this blog is licensed under the MIT/X11 license unless otherwise stated in the post itself.