# My Weblog

## ZeroMQ for distributed computing.

This post is based on ØMQ – The Guide by Pieter Hintjens, with its examples translated into Haskell. I suggest reading the Guide first; if you are interested in the Haskell versions, the code is below.

The client sends “Accept hello from Client.” to the server, which replies with “I got you.”. The Haskell server below opens a ØMQ socket on port 5555, reads requests on it, and replies with “I got you.” to each request, appending a running counter.

The REQ-REP socket pair works in lockstep. The client issues send and then receive, in a loop (or once if that's all it needs). Any other sequence (e.g., sending two messages in a row) fails: in the underlying C API the send or receive call returns -1.

```
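import System.ZMQ3
import Control.Monad ( forever )
import qualified Data.ByteString.Lazy.Internal as BL
import Data.IORef

main :: IO ()
main = do
  c <- context
  s <- socket c Rep
  bind s "tcp://127.0.0.1:5555"
  counter <- newIORef ( 0 :: Int )
  forever $ do
    -- Wait for the next request and print it.
    res <- receive s
    print res
    -- Reply with the number of requests served so far.
    t <- readIORef counter
    send' s [] ( BL.packChars $ "I got you. " ++ show t )
    modifyIORef counter ( +1 )
  -- Not reached, since forever never returns; kept to show the cleanup calls.
  close s
  destroy c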

```

Client

```
import System.ZMQ3
import Control.Monad ( forever )
import qualified Data.ByteString.Lazy.Internal as BL
import Data.IORef

main :: IO ()
main = do
  c <- context
  s <- socket c Req
  connect s "tcp://127.0.0.1:5555"
  counter <- newIORef ( 0 :: Int )
  forever $ do
    -- Send a numbered request, then block until the reply arrives.
    t <- readIORef counter
    send' s [] ( BL.packChars $ "Accept hello from Client. " ++ show t )
    msg <- receive s
    print msg
    modifyIORef counter ( +1 )

```
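The lockstep rule can be made concrete with a small pure model of the REQ side. This is only a sketch that does not touch ZeroMQ at all; the names `ReqState`, `Action`, `step` and `run` are made up for illustration and are not part of the Haskell binding.

```haskell
-- A toy model of the REQ side of a REQ-REP pair.
-- All names here are illustrative assumptions, not library API.
import Control.Monad (foldM)

data ReqState = ReadyToSend | AwaitingReply deriving (Eq, Show)
data Action   = Send | Receive deriving (Eq, Show)

-- One call on the socket: Left describes an illegal call,
-- Right is the state the socket moves to.
step :: ReqState -> Action -> Either String ReqState
step ReadyToSend   Send    = Right AwaitingReply
step AwaitingReply Receive = Right ReadyToSend
step st            act     = Left ("illegal " ++ show act ++ " in state " ++ show st)

-- Run a whole sequence of calls, starting from a fresh socket.
run :: [Action] -> Either String ReqState
run = foldM step ReadyToSend

main :: IO ()
main = do
  print (run [Send, Receive, Send, Receive])  -- Right ReadyToSend
  print (run [Send, Send])                    -- Left "illegal Send in state AwaitingReply"
```

The second call sequence corresponds to sending two messages in a row, which is where the real socket reports an error.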

Running a server with two clients.

```
Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqServer
"Accept hello from Client. 0"
"Accept hello from Client. 1"
"Accept hello from Client. 2"
"Accept hello from Client. 3"
"Accept hello from Client. 4"
"Accept hello from Client. 5"
"Accept hello from Client. 6"
"Accept hello from Client. 7"
"Accept hello from Client. 8"
"Accept hello from Client. 9"
"Accept hello from Client. 10"
"Accept hello from Client. 11"
"Accept hello from Client. 12"
"Accept hello from Client. 13"
"Accept hello from Client. 14"
"Accept hello from Client. 15"

"Accept hello from Client. 0"
"Accept hello from Client. 228"
"Accept hello from Client. 1"
"Accept hello from Client. 229"
"Accept hello from Client. 2"
"Accept hello from Client. 230"
"Accept hello from Client. 3"
"Accept hello from Client. 231"
"Accept hello from Client. 4"
"Accept hello from Client. 232"

First client.
Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqClient
"I got you. 0"
"I got you. 1"
"I got you. 2"
"I got you. 3"
"I got you. 4"
"I got you. 5"
"I got you. 6"
"I got you. 7"
"I got you. 8"
"I got you. 9"
"I got you. 10"
"I got you. 11"
"I got you. 12"

Second client
Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqClient
"I got you. 228"
"I got you. 230"
"I got you. 232"
"I got you. 234"
"I got you. 236"
"I got you. 238"
"I got you. 240"
"I got you. 242"
"I got you. 244"
"I got you. 246"
"I got you. 248"
"I got you. 250"
"I got you. 252"
"I got you. 254"
"I got you. 256"
"I got you. 258"
"I got you. 260"
"I got you. 262"
"I got you. 264"
"I got you. 266"
```

### Publish-Subscribe

The publisher broadcasts random weather updates (zip code, temperature, relative humidity) for zip codes in the range 500 to 2000.

```
import System.ZMQ3
import Control.Monad ( forever )
import qualified Data.ByteString.Lazy.Internal as BL
import System.Random

main :: IO ()
main = do
  c <- context
  publisher <- socket c Pub
  bind publisher "tcp://127.0.0.1:5556"
  bind publisher "ipc://weather.ipc"
  forever $ do
    -- Random zip code, temperature and relative humidity.
    zipcode <- randomRIO ( 500 , 2000 ) :: IO Int
    temp <- randomRIO ( 10 , 45 ) :: IO Int
    relhum <- randomRIO ( 0 , 100 ) :: IO Int
    let update = show zipcode ++ " " ++ show temp ++ " " ++ show relhum
    putStrLn update
    send' publisher [] ( BL.packChars update )
  -- Not reached, since forever never returns.
  close publisher
  destroy c
```

A subscriber that is only interested in two zip codes, 1000 and 1010.

```
import System.ZMQ3
import Control.Monad ( forever )
import qualified Data.ByteString.Char8 as BS

main :: IO ()
main = do
  c <- context
  subscriber <- socket c Sub
  connect subscriber "tcp://127.0.0.1:5556"
  -- Only updates that start with one of these prefixes are delivered.
  subscribe subscriber ( BS.pack "1000" )
  subscribe subscriber ( BS.pack "1010" )
  forever $ do
    update <- receive subscriber
    print update
  -- Not reached, since forever never returns.
  close subscriber
  destroy c

```
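A SUB socket filters by prefix: a message is delivered when it starts with any of the subscribed byte strings. The sketch below models that rule on plain strings, without ZeroMQ; `matches` and the sample updates are made up for illustration.

```haskell
import Data.List (isPrefixOf)

-- A message is delivered if any subscribed prefix matches its leading characters.
-- `matches` is an illustrative helper, not part of the ZeroMQ binding.
matches :: [String] -> String -> Bool
matches prefixes msg = any (`isPrefixOf` msg) prefixes

main :: IO ()
main = do
  let subs    = ["1000", "1010"]
      updates = ["1568 27 85", "1000 34 89", "1010 19 70", "100 20 50"]
  -- Prints only the updates for zip codes 1000 and 1010.
  mapM_ putStrLn (filter (matches subs) updates)
```

Because the match is on a prefix, a subscription to "1000" would also accept a five-digit zip code such as 10001. That cannot happen here, since the publisher only generates zip codes up to 2000.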

Our server publishes a lot of data, but the client only receives updates for the two zip codes it subscribed to.

```

Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqWeatherPubServer
1568 27 85
924 41 46
1461 15 46
1867 44 28
1013 23 100
1052 13 6
1720 45 6
1480 12 94
1852 33 4
1295 20 6
925 18 77
935 37 94
1670 11 6
1285 39 38
1613 44 99
1888 26 62
1011 21 45
993 45 45
1402 26 86
1639 13 65
1285 18 40
1960 38 18
1160 27 39
1374 16 59
665 25 22

Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./ZeroMqWeatherClient
"1000 34 89"
"1010 19 70"
"1010 15 86"
"1000 38 57"
"1010 12 42"
"1000 25 1"
"1000 28 78"
"1000 25 16"
"1000 28 82"
"1010 28 98"
"1000 12 77"
"1010 11 16"
"1010 44 14"
"1010 12 89"
"1000 32 8"
"1010 37 55"
"1010 17 21"
"1000 13 96"
"1000 18 51"
"1010 16 38"
"1000 18 21"
"1010 37 60"
"1010 17 25"
"1000 33 43"
"1000 34 44"
"1010 33 78"
"1010 35 63"
"1000 39 50"
"1000 45 70"
"1000 26 3"
"1010 34 12"
"1010 26 3"
"1000 32 75"
"1010 14 68"
"1000 44 75"
"1010 27 54"
"1000 21 39"
"1010 12 65"
"1010 43 29"
"1010 25 60"
```

### Divide and Conquer

For this problem we will count the primes less than $10^{7}$. The ventilator splits the interval into 10,000 ranges of 1,000 numbers each and pushes them to the workers. Each worker counts the primes in the range it receives and sends the count to the sink, which adds up the results.
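Before bringing in sockets, the split-and-sum idea can be checked in a single process. The sketch below is a scaled-down stand-in (primes up to $10^{4}$ in 10 ranges), using naive trial division instead of the worker's Miller-Rabin test; `rangeStarts`, `isPrimeNaive` and `countRange` are names invented for this example.

```haskell
-- First number of each range of `size` numbers covering [1 .. limit],
-- i.e. the task identifiers a ventilator would send.
rangeStarts :: Integer -> Integer -> [Integer]
rangeStarts size limit = [1, 1 + size .. limit]

-- Naive trial division, standing in for a faster primality test.
isPrimeNaive :: Integer -> Bool
isPrimeNaive n
  | n < 2     = False
  | otherwise = all (\d -> n `mod` d /= 0) (takeWhile (\d -> d * d <= n) [2 ..])

-- What one worker does for one task: count primes in [start .. start + size - 1].
countRange :: Integer -> Integer -> Int
countRange size start = length (filter isPrimeNaive [start .. start + size - 1])

-- What the sink does: add up the per-range counts.
main :: IO ()
main = do
  let size  = 1000 :: Integer
      parts = map (countRange size) (rangeStarts size 10000)
  print (length parts)  -- 10 tasks
  print (sum parts)     -- 1229 primes up to 10^4
```

Each range is independent of the others, which is what lets the ventilator hand them to any number of workers in any order.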

Ventilator

```
import System.ZMQ3
import Control.Monad ( forM_ )
import qualified Data.ByteString.Lazy.Internal as BL

main :: IO ()
main = do
  c <- context
  -- Socket on which tasks are pushed to the workers.
  sender <- socket c Push
  bind sender "tcp://127.0.0.1:5557"

  -- Socket used once to tell the sink that a batch is starting.
  sink <- socket c Push
  connect sink "tcp://127.0.0.1:5558"

  putStrLn "Press enter when workers are ready."
  _ <- getChar
  putStrLn "Sending the task to workers.\n"

  send' sink [] ( BL.packChars "0" )

  -- 10000 tasks, each identified by the first number of a range of 1000.
  forM_ [ 0 .. 9999 :: Integer ] $ \ i -> do
    let start = 1000 * i + 1
    putStrLn $ "Sending the range [ " ++ show start ++ " .. " ++ show ( start + 999 ) ++ "] to worker for primality testing."
    send' sender [] ( BL.packChars . show $ start )

  close sink
  close sender
  destroy c

```

Worker that counts the primes in each range it receives, using the Miller-Rabin test.

```
import System.ZMQ3
import Control.Monad ( forever )
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Internal as BL
import Data.Bits

-- Modular exponentiation: a^d mod n by repeated squaring.
powM :: Integer -> Integer -> Integer -> Integer
powM a d n
  | d == 0 = 1
  | d == 1 = mod a n
  | otherwise = mod q n
  where
    p = powM ( mod ( a^2 ) n ) ( shiftR d 1 ) n
    q = if (.&.) d 1 == 1 then mod ( a * p ) n else p

-- Write n - 1 as 2^s * d with d odd.
calSd :: Integer -> ( Integer , Integer )
calSd n = ( s , d )
  where
    s = until ( \x -> testBit ( n - 1 ) ( fromIntegral x ) ) ( +1 ) 0
    d = div ( n - 1 ) ( shiftL 1 ( fromIntegral s ) )

-- One Miller-Rabin round for base a.
rabinMiller :: Integer -> Integer -> Integer -> Integer -> Bool
rabinMiller n s d a
  | n == a = True
  | x == 1 = True
  | otherwise = any ( == pred n ) . take ( fromIntegral s )
                  . iterate ( \e -> mod ( e^2 ) n ) $ x
  where
    x = powM a d n

isPrime :: Integer -> Bool
isPrime n
  | n <= 1 = False
  | n == 2 = True
  | even n = False
  | otherwise = all ( rabinMiller n s d ) [ 2 , 3 , 5 , 7 , 11 , 13 , 17 ]
  where
    ( s , d ) = calSd n

-- Primality of every number in [ m .. n ].
primeRange :: Integer -> Integer -> [ Bool ]
primeRange m n = map isPrime [ m .. n ]

main :: IO ()
main = do
  c <- context

  -- Tasks arrive from the ventilator.
  receiver <- socket c Pull
  connect receiver "tcp://127.0.0.1:5557"

  -- Results go to the sink.
  sender <- socket c Push
  connect sender "tcp://127.0.0.1:5558"

  forever $ do
    num <- receive receiver
    let n = read . BS.unpack $ num :: Integer
    let len = length . filter id . primeRange n $ n + 999
    putStrLn $ "Received range [ " ++ show n ++ " .. " ++ show ( n + 999 ) ++ "  and number of primes in this range is " ++ show len
    send' sender [] ( BL.packChars . show $ len )

  -- Not reached, since forever never returns.
  close receiver
  close sender
  destroy c

```

Sink for collecting the results.

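```
import System.ZMQ3
import Control.Monad ( forM_ )
import qualified Data.ByteString.Char8 as BS
import Data.IORef

main :: IO ()
main = do
  c <- context
  -- Results arrive here from the workers.
  receiver <- socket c Pull
  bind receiver "tcp://127.0.0.1:5558"

  -- The first message is the ventilator's start-of-batch signal.
  m <- receive receiver
  print m

  total <- newIORef ( 0 :: Integer )
  forM_ [ 1 .. 10000 :: Int ] $ \ _ -> do
    num <- receive receiver
    let n = read . BS.unpack $ num :: Integer
    putStrLn $ "Received a number " ++ show n ++ " from one of worker"
    modifyIORef total ( + n )

  t <- readIORef total
  putStrLn $ "Total number of primes in the range is " ++ show t
  close receiver
  destroy c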
```

Running the ventilator, three workers and the sink.

```
Mukeshs-MacBook-Pro:ZMQ mukeshtiwari$ ./TaskVent
Sending the range [ 9992001 .. 9993000] to worker for primality testing.
Sending the range [ 9993001 .. 9994000] to worker for primality testing.
Sending the range [ 9994001 .. 9995000] to worker for primality testing.
Sending the range [ 9995001 .. 9996000] to worker for primality testing.
Sending the range [ 9996001 .. 9997000] to worker for primality testing.
Sending the range [ 9997001 .. 9998000] to worker for primality testing.
Sending the range [ 9998001 .. 9999000] to worker for primality testing.
Sending the range [ 9999001 .. 10000000] to worker for primality testing.

Received range [ 9978001 .. 9979000  and number of primes in this range is 60
Received range [ 9981001 .. 9982000  and number of primes in this range is 49
Received range [ 9984001 .. 9985000  and number of primes in this range is 69
Received range [ 9987001 .. 9988000  and number of primes in this range is 57
Received range [ 9990001 .. 9991000  and number of primes in this range is 60
Received range [ 9993001 .. 9994000  and number of primes in this range is 71
Received range [ 9996001 .. 9997000  and number of primes in this range is 58
Received range [ 9999001 .. 10000000  and number of primes in this range is 53

Received range [ 9982001 .. 9983000  and number of primes in this range is 69
Received range [ 9985001 .. 9986000  and number of primes in this range is 62
Received range [ 9988001 .. 9989000  and number of primes in this range is 58
Received range [ 9991001 .. 9992000  and number of primes in this range is 57
Received range [ 9994001 .. 9995000  and number of primes in this range is 64
Received range [ 9997001 .. 9998000  and number of primes in this range is 67
Received range [ 9986001 .. 9987000  and number of primes in this range is 59
Received range [ 9989001 .. 9990000  and number of primes in this range is 58
Received range [ 9992001 .. 9993000  and number of primes in this range is 58
Received range [ 9995001 .. 9996000  and number of primes in this range is 62
Received range [ 9998001 .. 9999000  and number of primes in this range is 64

Sink
Received a number 53 from one of worker
Received a number 67 from one of worker
Received a number 64 from one of worker
Received a number 52 from one of worker
Received a number 59 from one of worker
Received a number 58 from one of worker
Received a number 58 from one of worker
Received a number 62 from one of worker
Received a number 64 from one of worker
Total number of primes in the range is 664579
```

First start all the workers and the sink, then run the ventilator. For more on distributed computing in Haskell, see Eden and distributed-process. If you have any suggestions, please let me know. Some of the contents and images are taken from Pieter Hintjens's tutorial with his permission.

May 13, 2013
