Getting started with ZeroMQ and AnyEvent

We are currently evaluating ZeroMQ for a new project. At first it seemed that there were various Perl modules to help us (Message::Passing, AnyEvent::ZeroMQ), but most of them depend on the deprecated ZeroMQ distribution. So we continued at the lower level provided by ZMQ::LibZMQ3.

For the first tests, we want to set up some PUB-SUB queues where several daemons (web servers) publish different events, and some workers subscribe to some of the events and do stuff when they see an event. For example, when a new user registers a fresh account, we need to send a verification mail, copy the user data to some other database (don't ask...) and maybe notify some people (e.g. via an IRC message or a growl-like message on the user's desktop).

When testing some new stuff, I like to do so in a fresh environment. So let's set that up:

Set up a fresh perl for testing

Using perlbrew I compiled a new Perl:

perlbrew install -n --as zmq perl-5.16.2

I use -n (no tests) because it's much faster and I already tested a perl-5.16.2 install on this machine (and I just assume that it still works). --as zmq installs this Perl under a distinct name ('zmq') using an @INC that's completely separate from the other Perls' @INCs.

This takes a while, and as one of the goals we hope to achieve via ZeroMQ is better scalability through parallelisation, I "fork another process" (open a new xterm) to get ZeroMQ itself.

Install 0MQ 3.2 on Debian

ZeroMQ says that one should use 3.2 for new projects. Unfortunately, this version is not yet in Debian testing, so we need some fiddling with apt. Luckily, 0MQ provides some docs on how to do this.

In short:

sudo su
echo "deb http://ftp.debian.org/debian experimental main" >> /etc/apt/sources.list
aptitude update
aptitude install libzmq3-dev

Hopefully, perlbrew has finished now. I now use the new Perl via perlbrew use zmq.

Install ZMQ::LibZMQ3

Now let's install the Perl binding from CPAN:

cpanm ZMQ::LibZMQ3

Easy. Thanks to the incredible efforts by the heroes working on the Perl toolchain.

A simple Publisher

First I wrote a very small webserver using plain Plack (use cpanm Plack to get it):

#!/usr/bin/env perl
use 5.016;
use Plack::Runner qw();

my $app = sub {
    my $env = shift;
    my $path = $env->{PATH_INFO};
    return [
        '200',
        [ 'Content-Type' => 'text/plain' ],
        [ "You requested: $path" ],
    ];
};

my $runner = Plack::Runner->new;
$runner->parse_options(@ARGV);
$runner->run($app);

As you can see, it accepts any URL and echoes back the path you request.

Now I want to publish some events using ZeroMQ:

use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PUB);
my $context = zmq_init();
my $publisher = zmq_socket($context, ZMQ_PUB);
zmq_bind($publisher, 'tcp://*:5556');

I load ZMQ::LibZMQ3, get a $context and then create a new 0MQ socket of type ZMQ_PUB. I bind the publisher to TCP port 5556. (If this sounds like total gibberish, it might be a good idea to go and read the first chapter of the excellent 0MQ guide now.)

In the code handling the request, I now send a message:

my $app = sub {
    my $env = shift;
    my $path = $env->{PATH_INFO};
    zmq_send( $publisher, "request $path" );
    return [
        '200',
        [ 'Content-Type' => 'text/plain' ],
        [ "You requested: $path" ],
    ];
};

zmq_send will publish a message, and then completely forget about it. It just sends it into the void. Here I use a simple string as a message. The leading "request" is some sort of type info about the message. Subscribers can use this info to decide if they want to handle the message or not. (In a real app I guess one should use a multipart message, containing the event type in the first frame, and some payload (e.g. JSON) in the second frame.)
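
The multipart idea could look roughly like this. It is a minimal sketch, not something we run: the helper names event_frames and publish_event and the payload field are made up for illustration, and I'm assuming that zmq_sendmsg accepts a plain string plus a flags argument, as the ZMQ::LibZMQ3 docs describe.

```perl
#!/usr/bin/env perl
use 5.016;
use warnings;
use JSON::PP ();
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PUB ZMQ_SNDMORE);

# Hypothetical helper: build the two frames of an event message.
# Frame 1 is the event type (what subscribers filter on),
# frame 2 is the payload serialised as JSON.
sub event_frames {
    my ( $type, $payload ) = @_;
    state $json = JSON::PP->new->utf8->canonical;
    return ( $type, $json->encode($payload) );
}

# Hypothetical helper: send all frames as one multipart message.
# Every frame except the last one is flagged with ZMQ_SNDMORE.
sub publish_event {
    my ( $socket, $type, $payload ) = @_;
    my @frames = event_frames( $type, $payload );
    my $last   = pop @frames;
    zmq_sendmsg( $socket, $_, ZMQ_SNDMORE ) for @frames;
    zmq_sendmsg( $socket, $last );
    return;
}

my $context   = zmq_init();
my $publisher = zmq_socket( $context, ZMQ_PUB );
zmq_bind( $publisher, 'tcp://*:5556' );

publish_event( $publisher, 'request', { path => '/test' } );
```

With a multipart message the subscription filter is matched against the first frame only, so subscribers can keep subscribing to 'request' while the payload grows.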

I can now start the server via plackup webserver_psgi.pl and send some requests to http://localhost:5000. This works, but the zmq messages disappear into the void, as nobody is listening. So let's write a client!

A first SUB client

This client will subscribe to the request events published by the server. It will echo the requested URL to the terminal. After every 10 requests it should print out a short statistic. (We would like to use something like this to minimize the number of DB writes when collecting various stats. We might also want to use this to offload all our stats/logging to some other service.)

#!/usr/bin/perl
use 5.016;

use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_POLLIN);

my $context = zmq_init();
my $subscriber = zmq_socket($context, ZMQ_SUB);
zmq_connect($subscriber, 'tcp://localhost:5556');
zmq_setsockopt($subscriber, ZMQ_SUBSCRIBE, 'request');

my %stat;
my $cnt=0;

This is just standard ZMQ code to set up a subscriber and connect it to port 5556. zmq_setsockopt tells the subscriber to only be interested in messages starting with request. %stat and $cnt store the statistics and the count of requests.
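
It helps to remember that the ZMQ_SUBSCRIBE filter is a plain prefix match on the raw message bytes, not a match on whole words. The following pure-Perl sketch mimics that rule. The function name matches_subscription and the sample messages are made up; the real filtering happens inside libzmq.

```perl
use 5.016;
use warnings;

# Hypothetical stand-in for the filter libzmq applies to a SUB socket:
# a message passes if it starts with any of the subscribed prefixes.
# An empty prefix matches every message.
sub matches_subscription {
    my ( $message, @prefixes ) = @_;
    for my $prefix (@prefixes) {
        return 1 if index( $message, $prefix ) == 0;
    }
    return 0;
}

for my $message ( 'request /test', 'requests_failed 3', 'register domm' ) {
    my $verdict = matches_subscription( $message, 'request' ) ? 'delivered' : 'dropped';
    say "$message => $verdict";
}
```

So a subscription to 'request' would also receive a hypothetical 'requests_failed' event. Including the separator in the prefix ('request ' with the trailing space) avoids that.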

Now we need to do some actual work:


while (1) {
    zmq_poll([{
        socket  => $subscriber,
        events  => ZMQ_POLLIN,
        callback => sub {
            my $msg = zmq_msg_data(zmq_recvmsg($subscriber));
            $msg=~s/^request //;
            say "someone accessed $msg";

            $cnt++;
            $stat{ $msg }++;
            if ($cnt % 10 == 0 ) {
                foreach (sort keys %stat) {
                    say "$_\t$stat{$_}";
                }
            }
        }
    }]);
}

In an endless while loop I use zmq_poll (an interface to ZeroMQ's polling mechanism) to set up a callback. The callback receives the message, strips off 'request ', prints a message and increments the counter. After every 10 requests it prints some stats.

This looks quite nice - and it even works (please imagine the server running in another terminal, and somebody sending some HTTP requests to this server, e.g. lwp-request http://localhost:5000/test):

domm@t430:~/0mq-test$ perl w1.pl
someone accessed /
someone accessed /
someone accessed /
someone accessed /zeromq
someone accessed /zeromq
someone accessed /zeromq
someone accessed /zeromq
someone accessed /zeromq
someone accessed /test
someone accessed /test
/       3
/test   2
/zeromq 5

But I noticed a small problem: one of my CPU cores was running at 100%, because apparently the while loop and zmq_poll do not work that well together.
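
A likely cause, though I have not verified it: zmq_poll takes a timeout as its second argument, and without one it seems to return immediately, so the while loop spins. Below is a sketch of the same loop with a blocking poll. It assumes that ZMQ::LibZMQ3 passes the timeout through to libzmq unchanged (milliseconds in 0MQ 3.x, -1 to wait indefinitely), and the callback is shortened to a single say.

```perl
#!/usr/bin/perl
use 5.016;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_POLLIN);

my $context    = zmq_init();
my $subscriber = zmq_socket( $context, ZMQ_SUB );
zmq_connect( $subscriber, 'tcp://localhost:5556' );
zmq_setsockopt( $subscriber, ZMQ_SUBSCRIBE, 'request' );

my @poll_items = (
    {
        socket   => $subscriber,
        events   => ZMQ_POLLIN,
        callback => sub {
            my $msg = zmq_msg_data( zmq_recvmsg($subscriber) );
            say "got: $msg";
        },
    }
);

# Assumption: the second argument is the poll timeout.
# -1 should block until a message arrives instead of returning at once.
while (1) {
    zmq_poll( \@poll_items, -1 );
}
```

Even if this calms the CPU down, the worker still cannot do anything else while it waits, which is one more reason to look at a real event loop.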

Time to get async!

AnyEvent

AnyEvent calls itself "the DBI of event loop programming" because it provides a unified API to various event loops. Let's install it:

cpanm AnyEvent

It took me a while to understand how I'm supposed to write AnyEvent code. It's quite different from what I remember from my POE days. ZMQ::LibZMQ3 provides an example of how to get ZeroMQ working with AnyEvent. I couldn't get this to work, so after some experimenting I ended up with this:

#!/usr/bin/perl
use 5.016;
use AnyEvent;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_DONTWAIT ZMQ_FD);

my $context = zmq_init();
my $subscriber = zmq_socket($context, ZMQ_SUB);
zmq_connect($subscriber, 'tcp://localhost:5556');
zmq_setsockopt($subscriber, ZMQ_SUBSCRIBE, 'request');

my %stat;
my $cnt=0;

my $quit_program = AnyEvent->condvar;

my $fh = zmq_getsockopt( $subscriber, ZMQ_FD );

my $w = AnyEvent->io(
    fh   => $fh,
    poll => "r",
    cb   => sub {
        # ZMQ_DONTWAIT makes the receive non-blocking, so the loop drains
        # all pending messages and stops as soon as none are left.
        # (ZMQ_RCVMORE is a socket option, not a receive flag.)
        while ( my $recvmsg = zmq_recvmsg( $subscriber, ZMQ_DONTWAIT ) ) {
            my $msg = zmq_msg_data($recvmsg);
            $msg=~s/^request //;
            say "someone accessed $msg";

            $cnt++;
            $stat{ $msg }++;
            if ($cnt % 10 == 0 ) {
                foreach (sort keys %stat) {
                    say "$_\t$stat{$_}";
                }
            }
        }
    });

$quit_program->recv;

There are only a few changes:

my $quit_program = AnyEvent->condvar;

This is some sort of handle that somehow represents the event loop. This is of course grossly wrong, but I haven't really digested AnyEvent yet.
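
What does seem clear from the AnyEvent docs: a condvar is a condition variable, recv runs the event loop until something calls send on it, and recv then returns whatever was passed to send. A tiny sketch without any ZeroMQ, using a timer to stand in for "something happened" (the 0.1 second delay and the 'done' value are arbitrary):

```perl
#!/usr/bin/perl
use 5.016;
use warnings;
use AnyEvent;

my $done = AnyEvent->condvar;

# Fire once after 0.1 seconds and hand a value to whoever is waiting.
my $timer = AnyEvent->timer(
    after => 0.1,
    cb    => sub { $done->send('done') },
);

# recv blocks here, running the event loop, until send is called.
my $result = $done->recv;
say "condvar returned: $result";
```

In the worker nothing ever calls send on $quit_program, so recv keeps the event loop running until the process is killed.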

my $fh = zmq_getsockopt( $subscriber, ZMQ_FD );

Using zmq_getsockopt I get a file descriptor from ZeroMQ (despite the variable name $fh it is a plain descriptor number, which AnyEvent accepts as well). Now I can pass it on to AnyEvent for async handling:

my $w = AnyEvent->io(
    fh   => $fh,
    poll => "r",
    cb   => sub { ... }
);

(the code inside the callback stayed the same..)

My new worker still works (yay), but now uses nearly no CPU (yay!)
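
Since the point of counting in batches is to cut down on DB writes, here is a sketch of how the bookkeeping could be separated from the transport. Nothing here comes from our real code: make_batcher, the batch size of 3 and the flush callback are hypothetical, and the flush callback is where a single DB write per batch would go. Unlike the worker above, which prints running totals, this variant starts counting from zero after each flush.

```perl
use 5.016;
use warnings;

# Hypothetical helper: returns a closure that counts keys and calls
# $flush with the accumulated counts after every $batch_size events.
sub make_batcher {
    my ( $batch_size, $flush ) = @_;
    my ( %count, $seen );
    return sub {
        my ($key) = @_;
        $count{$key}++;
        return unless ++$seen % $batch_size == 0;
        $flush->( {%count} );    # pass a copy, then start a new batch
        %count = ();
        return;
    };
}

my @flushed;
my $record = make_batcher( 3, sub { push @flushed, shift } );
$record->($_) for qw( / /test / /zeromq );

# One batch has been flushed so far; '/zeromq' waits for the next one.
say scalar(@flushed), ' batch flushed';
say "$_\t$flushed[0]{$_}" for sort keys %{ $flushed[0] };
```

Inside the AnyEvent callback the worker would then only call $record->($msg) and leave the decision when to write to the batcher.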

Summary

ZeroMQ and AnyEvent introduce a few strange and slightly low-level concepts, which are extremely powerful. I do hope that we'll end up using ZeroMQ together with AnyEvent. If we do, I guess we'll need to come up with some code that wraps some of the ugliness into some nicer APIs. So maybe you'll see some of that on CPAN in the coming months...
