=pod

=head1 ZeroMQ & Perl

=head2 META

=head3 Author

Thomas Klausner

=head3 Email

domm AT plix.at

=head3 URL

http://domm.plix.at

=head3 URL_slides

http://domm.plix.at/talks/zeromq_perl

=head3 Date

2013-03-04

=head3 Location

Metalab

=head3 Event

Vienna.pm Techmeet

=head2 ZeroMQ - 0mq

"We took a normal TCP socket, injected it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombarded it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex. Yes, ØMQ sockets are the world-saving superheroes of the networking world."

=for newslide

=for img boom.png

(this and other images are taken from the 0mq Guide: http://zguide.zeromq.org)

=for newslide

0mq isn't a job queue

it's a set of tools that make it very easy to let programs talk to each other

using a few well-defined B<patterns>

=for newslide

0mq is asynchronous, easy to distribute over the network

.. and over different languages

=head2 Basic 0mq Concepts

=head3 Messaging Patterns

=head4 REQ-REP

=for img req_rep.png

Client-Server

a REQ from the client always has to be followed by a REP from the server

to make things more flexible / usable there are also DEALER and ROUTER

read about them in the Guide

=head4 PUB-SUB

=for img pub_sub.png

=head4 PUSH-PULL

=for img push_pull.png

=head4 Messaging as it starts

=for img messaging_start.png

=head4 Messaging as it becomes

=for img messaging_end.png

=head3 Messages

A 0mq message is just a bunch of bytes.

What those bytes mean is up to you.

To make things a bit easier, 0mq supports multipart messages.

0mq guarantees that a "client" will always receive either all parts of a message, or no message at all.

"atomic delivery of messages"

(if you have enough RAM...)
=head3 Transports

Currently 0mq supports various unicast and multicast transports:

B<unicast>

inproc - inside one process

ipc - using something like a FIFO

tcp - ipv4 and ipv6

B<multicast>

epgm and pgm, but I have no idea what they do :-)

=head3 0MQ in a Hundred Words

(abridged version, original available in the Guide)

ØMQ looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks.

(58 words..)

=for newslide

Read the ZeroMQ Guide: http://zguide.zeromq.org

=head2 ZMQ::LibZMQ3

Daisuke Maki

https://metacpan.org/module/ZMQ::LibZMQ3

Very low-level and C-like

=head3 REQ-REP Hello World

A server that accepts a single part message

And returns it with "Hello " prepended.

=for newslide

=for include_code bin/raw_req_rep_hello_world_server.pl

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 1-4 3:4-16:red

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 1-4 4:4-18:red # we need to load some constants

0MQ uses a lot of constants

and so does ZMQ::LibZMQ3

I hate constants..

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 6-8 6:4-11:blue;15-24:red # $context is some sort of internal 0MQ manager

You will always need exactly one $context per program.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 6-8 7:4-10:blue;14-23:red;35-41:green # we generate a new REP 0MQ socket and store it in $server

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 6-8 8:1-8:red;10-16:blue;20-32:green # now we bind the socket to a transport

in this case a C<tcp> port C<10001> on localhost

The server is now ready to use.
=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 10-21 10:1-5:green 11:5-12:red # now we start the 0mq event loop provided by C<zmq_poll()>

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 10-21 13:20-29:red # If zmq_poll() detects a ZMQ_POLLIN event

(i.e. if something sent a message to the REP socket)

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 10-21 14:9-16:red;21-25:blue; 19:9-10:blue # the callback sub is called

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 10-21 15:36-46:red # zmq_recvmsg() gets the next message part

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 10-21 15:23-34:red # and zmq_msg_data() converts the raw message into a Perl string

we'll skip handling of multipart messages for now...

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 10-21 17:13-20:red;33-43:green # zmq_send() sends the reply

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl 1-21 # That's it for the server

=for newslide

Now let's take a look at the client

The client will take a string from C<@ARGV> (or use a default of "World"),

send it to the server

and receive the response.
=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 1-16

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 1-6 4:4-18:red;23-29:blue # We'll again need some constants

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 1-6 6:4-8:red;12-20:blue;25-29:green # prepare the "payload"

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 8-10 8:15-22:red # As this is another program, we need to set up the zmq_context by calling zmq_init()

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 8-10 9:4-10:blue;14-23:red;35-41:green # We set up the socket, this time a REQ

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 8-10 10:1-11:red;23-43:green # and call zmq_connect() to connect the socket to the server

Note that there are two functions to connect sockets: C<zmq_bind()> and C<zmq_connect()>.

You should use C<zmq_bind()> for the more stable part of your messaging network (aka the "server").

The "clients" can then C<zmq_connect()> to the stable port.

=for newslide

One drawback of the decentralised concept of 0MQ is that you have to find a way to tell all parts of your network(s) where the other parts are.

This can be very annoying.

There are various ways around this issue.

I won't cover them here.

But if you're interested, read the "Service Discovery" chapter in the Guide

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 8-10 # The client is now connected to the server

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 12-16 12:1-8:red # so we use zmq_send() to send the message.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 12-16 15:13-24:blue;26-36:red # and the combination of zmq_recvmsg() and zmq_msg_data() to read the reply

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl 1-16 # the whole client

=for newslide

Live Demo!
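=for newslide

If you read these slides without the live demo, the REQ-REP round trip can be tried in a single file. The following is only a rough sketch, not the C<bin/> scripts from the talk: it puts both sockets into one process and connects them via the C<inproc> transport, so no event loop is needed. The endpoint name C<inproc://hello> is made up, loading the constants from C<ZMQ::Constants> is an assumption, and it relies on C<zmq_msg_send()> accepting plain strings, as in the multipart example later in this talk.

```perl
use strict;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_REQ ZMQ_REP);    # assumption: constants live here

# exactly one context per program
my $context = zmq_init();

# the "stable" side binds; the endpoint name is hypothetical
my $server = zmq_socket($context, ZMQ_REP);
zmq_bind($server, 'inproc://hello');

# the other side connects to the same endpoint
my $client = zmq_socket($context, ZMQ_REQ);
zmq_connect($client, 'inproc://hello');

# client: send the payload (first argument, or "World")
my $name = shift(@ARGV) || 'World';
zmq_msg_send($name, $client);

# server: read the request and answer with "Hello " prepended
my $request = zmq_msg_data(zmq_recvmsg($server));
zmq_msg_send("Hello $request", $server);

# client: a REQ must be followed by a REP, so now read the reply
my $reply = zmq_msg_data(zmq_recvmsg($client));
print "$reply\n";

zmq_close($_) for $client, $server;
zmq_term($context);
```

With two real programs you would replace the C<inproc> endpoint with something like the C<tcp> port used above and let the server wait for requests in a loop.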
=for newslide

We quickly found a big problem with C and C

It eats up a whole CPU.

see htop output

So now is the time to use a proper event loop:

=head2 AnyEvent

Marc Lehmann

https://metacpan.org/module/AnyEvent

"the DBI of event loop programming"

AnyEvent provides a uniform interface to various event loops:

EV, AnyEvent::Loop, Event, Glib, Tk, Event::Lib, Qt, POE

=head3 Side note: Event based vs. "regular" programming

A "normal" program (script, CGI, ..) used to consist of a sequence of statements that were executed in order.

Event based programs define a set of events and what code to execute if one of those events happens.

I assume that everybody knows this, since we've been using event based code for ages in GUI programming and, thanks to JavaScript, also in WebDev.

=head3 AnyEvent example

(from AnyEvent::Intro)

=for newslide

=for include_fragment bin/anyevent_example.pl 1-21

=for newslide

=for include_fragment bin/anyevent_example.pl 1-8 6:8-23:green # we prompt for some input

But instead of just reading from C<STDIN>

we use AnyEvent to do so asynchronously and event-based

aka "non-blocking"

=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 8:4-14:blue;18-34:red # the first thing we need is a so-called condvar

aka "Condition Variables" or "merge point"

=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 19:1-11:blue;14-17:red # here we will wait for the condvar to become "true"

You could think of this as starting the event loop, but AFAIK that's not really true.

Anyway, the code will run up to here and then wait

(but without eating all your CPU like the while() loop did..)
=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 10:4-18:blue;22-33:red # here we define an IO event handler

=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 11:4-5:red;12-19:blue 12:4-7:red;12-14:blue # it will watch read-events on STDIN

=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 13:4-5:red;12-16:blue 16:4-5:blue # and call this callback if an event happens

=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 14:7-11:blue;15-21:red # plain old read from STDIN

=for newslide

=for include_fragment bin/anyevent_example.pl 8-19 15:7-17:blue;20-24:red # but after reading we call send on the condvar

This tells AnyEvent that this condition is now satisfied and that it may continue.

=for newslide

=for include_fragment bin/anyevent_example.pl 10-21 21:1-5:red # i.e. we now print the name

Now this seems to be a lot of hassle to just read from STDIN.

So let's add some more asynchronicity!

=for newslide

=for include_fragment bin/anyevent_example_2.pl 8-25 19:9-23:red # I've added an AnyEvent->timer

=for newslide

=for include_fragment bin/anyevent_example_2.pl 8-25 19:25-29:red;34-35:blue;37-44:red;49-50:blue # after one second, it will fire every second

=for newslide

=for include_fragment bin/anyevent_example_2.pl 8-25 20:5-7:red # and print some message

Now we can enter data to STDIN and write some data B

=for newslide

Let's add more!

=for newslide

=for include_fragment bin/anyevent_example_3.pl 19-27 23:4-11:green;15-30:red;31-40:blue # Another timer, which fires once after 5 seconds

=for newslide

=for include_fragment bin/anyevent_example_3.pl 19-27 25:5-9:blue;14-25:green 26:5-21:red # it sets the name to some helpful string and calls send() on the condvar

thus allowing the program to continue after line 29

=for newslide

=for include_fragment bin/anyevent_example_3.pl 29-31

So now we can enter data to STDIN, write some data and have a timeout.

Parallelism FTW!
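=for newslide

Put together, the three watchers described above could look roughly like this. This is a sketch modelled on the example in AnyEvent::Intro, not the exact C<bin/anyevent_example_3.pl> from the talk; the messages and the fallback strings are made up for illustration.

```perl
use strict;
use warnings;
use AnyEvent;

$| = 1;
print "enter your name> ";

my $name;
my $name_ready = AnyEvent->condvar;    # the "merge point"

# IO watcher: fires when STDIN becomes readable
my $wait_for_input = AnyEvent->io(
    fh   => \*STDIN,
    poll => 'r',
    cb   => sub {
        $name = <STDIN>;
        $name = 'nobody (end of input)' unless defined $name;    # made-up fallback
        chomp $name;
        $name_ready->send;
    },
);

# repeating timer: first fires after one second, then every second
my $ticker = AnyEvent->timer(
    after    => 1,
    interval => 1,
    cb       => sub { print "\nstill waiting...\n" },
);

# one-shot timer: gives up after five seconds
my $timeout = AnyEvent->timer(
    after => 5,
    cb    => sub {
        $name = 'too slow';    # made-up "helpful string"
        $name_ready->send;
    },
);

# wait here until one of the callbacks calls send()
$name_ready->recv;

# the watchers are no longer needed
undef $wait_for_input;
undef $ticker;
undef $timeout;

print "your name is $name\n";
```

Whichever callback calls C<send()> first wins: typing a name ends the wait immediately, otherwise the timeout does so after five seconds.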
=head2 ZMQ::LibZMQ3 and AnyEvent

Now let's combine AnyEvent with ZMQ::LibZMQ3

for proper asynchronous, event-based handling of ZMQ messages.

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl 1-22

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl 1-10 4:31-36:green 10:4-6:blue;10-23:red;35-40:green # we use ZMQ_FD and zmq_getsockopt() to get a special filehandle representing the socket

This filehandle will get read/write events when 0MQ accesses it.

C<zmq_getsockopt()> and C<zmq_setsockopt()> are used to get/set various options in 0MQ sockets.

(high water marks, timeouts, buffer sizes, ...)

B Most options can only be set before the socket binds/connects!

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl 12-22 12:9-20:red # We use AnyEvent to get an IO event watcher

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl 12-22 13:5-6:red;13-15:blue 14:5-8:red;13-15:blue; # using the 0MQ filehandle, polling on read events

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl 12-22 15:5-6:red;13-17:blue 20:5-6:red; # the code inside the callback stays the same

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl 12-22 22:1-23:red # and we wait for a condvar that will never be sent

i.e. wait here forever.

Live Demo?

note the low CPU load!

=head2 PUB-SUB with multipart messages

Now let's take a look at multipart message handling

I will show a small PUB-SUB service.

The publisher will publish random track names coming from my mp3 collection.

The subscribers can listen to this stream and pick out only the artists they are interested in.
=for newslide

=for include_fragment bin/anyevent_publisher.pl 1-23

=for newslide

=for include_fragment bin/anyevent_publisher.pl 1-5 4:23-29:red # ZMQ_PUB because we want to set up a publisher

=for newslide

=for include_fragment bin/anyevent_publisher.pl 1-5 4:38-48:red # ZMQ_SNDMORE is needed for multipart messages

=for newslide

=for include_fragment bin/anyevent_publisher.pl 7-9 8:17-27:red;38-44:blue # we set up the PUB socket

=for newslide

=for include_fragment bin/anyevent_publisher.pl 11-12 12:12-25:red # get_mp3_data() collects my mp3s

does some data munging

and returns a list of songs

=for newslide

=for include_fragment bin/anyevent_publisher.pl 25-41 # boring

but let's look at one element of C<@all>

because that's what we're going to send...

=for newslide

    [
        'bauchklang/jamzero/04_Shaker.mp3',
        'bauchklang',
        'jamzero',
        '04_Shaker.mp3'
    ];

=for newslide

=for include_fragment bin/anyevent_publisher.pl 14-23 15:9-23:red;37-44:blue;49-51:green # we set up an AnyEvent Timer that is going to fire every 0.1 seconds

=for newslide

=for include_fragment bin/anyevent_publisher.pl 14-23 16:8-15:red;19-23:green;27-31:blue # we get some random MP3 file info

=for newslide

=for include_fragment bin/anyevent_publisher.pl 14-23 17:8-18:blue;22-24:red;26-34:green # we get the last frame of the multipart message

=for newslide

=for include_fragment bin/anyevent_publisher.pl 14-23 18:5-11:red;16-21:green 19:9-20:red;22-27:green;42-52:blue # we send all other frames using the ZMQ_SNDMORE flag

=for newslide

=for include_fragment bin/anyevent_publisher.pl 14-23 21:5-16:red;18-28:blue # and send the last frame without ZMQ_SNDMORE

This (the lack of ZMQ_SNDMORE) tells 0MQ that this is the last part of the multipart message

And that the message can now be transferred.
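=for newslide

To see the ZMQ_SNDMORE rule from both ends, here is a small sketch that sends the track info shown above as one multipart message and reads it back in the same process. It is not the publisher from the talk: it uses a PUSH-PULL pair over a made-up C<inproc://frames> endpoint so that it needs neither a network nor an event loop, it assumes the constants come from C<ZMQ::Constants>, and it already uses ZMQ_RCVMORE, which is explained in the subscriber section.

```perl
use strict;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL ZMQ_SNDMORE ZMQ_RCVMORE);

my $context = zmq_init();

# hypothetical endpoint; bind first, then connect
my $receiver = zmq_socket($context, ZMQ_PULL);
zmq_bind($receiver, 'inproc://frames');

my $sender = zmq_socket($context, ZMQ_PUSH);
zmq_connect($sender, 'inproc://frames');

# one element of @all, as shown on the slide above
my @track = (
    'bauchklang/jamzero/04_Shaker.mp3',
    'bauchklang',
    'jamzero',
    '04_Shaker.mp3',
);

# all frames but the last are flagged with ZMQ_SNDMORE ...
my @frames = @track;
my $last   = pop @frames;
zmq_msg_send($_, $sender, ZMQ_SNDMORE) for @frames;

# ... and the last one is sent without it, which completes the message
zmq_msg_send($last, $sender);

# receiving side: read parts until the socket reports no more
my @received;
while (1) {
    push @received, zmq_msg_data(zmq_recvmsg($receiver));
    last unless zmq_getsockopt($receiver, ZMQ_RCVMORE);
}

print join(' | ', @received), "\n";

zmq_close($_) for $sender, $receiver;
zmq_term($context);
```

The receiver only sees the message once the final frame has been sent, which is the "atomic delivery" mentioned in the Messages section.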
=for newslide

    my @frames = qw(Hello Vienna.pm at Metalab);
    zmq_msg_send($frames[0], $publisher, ZMQ_SNDMORE);
    zmq_msg_send($frames[1], $publisher, ZMQ_SNDMORE);
    zmq_msg_send($frames[2], $publisher, ZMQ_SNDMORE);
    zmq_msg_send($frames[3], $publisher);

=for newslide

That's the whole publisher.

=for include_fragment bin/anyevent_publisher.pl 1-23

=for newslide

Now the subscriber:

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 1-27

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 1-5 4:23-29:red # we want a subscriber socket, so we need ZMQ_SUB

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 1-5 4:38-50:red # ZMQ_SUBSCRIBE is needed to select which messages we're interested in

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 1-5 4:52-62:red # ZMQ_RCVMORE is needed to receive multipart messages

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 7-11 8:39-45:red # get a SUB socket

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 7-11 10:1-14:red;30-42:blue;45-52:green;57-58:darkgreen # we use zmq_setsockopt to set the SUBSCRIBE option to either the first command line argument or empty string

=for newslide

If we set a C<ZMQ_SUBSCRIBE> value, the publisher will only send this subscriber messages that start with the given substring.

    ~$ anyevent_subscriber.pl the_streets

Only get info about "The Streets"

    ~$ anyevent_subscriber.pl the_

Get info on all "The" bands

=for newslide

You can use that for some simple filtering

So the first part of your message should represent some sort of hierarchy.

Unfortunately one cannot subscribe via regex.
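=for newslide

The matching rule itself is just "does the first part of the message start with the subscribed string". The following plain-Perl sketch mimics that rule without any 0MQ involved, to show why a hierarchy in the first frame is useful. The function name C<matches_subscription> and the sample paths (apart from the Bauchklang one shown earlier) are made up.

```perl
use strict;
use warnings;

# mimic 0MQ's subscription rule: a message matches if its first
# frame starts with the subscribed prefix (empty prefix = everything)
sub matches_subscription {
    my ($first_frame, $prefix) = @_;
    return index($first_frame, $prefix) == 0 ? 1 : 0;
}

# hypothetical first frames as the publisher might send them
my @first_frames = (
    'the_streets/some_album/01_some_track.mp3',
    'the_who/some_album/02_some_track.mp3',
    'bauchklang/jamzero/04_Shaker.mp3',
);

for my $prefix ('the_streets', 'the_', '') {
    my @hits = grep { matches_subscription($_, $prefix) } @first_frames;
    printf "subscribe '%s': %d of %d messages\n",
        $prefix, scalar @hits, scalar @first_frames;
}
```

A prefix can only narrow things down from the left, which is why something like "all tracks called Shaker" cannot be expressed as a subscription.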
=for newslide

=for include_fragment bin/anyevent_subscriber.pl 13-26 13:9-20:red 14:13-15:blue 15:13-25:blue # we set up an AnyEvent handler catching IO read events on the 0MQ filehandle

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 13-26 18:9-13:green;28-38:red # we get a raw message using zmq_recvmsg()

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 13-26 17:12-17:blue 19:19-24:blue;26-37:red # and use zmq_msg_data() to store this part in an array

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 13-26 20:18-31:red;46-56:blue # using zmq_getsockopt() we ask the socket if there are more parts to receive

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 13-26 20:17-17:red 21:17-19:red # if there are no more parts, we print the message

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 13-26 22:17-26:blue # and empty the message buffer

And we're done with the subscriber

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 1-27

=for newslide

Live Demo!

=for newslide

That is all B cool!

But B!

C<ZMQ::LibZMQ3> is very low-level and C-like

=for newslide

Daisuke Maki has another dist called C<ZMQ>

"a Perl-ish wrapper for libzmq"

"Personally, I'd recommend only using this module for your one-shot scripts, and use ZMQ::LibZMQ* for all other uses. YMMV."
not very encouraging, so we wrote

=head2 ZMQx::Class

OO Interface to ZMQ

Not on CPAN (yet)

But on github https://github.com/domm/ZMQx-Class

It removes a lot of the annoying low level stuff

But should still be flexible enough to allow you to have fun with 0MQ

=for newslide

Key features are:

=over

=item * Easy setup of sockets

=item * Easy setting & getting of sockopts

=item * Easy sending & receiving of single- and multipart messages

=item * AnyEvent helpers

=back

=for newslide

I'm now showing you the PUB-SUB MP3 Info Thingy from the last section

implemented using ZMQx::Class

=head3 The Publisher using ZMQx::Class

=for newslide

=for include_fragment bin/zmqx_publisher.pl 1-16

=for newslide

=for include_fragment bin/zmqx_publisher.pl 1-4 3:5-15:red # no more loading of lots of constants

=for newslide

=for include_fragment bin/zmqx_publisher.pl 5-7 6:17-35:red;38-40:blue;44-47:green;51-63:green # set up socket & bind/connect in one go

no more messing with C<$context> and C

=for newslide

You can also set sockopts here:

    ZMQx::Class->socket('REQ', bind=>'tcp://*:10001', {
        ipv4only => 0,
        sndtimeo => 500,
        linger => 1000,
    });

=for newslide

=for include_fragment bin/zmqx_publisher.pl 11-16 14:17-20:red;22-30:blue # just send() an arrayref

automatic handling of C<ZMQ_SNDMORE>

=for newslide

10 lines of 0MQ code replaced by 3 lines of ZMQx::Class code.

less code! yay!
=for newslide

=for include_fragment bin/anyevent_publisher.pl 1-23

=for newslide

=for include_fragment bin/zmqx_publisher.pl 1-16

=head3 The Subscriber using ZMQx::Class

=for include_fragment bin/zmqx_subscriber.pl 1-1

=for newslide

=for include_fragment bin/zmqx_subscriber.pl 1-4 4:5-26:red # AnyEvent helper

=for newslide

=for include_fragment bin/zmqx_subscriber.pl 6-8 7:14-26:red 8:23-28:red # nice helper methods instead of annoying function calls

=for newslide

=for include_fragment bin/zmqx_subscriber.pl 10-17 10:9-29:red;32-38:red;54-58:blue 16:1-2:blue # Set up an AnyEvent watcher with less hassle

=for newslide

=for include_fragment bin/zmqx_subscriber.pl 10-17 11:15-22:blue;39-55:red # simple method to receive all frames of a multipart message

Again lots of code removed

=for newslide

=for include_fragment bin/anyevent_subscriber.pl 1-27

=for newslide

=for include_fragment bin/zmqx_subscriber.pl 1-17

=for newslide

Also:

    my $res = $client->wait_for_message;

One-line method to set up an AnyEvent watcher that waits for the next message

great for REQ-REP

More to come as we discover it...

=head3 ZMQx::Class wrapup

C<ZMQx::Class> is still ALPHA

The API might change anytime

but we're already using it in a few apps

so it basically works

we're currently working on some new apps that are going to use 0MQ a lot.

I'll be happy to report about our experiences another time.

=for newslide

=head2 Resources

0MQ: http://www.zeromq.org/

The Guide: http://zguide.zeromq.org

includes lots of example code in lots of languages

ZMQ::LibZMQ3: https://metacpan.org/module/ZMQ::LibZMQ3

AnyEvent: http://anyevent.schmorp.de

ZMQx::Class: https://github.com/domm/ZMQx-Class

=head2 Questions?

Reliability

ZeroMQ vs RabbitMQ (davewood)