=pod

=head1 ZeroMQ & Perl

=head2 META

=head3 Author

Thomas Klausner

=head3 Email

domm AT plix.at

=head3 URL

http://domm.plix.at

=head3 URL_slides

http://domm.plix.at/talks/linuxwochen_zeromq_perl

=head3 Date

2013-05-03

=head3 Location

FH Technikum Wien

=head3 Event

Linuxwochen Wien 2013

=head2 Thomas Klausner

domm

Just another EPU Perl Hacker

=for newslide

=for img vienna.pm.org.png

=for newslide

=for img bicycle.pm.png

=for newslide

=for img validad.com.png

=for newslide

=for img oe1.orf.at.png

=head2 The Single CPU is Dead

Moorepocalypse

Single CPUs will not continue to become faster / more powerful

We need other ways to increase performance

Parallelism FTW!

=head2 Parallel computing is hard

Lots of programs need to communicate with each other

Old style solution:

Implement some sort of protocol over TCP sockets

problem: Hard, ugly

=for newslide

Modern solution:

Message Queue / Job Queue

AMQP, RabbitMQ, Gearman

problem: central message broker

=for newslide

and then there is ...

=head2 ZeroMQ - 0mq

The Intelligent Transport Layer

"We took a normal TCP socket, injected it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombarded it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex. Yes, 0MQ sockets are the world-saving superheroes of the networking world."

=for newslide

=for img boom.png

(this and other images are taken from the 0mq Guide: http://zguide.zeromq.org)

=for newslide

0mq isn't a message queue

0mq isn't just sockets

it's a set of tools that make it very easy to let programs talk to each other using a few well-defined B<messaging patterns>

=for newslide

0mq is asynchronous,

easy to distribute over the network ..
and over different languages

=for newslide

Ada Bash Basic C Chicken Scheme Common Lisp C# C++ D delphi Erlang F# Felix
Flex Go Guile Haskell Haxe Java JavaScript Julia LabVIEW Lua Nimrod Node.js
Objective-C Objective Caml ooc Perl PHP Python Q Racket R REBOL 2 REBOL 3
Red Ruby Scala Smalltalk Tcl Twisted XPCOM

=for newslide

Ada Bash Basic C Chicken Scheme Common Lisp C# C++ D delphi Erlang F# Felix
Flex Go Guile Haskell Haxe Java JavaScript Julia LabVIEW Lua Nimrod Node.js
Objective-C Objective Caml ooc B<Perl> PHP Python Q Racket R REBOL 2 REBOL 3
Red Ruby Scala Smalltalk Tcl Twisted XPCOM

=head2 Basic 0mq Concepts

=head3 Messaging Patterns

=head4 REQ-REP

=for img req_rep.png

Client-Server

a REQ from the client always has to be followed by a REP from the server

to make things more flexible / usable there is also DEALER and ROUTER

read about them in the Guide

=head4 PUB-SUB

=for img pub_sub.png

=head4 PUSH-PULL

=for img push_pull.png

=head4 Messaging as it starts

=for img messaging_start.png

=head4 Messaging as it becomes

=for img messaging_end.png

=head3 Messages

A 0mq message is just a bunch of bytes.

What those bytes mean is up to you.

To make things a bit easier, 0mq supports multipart messages.

0mq guarantees that a "client" will always receive either all parts of a message, or no message at all.

"atomic delivery of messages"

(if you have enough RAM...)

=head3 Transports

Currently 0mq supports various unicast and multicast transports:

B<unicast>

inproc - inside one process

ipc - between processes on one machine (Unix domain sockets)

tcp - ipv4 and ipv6

B<multicast>

epgm and pgm, but I have no idea what they do :-)

=head3 0MQ in a Hundred Words

(abridged version, original available in the Guide)

0MQ looks like an embeddable networking library but acts like a concurrency framework.
It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast.
You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply.
Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks.

(58 words..)

=for newslide

Read the ZeroMQ Guide:

http://zguide.zeromq.org

=head2 ZMQ::LibZMQ3

Daisuke Maki

https://metacpan.org/module/ZMQ::LibZMQ3

Very low-level and C-like

=head3 REQ-REP Hello World

A server that accepts a single-part message

and returns it with "Hello " prepended.

=for newslide

=for include_code bin/raw_req_rep_hello_world_server.pl @c3e99e4dd

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 1-4 3:4-16:red

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 1-4 4:4-18:red
# we need to load some constants

0MQ uses a lot of constants

and so does ZMQ::LibZMQ3

I hate constants..

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 6-8 6:4-11:blue;15-24:red
# $context is some sort of internal 0MQ manager

You will always need exactly one $context per program.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 6-8 7:4-10:blue;14-23:red;35-41:green
# we generate a new REP 0MQ socket and store it in $server

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 6-8 8:1-8:red;10-16:blue;20-32:green
# now we bind the socket to a transport

in this case a C<tcp> port C<10001> on localhost

The server is now ready to use.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 10-21 10:1-5:green 11:5-12:red
# now we start the 0mq event loop provided by C<zmq_poll>

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 10-21 13:20-29:red
# If zmq_poll() detects a ZMQ_POLLIN event (i.e.
if something sent a message to the REP socket)

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 10-21 14:9-16:red;21-25:blue; 19:9-10:blue
# the callback sub is called

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4dd 10-21 15:36-46:red
# zmq_recvmsg() gets the next message part

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4d 10-21 15:23-34:red
# and zmq_msg_data() converts the raw message into a Perl string

we'll skip handling of multipart messages for now...

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4d 10-21 17:13-20:red;33-43:green
# zmq_send() sends the reply

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_server.pl @c3e99e4d 1-21
# That's it for the server

=for newslide

Now let's take a look at the client

The client will take a string from C<@ARGV> (or use a default of "World"),

send it to the server

and receive the response.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 1-16

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 1-6 4:4-18:red;23-29:blue
# We'll again need some constants

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 1-6 6:4-8:red;12-20:blue;25-29:green
# prepare the "payload"

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 8-10 8:15-22:red
# As this is another program, we need to set up the zmq_context by calling zmq_init()

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 8-10 9:4-10:blue;14-23:red;35-41:green
# We set up the socket, this time a REQ

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 8-10 10:1-11:red;23-43:green
# and call zmq_connect() to connect the socket to the server

Note that there are two functions to connect sockets: C<zmq_bind> and C<zmq_connect>.
You should use C<zmq_bind> for the more stable part of your messaging network (aka the "server").
The "clients" can then C<zmq_connect> to the stable port.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 8-10
# The client is now connected to the server

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 12-16 12:1-8:red
# so we use zmq_send() to send the message.

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 12-16 15:13-24:blue;26-36:red
# and the combination of zmq_recvmsg() and zmq_msg_data() to read the reply

=for newslide

=for include_fragment bin/raw_req_rep_hello_world_client.pl @c3e99e4d 1-16
# the whole client

=for newslide

Live Demo!

=for newslide

We quickly found a big problem with C<while> and C<zmq_poll>

It eats up a whole CPU.

see htop output

So now is the time to use a proper event loop:

=head2 AnyEvent

Marc Lehmann

https://metacpan.org/module/AnyEvent

"the DBI of event loop programming"

AnyEvent itself isn't an event loop

but it provides a uniform interface to various event loops:

EV, AnyEvent::Loop, Event, Glib, Tk, Event::Lib, Qt, POE

=head3 Side note: Event based vs. "regular" programming

A "normal" program (script, CGI, ..) used to consist of a sequence of statements that were executed in order.

Event based programs define a set of events and what code to execute if one of those events happens.

I assume that everybody knows this, since we've been using event based code for ages in GUI programming and, thanks to JavaScript, also in WebDev.
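The difference can be shown without any event library. The following is only a sketch in plain Perl: C<on> and C<emit> are made-up helpers, and the hard-coded C<@queue> stands in for a real event source such as a socket or a timer.

```perl
use strict;
use warnings;

# Hypothetical mini dispatcher (not AnyEvent): event name => list of callbacks
my %handlers;

# register a callback for an event
sub on {
    my ($event, $callback) = @_;
    push @{ $handlers{$event} }, $callback;
}

# run all callbacks registered for an event
sub emit {
    my ($event, @args) = @_;
    $_->(@args) for @{ $handlers{$event} || [] };
}

my @log;

# first we only define what should happen ...
on(message => sub { push @log, "got: $_[0]" });
on(timeout => sub { push @log, "timed out" });

# ... then a loop hands out events in whatever order they arrive.
# @queue is a stand-in for a real event source.
my @queue = ( [ message => 'hello' ], ['timeout'], [ message => 'world' ] );
while ( my $event = shift @queue ) {
    emit(@$event);
}

print "$_\n" for @log;
```

The order of the output depends on the order of the events, not on the order in which the handlers were defined.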
=head3 AnyEvent example

(from AnyEvent::Intro)

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 1-21

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 1-8 6:8-23:green
# we prompt for some input

But instead of just reading from C<STDIN>

we use AnyEvent to do so event-based

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 8:4-14:blue;18-34:red
# the first thing we need is a so-called condvar

aka "condition variable" or "merge point"

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 19:1-11:blue;14-17:red
# here we will wait for the condvar to become "true"

You could think of this as starting the event loop, but AFAIK that's not really true.

Anyway, the code will run up to here and then wait

(but without eating all your CPU like the while() loop..)

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 10:4-18:blue;22-33:red
# here we define an IO event handler

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 11:4-5:red;12-19:blue 12:4-7:red;12-14:blue
# it will watch read-events on STDIN

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 13:4-5:red;12-16:blue 16:4-5:blue
# and call this callback if an event happens

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 14:7-11:blue;15-21:red
# plain old read from STDIN

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 8-19 15:7-17:blue;20-24:red
# but after reading we call send on the condvar

This tells AnyEvent that this condition is now satisfied and the program may continue.

=for newslide

=for include_fragment bin/anyevent_example.pl @c3e99e4d 10-21 21:1-5:red
# i.e. we now print the name

Now this seems to be a lot of hassle to just read from STDIN.

So let's add some more events!
=for newslide

=for include_fragment bin/anyevent_example_2.pl @c3e99e4d 8-25 19:9-23:red
# I've added an AnyEvent->timer

=for newslide

=for include_fragment bin/anyevent_example_2.pl @c3e99e4d 8-25 19:25-29:red;34-35:blue;37-44:red;49-50:blue
# after one second, it will fire every second

=for newslide

=for include_fragment bin/anyevent_example_2.pl @c3e99e4d 8-25 20:5-7:red
# and print some message

Now we can enter data to STDIN and write some data B<at the same time>

=for newslide

Let's add more!

=for newslide

=for include_fragment bin/anyevent_example_3.pl @c3e99e4d 19-27 23:4-11:green;15-30:red;31-40:blue
# Another timer, which fires once after 5 seconds

=for newslide

=for include_fragment bin/anyevent_example_3.pl @c3e99e4d 19-27 25:5-9:blue;14-25:green 26:5-21:red
# it sets the name to some helpful string

and calls send() on the condvar

thus allowing the program to continue after line 29

=for newslide

=for include_fragment bin/anyevent_example_3.pl @c3e99e4d 29-31

So now we can enter data to STDIN, write some data and have a timeout.

Parallelism FTW!

=head2 ZMQ::LibZMQ3 and AnyEvent

Now let's combine AnyEvent with ZMQ::LibZMQ3

for proper asynchronous, event-based handling of ZMQ messages.

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl @c3e99e4d 1-22

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl @c3e99e4d 1-10 4:31-36:green 10:4-6:blue;10-23:red;35-40:green
# we use ZMQ_FD and zmq_getsockopt() to get the file descriptor representing the socket

This file descriptor gets a read event whenever something happens on the 0MQ socket.

C<zmq_getsockopt> and C<zmq_setsockopt> are used to get/set various options in 0MQ sockets.

(high water marks, timeouts, buffer sizes, ...)

B<Note:> Most options can only be set before the socket binds/connects!
=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl @c3e99e4d 12-22 12:9-20:red
# We use AnyEvent to get an IO event watcher

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl @c3e99e4d 12-22 13:5-6:red;13-15:blue 14:5-8:red;13-15:blue;
# using the 0MQ file descriptor, polling on read events

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl @c3e99e4d 12-22 15:5-6:red;13-17:blue 20:5-6:red;
# the code inside the callback stays the same

=for newslide

=for include_fragment bin/anyevent_req_rep_hello_world_server.pl @c3e99e4d 12-22 22:1-23:red
# and we wait for a condvar that will never be sent

i.e. wait here forever.

Live Demo?

note the low CPU load!

=head2 PUB-SUB with multipart messages

Now let's take a look at multipart message handling

I will show a small PUB-SUB service.

The publisher will publish random track names coming from my mp3 collection.

The subscribers can listen to this stream and pick out only artists they are interested in.

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 1-23

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 1-5 4:23-29:red
# ZMQ_PUB because we want to set up a publisher

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 1-5 4:38-48:red
# ZMQ_SNDMORE is needed for multipart messages

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 7-9 8:17-27:red;38-44:blue
# we set up the PUB socket

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 11-12 12:12-25:red
# get_mp3_data() collects my mp3s

does some data munging

and returns a list of songs

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 25-41
# boring

but let's look at one element of C<@all>

because that's what we're going to send...
=for newslide

  [
    'bauchklang/jamzero/04_Shaker.mp3',
    'bauchklang',
    'jamzero',
    '04_Shaker.mp3'
  ];

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 14-23 15:9-23:red;37-44:blue;49-51:green
# we set up an AnyEvent Timer that is going to fire every 0.1 seconds

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 14-23 16:8-15:red;19-23:green;27-31:blue
# we get some random MP3 file info

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 14-23 17:8-18:blue;22-24:red;26-34:green
# we get the last frame of the multipart message

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 14-23 18:5-11:red;16-21:green 19:9-20:red;22-27:green;42-52:blue
# we send all other frames using the ZMQ_SNDMORE flag

=for newslide

=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 14-23 21:5-16:red;18-28:blue
# and send the last frame without ZMQ_SNDMORE

This (the lack of ZMQ_SNDMORE) tells 0MQ that this is the last part of the multipart message

and that the message can now be transferred.

=for newslide

  my @frames = qw(Hello Perl hackers at Betahaus);
  zmq_msg_send($frames[0], $publisher, ZMQ_SNDMORE);
  zmq_msg_send($frames[1], $publisher, ZMQ_SNDMORE);
  zmq_msg_send($frames[2], $publisher, ZMQ_SNDMORE);
  zmq_msg_send($frames[3], $publisher, ZMQ_SNDMORE);
  zmq_msg_send($frames[4], $publisher);

=for newslide

That's the whole publisher.
=for include_fragment bin/anyevent_publisher.pl @c3e99e4d 1-23

=for newslide

Now the subscriber:

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 1-27

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 1-5 4:23-29:red
# we want a subscriber socket, so we need ZMQ_SUB

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 1-5 4:38-50:red
# ZMQ_SUBSCRIBE is needed to select which messages we're interested in

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 1-5 4:52-62:red
# ZMQ_RCVMORE is needed to receive multipart messages

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 7-11 8:39-45:red
# get a SUB socket

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 7-11 10:1-14:red;30-42:blue;45-52:green;57-58:darkgreen
# we use zmq_setsockopt to set the SUBSCRIBE option

to either the first command line argument

or the empty string

=for newslide

If we set a C<ZMQ_SUBSCRIBE> value, the publisher will only send those messages to this subscriber which start with the given prefix.

  ~$ anyevent_subscriber.pl the_streets

Only get info about "The Streets"

  ~$ anyevent_subscriber.pl the_

Get info on all "The" bands

=for newslide

You can use that for some simple filtering

So the first part of your message should represent some sort of hierarchy.

Unfortunately one cannot subscribe via regex.
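The prefix matching described above can be tried without any 0MQ at all. This is only a sketch in plain Perl: C<matches_subscription> is a made-up helper (not part of ZMQ::LibZMQ3), and all track names except the bauchklang one are invented for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper, not a 0MQ call: a subscription matches if the
# message starts with exactly these bytes. The empty string matches everything.
sub matches_subscription {
    my ($subscription, $message) = @_;
    return index($message, $subscription) == 0 ? 1 : 0;
}

# Example messages (only the first one is from the slides, the others are made up)
my @messages = (
    'bauchklang/jamzero/04_Shaker.mp3',
    'the_streets/some_album/01_some_track.mp3',
    'the_clash/some_album/02_some_track.mp3',
);

for my $subscription ('the_streets', 'the_', '') {
    my @hits = grep { matches_subscription($subscription, $_) } @messages;
    printf "subscription '%s' gets %d message(s)\n", $subscription, scalar @hits;
}
```

Because only the start of the message is compared, a subscription to C<streets> would get nothing here. That's why the hierarchy has to go into the first part of the message.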
=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 13-26 13:9-20:red 14:13-15:blue 15:13-25:blue
# we set up an AnyEvent handler catching IO read events on the 0MQ file descriptor

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 13-26 18:9-13:green;28-38:red
# we get a raw message using zmq_recvmsg()

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 13-26 17:12-17:blue 19:19-24:blue;26-37:red
# and use zmq_msg_data() to store this part in an array

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 13-26 20:18-31:red;46-56:blue
# using zmq_getsockopt() we ask the socket if there are more parts to receive

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 13-26 20:17-17:red 21:17-19:red
# if there are no more parts, we print the message

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 13-26 22:17-26:blue
# and empty the message buffer

And we're done with the subscriber

=for newslide

=for include_fragment bin/anyevent_subscriber.pl @c3e99e4d 1-27

=for newslide

Live Demo!

=for newslide

That is all cool!

But!

C<ZMQ::LibZMQ3> is very low-level and C-like

=for newslide

Daisuke Maki has another dist called C<ZMQ>

"a Perl-ish wrapper for libzmq"

"Personally, I'd recommend only using this module for your one-shot scripts, and use ZMQ::LibZMQ* for all other uses. YMMV."

not very encouraging, so we wrote

=head2 ZMQx::Class

OO Interface to ZMQ

Not on CPAN (yet)

But on GitHub

https://github.com/domm/ZMQx-Class

It removes a lot of the annoying low-level stuff

But should still be flexible enough to allow you to have fun with 0MQ

=for newslide

Key features are:

=over

=item * Easy setup of sockets

=item * Easy setting & getting of sockopts

=item * Easy sending & receiving of single- and multipart messages

=item * Easy forking

=item * AnyEvent helpers

=back

=head2 PUB-SUB with multipart messages & ZMQx::Class

I will show a small PUB-SUB service.
The publisher will publish random track names coming from my mp3 collection.
The subscribers can listen to this stream and pick out only artists they are interested in.

=head3 The Publisher using ZMQx::Class

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 1-16

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 1-4 3:5-15:red
# just load ZMQx::Class

no more loading of lots of constants

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 1-4 4:5-12:red
# AnyEvent for asynchronous events

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 5-7 6:17-35:red;38-40:blue;44-47:green;51-63:green
# We set up a PUB-socket on tcp://*:10001

set up socket & bind/connect in one go

no more messing with C<$context>

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 8-9 9:4-9:blue;12-23:red;
# get_mp3_data() collects my mp3s

does some data munging

and returns a list of songs

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 18-34
# boring

but let's look at one element of C<@all>

because that's what we're going to send...
=for newslide

  [
    'bauchklang/jamzero/04_Shaker.mp3',
    'bauchklang',
    'jamzero',
    '04_Shaker.mp3'
  ];

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 11-16 12:9-23:red;37-44:blue;49-51:green
# we set up an AnyEvent Timer that is going to fire every 0.1 seconds

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 11-16 12:60-64:red 15:1-1:red
# this is the code that will be executed every time the callback fires

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 11-16 13:8-15:red;19-23:green;27-30:blue
# we get some random MP3 file info

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 11-16 14:17-20:red;22-30:blue
# and just send() the arrayref

automatic handling of C<ZMQ_SNDMORE>

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 11-16 16:1-23:red;
# this runs the AnyEvent event loop

=for newslide

That's the whole publisher.

=for newslide

=for include_fragment bin/zmqx_publisher.pl @c3e99e4d 1-16

=head3 The Subscriber using ZMQx::Class

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 1-16

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 1-4 4:5-26:red
# AnyEvent helper

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 6-7 6:17-36:red;39-41:blue;45-51:green;55-75:green
# Set up a SUB socket

And connect to tcp://localhost:10001

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 6-7 7:18-26:red;28-35:blue;40-41:green
# We subscribe to either the first argument passed from the command line

or the empty string, i.e. all messages

=for newslide

If we set a C<ZMQ_SUBSCRIBE> value, the publisher will only send those messages to this subscriber which start with the given prefix.

  ~$ zmqx_subscriber.pl the_streets

Only get info about "The Streets"

  ~$ zmqx_subscriber.pl the_

Get info on all "The" bands

=for newslide

You can use that for some simple filtering

So the first part of your message should represent some sort of hierarchy.
Unfortunately one cannot subscribe via regex.

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 6-7 7:14-26:red
# note the nice helper methods instead of annoying function calls

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 9-16 9:9-29:red;32-38:red;54-58:blue 15:1-2:blue
# Set up an AnyEvent watcher with less hassle

The code in the callback will be called every time 0mq has received a complete message

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 9-16 10:15-22:blue;39-45:red
# we use a simple method to receive all frames of a multipart message

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 9-16 11:9-14:red
# and print a short message

And we're done with the subscriber

=for newslide

=for include_fragment bin/zmqx_subscriber.pl @c3e99e4d 1-16

=for newslide

Live Demo!

=head3 ZMQx::Class wrapup

C<ZMQx::Class> is still ALPHA

The API might change anytime

but we're already using it in a few apps

so it basically works

we're currently working on some new apps that are going to use 0MQ a lot.

=head2 A complex example

A mongrel2-like distributed worker system

=for newslide ################## # Need to get # ############################## ############### # something done # # Ventilator # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------> | ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. 
# # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # **Need to get** # ############################## ############### # **something done** # # Ventilator # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | @@control@@ | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | **REQ** |**--**%%task%%**-->**| ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. # # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # **Ventilator** # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | @@control@@ | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |**--**%%task%%**-->**| **ROUTER** | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. 
# # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # **Ventilator** # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | @@control@@ | | @@fan@@ | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER |%%task%%| PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. # # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # **Ventilator** # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | @@fan@@ | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER | | **PUSH** |**--**%%task%%**-->**| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. 
# # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # Ventilator # # **Worker** # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | fan | # # | @@in@@ | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER | | PUSH |**--**%%task%%**-->**| **PULL** | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. # # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # Ventilator # # **Worker** # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # %%task%% # ################## # # # # # .---------. # # .---------. 
# # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # Ventilator # # **Worker** # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. # # | sink | # # | @@out@@ | # # |---------| # # |---------| # # | SUB | # # | **PUB** | # # | bind | # # | connect | # # '---------' # # '---------' # # **^** # # **|** # ##############**|**############### #######**|**####### **|** **|** **'----------**%%done%%**-------------'** =for newslide ################## # Need to get # ############################## ############### # something done # # **Ventilator** # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. 
# # | @@sink@@ | # # | out | # # |---------| # # |---------| # # | **SUB** | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # **^** # # **|** # ##############**|**############### #######**|**####### **|** **|** **'----------**%%done%%**-------------'** =for newslide ################## # Need to get # ############################## ############### # something done # # **Ventilator** # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |-------->| ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # %%done%% # # # ################## # # # # # .---------. # # .---------. # # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # Need to get # ############################## ############### # something done # # **Ventilator** # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | control | # # | @@control@@ | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | REQ |**<--**%%done%%**--**| **ROUTER** | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. 
# # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide ################## # **Need to get** # ############################## ############### # **something done** # # Ventilator # # Worker # #----------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # # .---------. # # | @@control@@ | # # | control | | fan | # # | in | # # |----------| # # |---------| |---------| # # |---------| # # | **REQ** |**<--**%%done%%**--**| ROUTER | | PUSH |-------->| PULL | # # | connect | # # | bind | | bind | # # | connect | # # '----------' # # '---------' '---------' # # '---------' # # # # # # # # # # # # # ################## # # # # # .---------. # # .---------. # # | sink | # # | out | # # |---------| # # |---------| # # | SUB | # # | PUB | # # | bind | # # | connect | # # '---------' # # '---------' # # ^ # # | # ##############|############### #######|####### | | '---------------------------' =for newslide Of course this whole setup only makes sense if we add more workers =for newslide ################# # Need to get # ############################## ############### # things done # # Ventilator # # Worker # #---------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # .->#.----..-----.#---. # | control | # # | control | | fan | # | #| in || out |# | # |----------| # # |---------| |---------| # | #'----''-----'# | # | REQ |-------->| ROUTER | | PUSH |----.-. ############### | # | connect | # # | bind | | bind | # | | | # '----------' # # '---------' '---------' # | | ############### | # # # # | | # Worker # | # # # # | | #-------------# | ################# # # | '->#.----..-----.#--.| # .---------. 
# | #| in || out |# || # | sink | # | #'----''-----'# || # |---------| # | ############### || # | SUB | # | || # | bind | # | ############### || # '---------' # | # Worker # || # ^ # | #-------------# || ###############|############## '--->#.----..-----.# -.| | #| in || out |# || | #'----''-----'# || | ############### || '--------------------------------------'| '--------------------------------------'| '---------------------------------------' ZeroMQ PUSH/PULL does fair queuing automatically =for newslide And we want more than one "client" =for newslide ################# # Need to get # ############################## ############### # things done # # Ventilator # # Worker # #---------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # .->#.----..-----.#---. # | control | # # | control | | fan | # | #| in || out |# | # |----------| # # |---------| |---------| # | #'----''-----'# | # | REQ | #--.--->| ROUTER | | PUSH |----.-. ############### | # | connect | # | # | bind | | bind | # | | | # '----------' # | # '---------' '---------' # | | ############### | # # | # # | | # Worker # | # # | # # | | #-------------# | ################# | # # | '->#.----..-----.#--.| | # .---------. # | #| in || out |# || ################# | # | sink | # | #'----''-----'# || # Need to get # | # |---------| # | ############### || # things done # | # | SUB | # | || #---------------# | # | bind | # | ############### || # .----------. 
# | # '---------' # | # Worker # || # | control | # | # ^ # | #-------------# || # |----------| # | ###############|############## '--->#.----..-----.# -.| # | REQ | #--' | #| in || out |# || # | connect | # | #'----''-----'# || # '----------' # | ############### || # # '--------------------------------------'| # # '--------------------------------------'| ################# '---------------------------------------' =for newslide We're now getting closer to this =for newslide =for img messaging_end.png =for newslide ################# # Need to get # ############################## ############### # things done # # Ventilator # # Worker # #---------------# #----------------------------# #-------------# # .----------. # # .---------. .---------. # .->#.----..-----.#---. # | control | # # | control | | fan | # | #| in || out |# | # |----------| # # |---------| |---------| # | #'----''-----'# | # | REQ | #--.--->| ROUTER | | PUSH |----.-. ############### | # | connect | # | # | bind | | bind | # | | | # '----------' # | # '---------' '---------' # | | ############### | # # | # # | | # Worker # | # # | # # | | #-------------# | ################# | # # | '->#.----..-----.#--.| | # .---------. # | #| in || out |# || ################# | # | sink | # | #'----''-----'# || # Need to get # | # |---------| # | ############### || # things done # | # | SUB | # | || #---------------# | # | bind | # | ############### || # .----------. 
# | # '---------' # | # Worker # || # | control | # | # ^ # | #-------------# || # |----------| # | ###############|############## '--->#.----..-----.# -.| # | REQ | #--' | #| in || out |# || # | connect | # | #'----''-----'# || # '----------' # | ############### || # # '--------------------------------------'| # # '--------------------------------------'| ################# '---------------------------------------'

=for newslide

The client has a task and wants to get it done

A task is a list of words

We need to process each word

In parallel, in the workers

This will take some time ( = "cost" )

After all words are processed, we return the processed list to the client

=head3 The Worker

=for include_code bin/zmqx_worker.pl @d72295310

=head3 The Client

=for include_code bin/zmqx_client.pl @d72295310

=head3 The Ventilator

=for include_code bin/zmqx_ventilator.pl @d72295310

=for newslide

That's a bit much, let's break it up:

=for newslide

=for include_fragment bin/zmqx_ventilator.pl @d72295310 1-10

=for newslide

=for include_fragment bin/zmqx_ventilator.pl @d72295310 14-36

=for newslide

=for include_fragment bin/zmqx_ventilator.pl @d72295310 40-54

=head2 Resources

0MQ: http://www.zeromq.org/

The Guide: http://zguide.zeromq.org

includes lots of example code in lots of languages

ZMQ::LibZMQ3: https://metacpan.org/module/ZMQ::LibZMQ3

AnyEvent: http://anyevent.schmorp.de

ZMQx::Class: https://github.com/domm/ZMQx-Class

=head2 Questions?