ZeroMQ for distributed computing.
This post is based on ØMQ – The Guide by Pieter Hintjens and translates some of its examples into Haskell. I suggest reading The Guide first; if you are interested in the Haskell versions, the code follows.
Request-Reply
The client sends “Accept hello from Client. ” followed by a counter to the server, which replies with “I got you. ” followed by its own counter. The Haskell server opens a ØMQ socket on port 5555, reads requests from it, and replies to each request.

The REQ-REP socket pair works in lockstep. The client issues send and then receive, in a loop (or once if that’s all it needs). Any other sequence (e.g., sending two messages in a row) is an error: the underlying send or receive call fails with a return code of -1, which the Haskell binding reports as an error.

Server
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Internal as BL
    import Data.IORef
    import Control.Concurrent ( threadDelay )

    main = do
      c <- context
      s <- socket c Rep
      bind s "tcp://127.0.0.1:5555"
      counter <- newIORef 0
      -- Serve forever: read one request, then send exactly one reply.
      forever $ do
        t <- readIORef counter
        res <- receive s
        print res
        send' s [] ( BL.packChars $ "I got you. " ++ show t )
        modifyIORef counter ( +1 )
        threadDelay 10000
      -- Not reached, because the loop above never returns.
      close s
      destroy c
      return ()
Client
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Internal as BL
    import Data.IORef
    import Control.Concurrent ( threadDelay )

    main = do
      c <- context
      s <- socket c Req
      connect s "tcp://127.0.0.1:5555"
      counter <- newIORef 0
      -- Lockstep: every send is followed by exactly one receive.
      forever $ do
        t <- readIORef counter
        send' s [] ( BL.packChars $ "Accept hello from Client. " ++ show t )
        msg <- receive s
        print msg
        modifyIORef counter ( +1 )
        threadDelay 10000
      return ()
Running a server with two clients.
    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqServer
    "Accept hello from Client. 0"
    "Accept hello from Client. 1"
    "Accept hello from Client. 2"
    "Accept hello from Client. 3"
    "Accept hello from Client. 4"
    "Accept hello from Client. 5"
    "Accept hello from Client. 6"
    "Accept hello from Client. 7"
    "Accept hello from Client. 8"
    "Accept hello from Client. 9"
    "Accept hello from Client. 10"
    "Accept hello from Client. 11"
    "Accept hello from Client. 12"
    "Accept hello from Client. 13"
    "Accept hello from Client. 14"
    "Accept hello from Client. 15"
    "Accept hello from Client. 0"
    "Accept hello from Client. 228"
    "Accept hello from Client. 1"
    "Accept hello from Client. 229"
    "Accept hello from Client. 2"
    "Accept hello from Client. 230"
    "Accept hello from Client. 3"
    "Accept hello from Client. 231"
    "Accept hello from Client. 4"
    "Accept hello from Client. 232"

First client

    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqClient
    "I got you. 0"
    "I got you. 1"
    "I got you. 2"
    "I got you. 3"
    "I got you. 4"
    "I got you. 5"
    "I got you. 6"
    "I got you. 7"
    "I got you. 8"
    "I got you. 9"
    "I got you. 10"
    "I got you. 11"
    "I got you. 12"

Second client

    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqClient
    "I got you. 228"
    "I got you. 230"
    "I got you. 232"
    "I got you. 234"
    "I got you. 236"
    "I got you. 238"
    "I got you. 240"
    "I got you. 242"
    "I got you. 244"
    "I got you. 246"
    "I got you. 248"
    "I got you. 250"
    "I got you. 252"
    "I got you. 254"
    "I got you. 256"
    "I got you. 258"
    "I got you. 260"
    "I got you. 262"
    "I got you. 264"
    "I got you. 266"
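The lockstep rule for the REQ socket described in this section can be pictured as a tiny state machine. The sketch below is only a pure model of that rule, not part of the zeromq binding: the names ReqState, Op, step and runOps are made up for illustration, and an illegal operation is represented as a Left value rather than a real socket error.

```haskell
import Control.Monad (foldM)

-- Hypothetical model of a REQ socket: it is either ready to send a
-- request or waiting for the reply to the request it just sent.
data ReqState = ReadyToSend | AwaitingReply
  deriving (Eq, Show)

-- The two operations a client can attempt.
data Op = Send | Receive
  deriving (Eq, Show)

-- One step of the lockstep rule: send and receive must alternate,
-- starting with send. Anything else is rejected.
step :: ReqState -> Op -> Either String ReqState
step ReadyToSend   Send    = Right AwaitingReply
step AwaitingReply Receive = Right ReadyToSend
step st            op      = Left ("cannot " ++ show op ++ " while " ++ show st)

-- Run a whole sequence of operations from the initial state,
-- stopping at the first illegal one.
runOps :: [Op] -> Either String ReqState
runOps = foldM step ReadyToSend
```

In this model, runOps [Send, Receive, Send, Receive] succeeds, while runOps [Send, Send] is rejected at the second send, which mirrors the "two messages in a row" case mentioned above. The REP socket follows the same rule with the roles swapped: receive first, then send.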
Publish-Subscribe
A publishing server that broadcasts random weather data for zip codes in the range 500 to 2000.
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS hiding ( putStrLn )
    import qualified Data.ByteString.Lazy.Internal as BL
    import Control.Concurrent ( threadDelay )
    import System.Random

    main = do
      c <- context
      publisher <- socket c Pub
      bind publisher "tcp://127.0.0.1:5556"
      bind publisher "ipc://weather.ipc"
      -- Publish "zipcode temperature humidity" updates forever.
      forever $ do
        zipcode <- randomRIO ( ( 500 , 2000 ) :: ( Int , Int ) )
        temp <- randomRIO ( 10 , 45 ) :: IO Int
        relhum <- randomRIO ( 0 , 100 ) :: IO Int
        putStrLn $ show zipcode ++ " " ++ show temp ++ " " ++ show relhum
        send' publisher [] ( BL.packChars $ show zipcode ++ " " ++ show temp ++ " " ++ show relhum )
      -- Not reached, because the loop above never returns.
      close publisher
      destroy c
      return ()
A client that is interested in only two zip codes.
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Internal as BL
    import Control.Concurrent ( threadDelay )
    import System.Random

    main = do
      c <- context
      subscriber <- socket c Sub
      connect subscriber "tcp://127.0.0.1:5556"
      -- Only messages that start with one of these prefixes are delivered.
      subscribe subscriber ( BS.pack "1000" )
      subscribe subscriber ( BS.pack "1010" )
      forever $ do
        update <- receive subscriber
        print update
      -- Not reached, because the loop above never returns.
      close subscriber
      destroy c
Our server publishes a lot of data, but the client receives only the updates for the two zip codes it subscribed to.
    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqWeatherPubServer
    1568 27 85
    924 41 46
    1461 15 46
    1867 44 28
    1013 23 100
    1052 13 6
    1720 45 6
    1480 12 94
    1852 33 4
    1295 20 6
    925 18 77
    935 37 94
    1670 11 6
    1285 39 38
    1613 44 99
    1888 26 62
    1011 21 45
    993 45 45
    1402 26 86
    1639 13 65
    1285 18 40
    1960 38 18
    1160 27 39
    1374 16 59
    665 25 22

    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqWeatherClient
    "1000 34 89"
    "1010 19 70"
    "1010 15 86"
    "1000 38 57"
    "1010 12 42"
    "1000 25 1"
    "1000 28 78"
    "1000 25 16"
    "1000 28 82"
    "1010 28 98"
    "1000 12 77"
    "1010 11 16"
    "1010 44 14"
    "1010 12 89"
    "1000 32 8"
    "1010 37 55"
    "1010 17 21"
    "1000 13 96"
    "1000 18 51"
    "1010 16 38"
    "1000 18 21"
    "1010 37 60"
    "1010 17 25"
    "1000 33 43"
    "1000 34 44"
    "1010 33 78"
    "1010 35 63"
    "1000 39 50"
    "1000 45 70"
    "1000 26 3"
    "1010 34 12"
    "1010 26 3"
    "1000 32 75"
    "1010 14 68"
    "1000 44 75"
    "1010 27 54"
    "1000 21 39"
    "1010 12 65"
    "1010 43 29"
    "1010 25 60"
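A SUB socket filters by prefix: a message is delivered when it starts with one of the subscribed byte strings. The sketch below models that filter as a pure function and also parses an update of the form "zipcode temperature humidity" into a record. The names Update, accepts and parseUpdate are made up for illustration and are not part of the zeromq binding; the real filtering is done by ØMQ itself.

```haskell
import qualified Data.ByteString.Char8 as BS
import Text.Read (readMaybe)

-- Hypothetical record for one weather update.
data Update = Update
  { zipcode     :: Int
  , temperature :: Int
  , humidity    :: Int
  } deriving (Eq, Show)

-- Pure model of the subscription filter: a message passes when any
-- subscribed prefix is a prefix of the message.
accepts :: [BS.ByteString] -> BS.ByteString -> Bool
accepts filters msg = any (`BS.isPrefixOf` msg) filters

-- Parse "zipcode temperature humidity"; anything else gives Nothing.
parseUpdate :: BS.ByteString -> Maybe Update
parseUpdate msg =
  case mapM (readMaybe . BS.unpack) (BS.words msg) of
    Just [z, t, h] -> Just (Update z t h)
    _              -> Nothing
```

With the two filters used by the client, accepts [BS.pack "1000", BS.pack "1010"] lets "1010 19 70" through and drops "1011 21 45". Because the match is on a prefix, a filter such as "100" would also accept "1000 34 89"; with zip codes between 500 and 2000 the four-digit filters above cannot collide with a longer zip code.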
Divide and Conquer
For this problem we will count the primes less than 10,000,000. Our ventilator splits the work into 10,000 ranges of 1,000 numbers each and pushes them to the workers. Each worker counts the primes in the range it receives and sends the result to the sink, which adds up the counts.
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Internal as BL
    import Data.IORef
    import Control.Concurrent ( threadDelay )

    main = do
      c <- context
      sender <- socket c Push
      bind sender "tcp://127.0.0.1:5557"
      sink <- socket c Push
      connect sink "tcp://127.0.0.1:5558"
      putStrLn "Press enter when workers are ready."
      _ <- getChar
      putStrLn "Sending the task to workers.\n"
      -- Tell the sink that a batch is about to start.
      send' sink [] ( BL.packChars . show $ 0 )
      -- Each task is the first number of a range of 1000 numbers.
      forM_ [ 0..9999 ] $ \ i -> do
        putStrLn $ "Sending the range [ " ++ show ( 1000 * i + 1 ) ++ " .. " ++ show ( 1000 * i + 1 + 999 ) ++ "] to worker for primality testing."
        send' sender [] ( BL.packChars . show $ 1000 * i + 1 )
      close sink
      close sender
      destroy c
Worker that counts the primes in each range it receives.
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Internal as BL
    import Data.IORef
    import Control.Concurrent ( threadDelay )
    import Data.Bits

    -- Modular exponentiation: a^d mod n by repeated squaring.
    powM :: Integer -> Integer -> Integer -> Integer
    powM a d n = powM' a d n
      where
        powM' a d n
          | d == 0 = 1
          | d == 1 = mod a n
          | otherwise = mod q n
          where
            p = powM' ( mod ( a^2 ) n ) ( shiftR d 1 ) n
            q = if (.&.) d 1 == 1 then mod ( a * p ) n else p

    -- Write n - 1 as 2^s * d with d odd.
    calSd :: Integer -> ( Integer , Integer )
    calSd n = ( s , d )
      where
        s = until ( \x -> testBit ( n - 1) ( fromIntegral x ) ) ( +1 ) 0
        d = div ( n - 1 ) ( shiftL 1 ( fromIntegral s ) )

    -- One Rabin-Miller round for the base a.
    rabinMiller :: Integer -> Integer -> Integer -> Integer -> Bool
    rabinMiller n s d a
      | n == a = True
      | otherwise = case x == 1 of
          True -> True
          _ -> any ( == pred n ) . take ( fromIntegral s ) . iterate (\e -> mod ( e^2 ) n ) $ x
      where
        x = powM a d n

    isPrime :: Integer -> Bool
    isPrime n
      | n <= 1 = False
      | n == 2 = True
      | even n = False
      | otherwise = all ( == True ) . map ( rabinMiller n s d ) $ [ 2 , 3 , 5 , 7 , 11 , 13 , 17 ]
      where
        ( s , d ) = calSd n

    primeRange :: Integer -> Integer -> [ Bool ]
    primeRange m n = map isPrime [ m .. n ]

    main = do
      c <- context
      receiver <- socket c Pull
      connect receiver "tcp://127.0.0.1:5557"
      sender <- socket c Push
      connect sender "tcp://127.0.0.1:5558"
      -- Receive the start of a range, count its primes, push the count to the sink.
      forever $ do
        num <- receive receiver
        let n = read . BS.unpack $ num :: Integer
        let len = length . filter ( == True ) . primeRange n $ n + 999
        putStrLn $ "Received range [ " ++ show n ++ " .. " ++ show ( n + 999 ) ++ " and number of primes in this range is " ++ show len
        send' sender [] ( BL.packChars . show $ len )
      -- Not reached, because the loop above never returns.
      close receiver
      close sender
      destroy c
Sink for collecting the results.
    import System.ZMQ3
    import Control.Monad
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Internal as BL
    import Data.IORef
    import Control.Concurrent ( threadDelay )

    main = do
      c <- context
      receiver <- socket c Pull
      bind receiver "tcp://127.0.0.1:5558"
      -- The first message is the start-of-batch signal from the ventilator.
      m <- receive receiver
      print m
      sum <- newIORef 0
      -- Collect one count per range and add it to the running total.
      forM_ [ 1.. 10000 ] $ \i -> do
        num <- receive receiver
        let n = read . BS.unpack $ num :: Integer
        putStrLn $ "Received a number " ++ show n ++ " from one of worker"
        modifyIORef sum ( + n )
      t <- readIORef sum
      putStrLn $ "Total number of primes in the range is " ++ show t
      close receiver
      destroy c
Running Ventilator, 3 workers and sink
    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./TaskVent
    Sending the range [ 9992001 .. 9993000] to worker for primality testing.
    Sending the range [ 9993001 .. 9994000] to worker for primality testing.
    Sending the range [ 9994001 .. 9995000] to worker for primality testing.
    Sending the range [ 9995001 .. 9996000] to worker for primality testing.
    Sending the range [ 9996001 .. 9997000] to worker for primality testing.
    Sending the range [ 9997001 .. 9998000] to worker for primality testing.
    Sending the range [ 9998001 .. 9999000] to worker for primality testing.
    Sending the range [ 9999001 .. 10000000] to worker for primality testing.

Task worker - 1

    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./TaskWorkder
    Received range [ 9978001 .. 9979000 and number of primes in this range is 60
    Received range [ 9981001 .. 9982000 and number of primes in this range is 49
    Received range [ 9984001 .. 9985000 and number of primes in this range is 69
    Received range [ 9987001 .. 9988000 and number of primes in this range is 57
    Received range [ 9990001 .. 9991000 and number of primes in this range is 60
    Received range [ 9993001 .. 9994000 and number of primes in this range is 71
    Received range [ 9996001 .. 9997000 and number of primes in this range is 58
    Received range [ 9999001 .. 10000000 and number of primes in this range is 53

Task worker - 2

    Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./TaskWorkder
    Received range [ 9982001 .. 9983000 and number of primes in this range is 69
    Received range [ 9985001 .. 9986000 and number of primes in this range is 62
    Received range [ 9988001 .. 9989000 and number of primes in this range is 58
    Received range [ 9991001 .. 9992000 and number of primes in this range is 57
    Received range [ 9994001 .. 9995000 and number of primes in this range is 64
    Received range [ 9997001 .. 9998000 and number of primes in this range is 67

Task worker - 3

    Received range [ 9986001 .. 9987000 and number of primes in this range is 59
    Received range [ 9989001 .. 9990000 and number of primes in this range is 58
    Received range [ 9992001 .. 9993000 and number of primes in this range is 58
    Received range [ 9995001 .. 9996000 and number of primes in this range is 62
    Received range [ 9998001 .. 9999000 and number of primes in this range is 64

Sink

    Received a number 53 from one of worker
    Received a number 67 from one of worker
    Received a number 64 from one of worker
    Received a number 52 from one of worker
    Received a number 59 from one of worker
    Received a number 58 from one of worker
    Received a number 58 from one of worker
    Received a number 62 from one of worker
    Received a number 64 from one of worker
    Total number of primes in the range is 664579
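The split-and-sum idea behind the ventilator, workers and sink can be checked on a small scale without any sockets. The sketch below is a single-process model only: chunkStarts mirrors the ventilator's 1000 * i + 1, countInChunk plays the worker, and totalPrimes plays the sink. All of these names are made up for illustration, and plain trial division is swapped in for the Rabin-Miller test to keep the example short.

```haskell
-- Start of each chunk, mirroring the ventilator's  size * i + 1.
chunkStarts :: Integer -> Integer -> [Integer]
chunkStarts size chunks = [ size * i + 1 | i <- [0 .. chunks - 1] ]

-- Trial division; a stand-in for the worker's Rabin-Miller test.
isPrimeNaive :: Integer -> Bool
isPrimeNaive n
  | n < 2     = False
  | otherwise = all (\d -> n `mod` d /= 0)
                    (takeWhile (\d -> d * d <= n) [2 ..])

-- What one worker does: count the primes in [start .. start + size - 1].
countInChunk :: Integer -> Integer -> Int
countInChunk size start =
  length (filter isPrimeNaive [start .. start + size - 1])

-- What the sink does: add up the counts of all chunks.
totalPrimes :: Integer -> Integer -> Int
totalPrimes size chunks =
  sum (map (countInChunk size) (chunkStarts size chunks))
```

For example, totalPrimes 1000 10 covers 1 to 10,000 in ten chunks and gives 1229, the number of primes below 10,000. Since the chunks do not overlap and the sink only adds, the order in which workers report their counts does not affect the total.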
Start all the workers and the sink first, and then run the ventilator. For more on distributed computing in Haskell, see Eden and distributed-process. If you have any suggestions, please let me know. Some of the content and images are taken from Pieter Hintjens's tutorial with his permission.