Why ZeroMQ?
ZeroMQ (ØMQ) is a high-performance asynchronous messaging library that acts as a concurrency framework. It provides sockets that carry atomic messages across various transports with no broker required.
Prerequisites
Development
- 1+ CPU cores
- 512 MB RAM minimum
- 5 GB SSD storage
- Ubuntu 20.04/22.04 LTS
Production
- 2+ CPU cores
- 2+ GB RAM recommended
- 20+ GB NVMe SSD
- Low-latency network
Initial Server Setup
Update system and install build dependencies:
# Update system packages
sudo apt update && sudo apt upgrade -y
# Install build essentials
sudo apt install -y build-essential pkg-config cmake
# Install additional dependencies
sudo apt install -y libtool autoconf automake
# Set timezone (adjust as needed)
sudo timedatectl set-timezone America/New_York
Installation
Install from Package Manager
The easiest way to install ZeroMQ on Ubuntu:
# Install ZeroMQ library and development headers
sudo apt install -y libzmq3-dev
# Verify installation
pkg-config --modversion libzmq
# Install additional tools
sudo apt install -y libczmq-dev # High-level C binding
Build from Source (Optional)
For the latest version, build from source:
# Install dependencies
sudo apt install -y libsodium-dev
# Clone repository
cd /tmp
git clone https://github.com/zeromq/libzmq.git
cd libzmq
# Build and install
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release -DWITH_LIBSODIUM=ON
make -j$(nproc)
sudo make install
# Update library cache
sudo ldconfig
# Verify installation
ldconfig -p | grep zmq
Note: Building from source provides the latest features and security patches, and enables libsodium for CurveZMQ encryption.
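To confirm that a given libzmq build actually reports CurveZMQ support, a small check from Python can help. This is a minimal sketch that assumes pyzmq is already installed (covered under Language Bindings); the helper name `curve_available` is made up for illustration. Keep in mind that a pip-installed pyzmq may bundle its own libzmq, in which case the result describes that bundled library rather than the one built here.

```python
import zmq


def curve_available() -> bool:
    """Return True if the libzmq that pyzmq is linked against reports CURVE support."""
    return bool(zmq.has("curve"))


if __name__ == "__main__":
    print(f"libzmq {zmq.zmq_version()} CURVE support: {curve_available()}")
```

If this reports no CURVE support, the secure server shown later in this guide cannot be expected to work with that library.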
Verify Installation
Test the installation with a simple program:
// Save as test_zmq.c
#include <zmq.h>
#include <stdio.h>
int main(void) {
    int major, minor, patch;
    zmq_version(&major, &minor, &patch);
    printf("ZeroMQ version: %d.%d.%d\n", major, minor, patch);
    void *context = zmq_ctx_new();
    if (context) {
        printf("ZeroMQ context created successfully!\n");
        /* zmq_ctx_term() replaces the deprecated zmq_ctx_destroy() */
        zmq_ctx_term(context);
    }
    return 0;
}
# Compile
gcc test_zmq.c -o test_zmq -lzmq
# Run
./test_zmq
Language Bindings
Python (pyzmq)
Install the Python binding:
# Using pip
pip3 install pyzmq
# Or with system package manager
sudo apt install -y python3-zmq
# Verify installation
python3 -c "import zmq; print(f'pyzmq version: {zmq.__version__}')"
python3 -c "import zmq; print(f'libzmq version: {zmq.zmq_version()}')"
Node.js (zeromq)
Install the Node.js binding:
# Ensure Node.js is installed
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt install -y nodejs
# Install zeromq binding
npm install zeromq
# For older API compatibility
npm install zeromq@5
Go (zmq4)
Install the Go binding:
# Ensure Go is installed
sudo apt install -y golang-go
# Set up Go environment
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
# Add the zmq4 binding (run inside a Go module, i.e. a directory with a go.mod file)
go get github.com/pebbe/zmq4
# zmq4 uses cgo, so make sure pkg-config finds libzmq
pkg-config --cflags --libs libzmq
Rust (zmq)
Add ZeroMQ to your Rust project:
[dependencies]
zmq = "0.10"
# Install Rust if needed
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
source $HOME/.cargo/env
# Build your project
cargo build
Socket Patterns
Core Socket Types
REQ-REP Pattern
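REQ and REP sockets work in lockstep: a REQ socket must receive a reply before it may send again. The following single-process sketch illustrates that rule; the `inproc://lockstep-demo` endpoint and the function name are invented for the demonstration.

```python
import zmq


def second_send_errno() -> int:
    """Send twice on a REQ socket without receiving in between; return the errno raised."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    req = ctx.socket(zmq.REQ)
    for sock in (rep, req):
        sock.setsockopt(zmq.LINGER, 0)  # do not wait for unsent messages on close
    try:
        rep.bind("inproc://lockstep-demo")
        req.connect("inproc://lockstep-demo")
        req.send_string("first request")  # allowed: a REQ socket starts in the send state
        try:
            # Not allowed: no reply has been received yet
            req.send_string("second request", flags=zmq.NOBLOCK)
        except zmq.ZMQError as err:
            return err.errno
        return 0
    finally:
        req.close()
        rep.close()
        ctx.term()


if __name__ == "__main__":
    print("EFSM raised:", second_send_errno() == zmq.EFSM)
```

This is why the client in this section always pairs each send with a receive. A client that needs several requests in flight at once would normally use a different socket type, such as DEALER.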
Request-Reply Server (Python)
Create a simple echo server:
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
print("Server listening on port 5555...")
while True:
    # Wait for request from client
    message = socket.recv_string()
    print(f"Received: {message}")
    # Send reply back to client
    socket.send_string(f"Echo: {message}")
Request-Reply Client (Python)
Create a client to send requests:
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
for i in range(10):
    message = f"Hello {i}"
    print(f"Sending: {message}")
    socket.send_string(message)
    reply = socket.recv_string()
    print(f"Received: {reply}")
# Terminal 1: Start server
python3 server.py
# Terminal 2: Run client
python3 client.py
PUB-SUB Pattern
Publisher (Python)
Create a weather data publisher:
#!/usr/bin/env python3
import zmq
import random
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
print("Publisher started on port 5556...")
while True:
    # Generate random weather data
    zipcode = random.randint(10000, 10010)
    temperature = random.randint(-10, 35)
    humidity = random.randint(10, 90)
    message = f"{zipcode} {temperature} {humidity}"
    socket.send_string(message)
    time.sleep(0.1)  # 10 updates per second
Subscriber (Python)
Create a subscriber for specific topics:
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
# Subscribe to zipcode 10001
zipcode_filter = "10001"
socket.setsockopt_string(zmq.SUBSCRIBE, zipcode_filter)
print(f"Subscribed to weather updates for {zipcode_filter}")
# Collect 10 updates
total_temp = 0
for update_nbr in range(10):
    message = socket.recv_string()
    zipcode, temperature, humidity = message.split()
    total_temp += int(temperature)
    print(f"Update {update_nbr + 1}: {temperature}°C, {humidity}% humidity")
print(f"Average temperature: {total_temp / 10}°C")
Note: A subscriber only receives messages published after its connection and subscription are in place, so it may miss the first messages (the "slow joiner" syndrome). If every message matters, implement a synchronization mechanism between publisher and subscribers.
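A SUB socket filters by byte prefix: a message is delivered when it starts with one of the socket's subscriptions, and an empty subscription matches everything. The sketch below is a pure-Python model of that rule, not ZeroMQ's own code, and the sample messages are made up.

```python
def matches(subscription: bytes, message: bytes) -> bool:
    """Model of a SUB filter: keep messages that start with the subscription bytes."""
    return message.startswith(subscription)


# Hypothetical messages in the "zipcode temperature humidity" format used above,
# plus one six-digit code to show how a bare prefix can over-match.
messages = [b"10001 21 40", b"10002 18 55", b"100010 5 70"]

for subscription in (b"10001", b"10001 ", b""):
    kept = [m for m in messages if matches(subscription, m)]
    print(subscription, "->", kept)
```

With the five-digit zipcodes generated by the publisher above, the filter "10001" is unambiguous. If topics can share prefixes, including a delimiter in the subscription (here the trailing space) is one way to avoid unintended matches.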
PUSH-PULL Pattern
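In this pattern a PUSH socket hands tasks to its connected PULL peers in round-robin order, so the total runtime depends on how evenly the work is spread. The following pure-Python sketch estimates that effect without any sockets. It assumes every worker is connected before the first task is sent and ignores network overhead; the function name `simulate` is hypothetical.

```python
import random


def simulate(workloads, n_workers):
    """Assign workloads round-robin; return (serial_msec, parallel_msec)."""
    per_worker = [0] * n_workers
    for index, msec in enumerate(workloads):
        per_worker[index % n_workers] += msec
    # The pipeline finishes when the busiest worker finishes
    return sum(workloads), max(per_worker)


if __name__ == "__main__":
    random.seed(1)
    workloads = [random.randint(1, 100) for _ in range(100)]
    for n_workers in (1, 2, 4):
        serial, parallel = simulate(workloads, n_workers)
        print(f"{n_workers} worker(s): about {parallel} msec (serial cost {serial} msec)")
```

The assumption that all workers are connected first is the reason the ventilator below waits for Enter before sending: a worker that connects late receives no share of tasks that were already distributed.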
Task Ventilator
Distribute tasks to workers:
#!/usr/bin/env python3
import zmq
import random
import time
context = zmq.Context()
# Socket to send tasks to workers
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
# Socket to send start signal to sink
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print("Press Enter when workers are ready...")
input()
print("Sending tasks to workers...")
# Signal start
sink.send_string("0")
# Send 100 tasks
total_msec = 0
for task_nbr in range(100):
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send_string(str(workload))
print(f"Total expected cost: {total_msec} msec")
Worker
Process tasks in parallel:
#!/usr/bin/env python3
import zmq
import time
context = zmq.Context()
# Socket to receive tasks
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send results
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
print("Worker ready...")
while True:
    # Receive task
    message = receiver.recv_string()
    workload = int(message)
    # Simulate work
    time.sleep(workload / 1000.0)
    # Send result to sink
    sender.send_string("")
    print(".", end="", flush=True)
Result Sink
Collect results from workers:
#!/usr/bin/env python3
import zmq
import time
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start signal
receiver.recv_string()
print("Collecting results...")
start_time = time.time()
# Collect 100 results
for task_nbr in range(100):
    receiver.recv_string()
    if task_nbr % 10 == 0:
        print(":", end="", flush=True)
    else:
        print(".", end="", flush=True)
elapsed = (time.time() - start_time) * 1000
print(f"\nTotal elapsed time: {elapsed:.0f} msec")
# Terminal 1: Start sink
python3 sink.py
# Terminal 2-4: Start workers (run multiple)
python3 worker.py
# Terminal 5: Start ventilator
python3 ventilator.py
Advanced Patterns
ROUTER-DEALER (Async Server)
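A ROUTER socket prepends the sender's identity to every message it receives, and a REQ socket adds an empty delimiter frame before the payload. That envelope is what lets a reply find its way back to the right client. The single-process sketch below shows the frames a ROUTER actually sees; the `inproc://envelope-demo` endpoint and the function name are invented for illustration.

```python
import zmq


def router_frames(payload: bytes = b"hello"):
    """Send one REQ message to a ROUTER socket and return the frames ROUTER receives."""
    ctx = zmq.Context()
    router = ctx.socket(zmq.ROUTER)
    req = ctx.socket(zmq.REQ)
    for sock in (router, req):
        sock.setsockopt(zmq.LINGER, 0)
    try:
        router.bind("inproc://envelope-demo")
        req.connect("inproc://envelope-demo")
        req.send(payload)
        if not router.poll(2000):  # wait up to 2 seconds for the message
            raise TimeoutError("no message arrived at the ROUTER socket")
        return router.recv_multipart()
    finally:
        req.close()
        router.close()
        ctx.term()


if __name__ == "__main__":
    identity, delimiter, body = router_frames(b"hello")
    print(f"identity: {len(identity)} bytes, delimiter: {delimiter!r}, body: {body!r}")
```

In a ROUTER-DEALER server, the proxy passes these frames through unchanged, and each REP worker strips the envelope on receive and restores it on send, so the application code only ever handles the payload.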
Build an asynchronous multi-client server:
#!/usr/bin/env python3
import zmq
import threading
import time
def worker_routine(context, worker_url):
    """Worker thread that processes requests"""
    socket = context.socket(zmq.REP)
    socket.connect(worker_url)
    while True:
        message = socket.recv_string()
        print(f"Worker received: {message}")
        time.sleep(1)  # Simulate work
        socket.send_string(f"Processed: {message}")
context = zmq.Context()
# Frontend facing clients
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Backend facing workers
backend = context.socket(zmq.DEALER)
backend.bind("inproc://workers")
# Start worker threads
for i in range(4):
    thread = threading.Thread(target=worker_routine, args=(context, "inproc://workers"))
    thread.daemon = True
    thread.start()
print("Async server started with 4 workers...")
# Use zmq.proxy to connect frontend to backend
zmq.proxy(frontend, backend)
Pub-Sub with Envelope
Multi-topic publishing with envelopes:
#!/usr/bin/env python3
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5560")
print("Envelope publisher started...")
while True:
    # Topic A
    socket.send_multipart([
        b"topic.a",
        b"Message for topic A"
    ])
    # Topic B
    socket.send_multipart([
        b"topic.b",
        b"Message for topic B"
    ])
    time.sleep(1)
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5560")
# Subscribe to topic.a only
socket.subscribe(b"topic.a")
print("Subscribed to topic.a...")
while True:
    topic, message = socket.recv_multipart()
    print(f"[{topic.decode()}] {message.decode()}")
Security
CurveZMQ Encryption
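As orientation for the steps in this section, here is a minimal end-to-end sketch of a CURVE-secured REQ-REP exchange inside one process. It assumes libzmq was built with CURVE support. The function name and the use of a random local port are choices made for the demonstration. No authenticator is started, so any client that knows the server's public key is accepted; restricting which clients may connect is the job of the authenticator shown in the server example.

```python
import zmq


def curve_roundtrip(text: str = "ping") -> str:
    """Exchange one request and reply over a CURVE-encrypted TCP connection."""
    ctx = zmq.Context()
    server_public, server_secret = zmq.curve_keypair()
    client_public, client_secret = zmq.curve_keypair()

    server = ctx.socket(zmq.REP)
    client = ctx.socket(zmq.REQ)
    for sock in (server, client):
        sock.setsockopt(zmq.LINGER, 0)
        sock.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever
    try:
        # Server side: own keypair plus the curve_server flag
        server.curve_secretkey = server_secret
        server.curve_publickey = server_public
        server.curve_server = True
        port = server.bind_to_random_port("tcp://127.0.0.1")

        # Client side: own keypair plus the server's public key
        client.curve_secretkey = client_secret
        client.curve_publickey = client_public
        client.curve_serverkey = server_public
        client.connect(f"tcp://127.0.0.1:{port}")

        client.send_string(text)
        request = server.recv_string()
        server.send_string(f"Secure reply: {request}")
        return client.recv_string()
    finally:
        client.close()
        server.close()
        ctx.term()


if __name__ == "__main__":
    print(curve_roundtrip("ping"))
```

The client needs only three settings beyond a plain socket: its own keypair and the server's public key. The server never needs the client's secret key.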
Enable elliptic curve encryption:
# Python script to generate keys
python3 << 'EOF'
import zmq.auth
# Generate server keypair
server_public, server_secret = zmq.curve_keypair()
print(f"Server public key: {server_public.decode()}")
print(f"Server secret key: {server_secret.decode()}")
# Generate client keypair
client_public, client_secret = zmq.curve_keypair()
print(f"Client public key: {client_public.decode()}")
print(f"Client secret key: {client_secret.decode()}")
EOF
Secure Server
Configure server with CurveZMQ:
#!/usr/bin/env python3
import zmq
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator
context = zmq.Context()
# Start authenticator
auth = ThreadAuthenticator(context)
auth.start()
auth.allow('127.0.0.1') # Allow localhost
auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
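# Server keys: paste the 40-character Z85 strings printed by the key generation
# step above (the values below are placeholders and will not be accepted as-is)
server_secret = b"your-server-secret-key-here-32-bytes!!"
server_public = b"your-server-public-key-here-32-bytes!!"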
socket = context.socket(zmq.REP)
socket.curve_secretkey = server_secret
socket.curve_publickey = server_public
socket.curve_server = True
socket.bind("tcp://*:5561")
print("Secure server started...")
try:
    while True:
        message = socket.recv_string()
        print(f"Received: {message}")
        socket.send_string(f"Secure reply: {message}")
finally:
    # Stop the authenticator when the loop exits (e.g. on Ctrl+C)
    auth.stop()
Firewall Configuration
Configure UFW for ZeroMQ:
# Allow SSH
sudo ufw allow ssh
# Allow specific ZeroMQ ports (adjust as needed)
sudo ufw allow 5555/tcp # REQ-REP
sudo ufw allow 5556/tcp # PUB-SUB
sudo ufw allow 5557/tcp # PUSH
sudo ufw allow 5558/tcp # PULL
# Restrict to specific IPs in production
sudo ufw allow from 192.168.1.0/24 to any port 5555
# Enable firewall
sudo ufw enable
sudo ufw status verbose
Performance Tuning
Socket Options
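Of the options in this section, the high water mark changes observable behaviour the most: once a socket's queue is full, a non-blocking send fails instead of queuing more. The sketch below fills an in-process PUSH-PULL pair with small limits to show this. The endpoint name, function name, and limit values are illustrative, and the exact number of messages accepted should be treated as an implementation detail rather than a guarantee.

```python
import zmq


def fill_until_full(snd_hwm: int = 5, rcv_hwm: int = 5, limit: int = 1000) -> int:
    """Queue messages without receiving any; return how many were accepted."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    push = ctx.socket(zmq.PUSH)
    # High water marks only affect connections made after they are set
    pull.setsockopt(zmq.RCVHWM, rcv_hwm)
    push.setsockopt(zmq.SNDHWM, snd_hwm)
    for sock in (pull, push):
        sock.setsockopt(zmq.LINGER, 0)
    queued = 0
    try:
        pull.bind("inproc://hwm-demo")
        push.connect("inproc://hwm-demo")
        while queued < limit:
            try:
                push.send(b"x", zmq.NOBLOCK)
            except zmq.Again:
                break  # queue is full: a blocking send would wait here
            queued += 1
        return queued
    finally:
        push.close()
        pull.close()
        ctx.term()


if __name__ == "__main__":
    print(f"Accepted {fill_until_full()} messages before the queue was full")
```

A large value such as the 100000 used below trades memory for tolerance of slow consumers, so it is worth sizing it against the expected message size.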
Optimize socket performance:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
# High water mark (message queue limit)
socket.setsockopt(zmq.SNDHWM, 100000) # Send high water mark
socket.setsockopt(zmq.RCVHWM, 100000) # Receive high water mark
# Linger period (ms to wait for pending messages on close)
socket.setsockopt(zmq.LINGER, 1000)
# TCP keepalive
socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 300)
# Reconnect interval
socket.setsockopt(zmq.RECONNECT_IVL, 100) # Initial: 100ms
socket.setsockopt(zmq.RECONNECT_IVL_MAX, 5000) # Max: 5 seconds
# Buffer sizes
socket.setsockopt(zmq.SNDBUF, 65536)
socket.setsockopt(zmq.RCVBUF, 65536)
Context Tuning
Optimize context for high throughput:
import zmq
# Create context with more I/O threads
# (set this before creating the first socket, or it has no effect)
context = zmq.Context()
context.set(zmq.IO_THREADS, 4) # Default is 1
# For high-throughput applications
context.set(zmq.MAX_SOCKETS, 10000) # Increase max sockets
# Check current settings
print(f"I/O threads: {context.get(zmq.IO_THREADS)}")
print(f"Max sockets: {context.get(zmq.MAX_SOCKETS)}")
System Limits
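Before raising limits, it can help to check what the current process is actually allowed. This is a small sketch using Python's standard `resource` module (Unix only); the helper names are made up. ZeroMQ sockets consume file descriptors, so a large MAX_SOCKETS value is only useful if the open-file limit is high enough to support it.

```python
import resource


def nofile_limits():
    """Return the (soft, hard) open-file limits for the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def describe(value: int) -> str:
    """Render a limit value, showing RLIM_INFINITY as 'unlimited'."""
    return "unlimited" if value == resource.RLIM_INFINITY else str(value)


if __name__ == "__main__":
    soft, hard = nofile_limits()
    print(f"Open files: soft={describe(soft)}, hard={describe(hard)}")
```

Running this again in a fresh login session after editing limits.conf is a quick way to confirm that the new values took effect.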
Increase system limits for high-performance:
# Increase file descriptor limits
sudo nano /etc/security/limits.conf
# Add:
* soft nofile 65536
* hard nofile 65536
# Increase socket buffer sizes
sudo sysctl -w net.core.rmem_max=16777216
sudo sysctl -w net.core.wmem_max=16777216
sudo sysctl -w net.core.rmem_default=1048576
sudo sysctl -w net.core.wmem_default=1048576
# Make persistent
echo "net.core.rmem_max=16777216" | sudo tee -a /etc/sysctl.conf
echo "net.core.wmem_max=16777216" | sudo tee -a /etc/sysctl.conf
# Apply
sudo sysctl -p
Monitoring
Socket Monitoring
Monitor socket events:
#!/usr/bin/env python3
import zmq
import zmq.utils.monitor  # needed so zmq.utils.monitor.recv_monitor_message resolves
def monitor_socket(socket, monitor_socket):
    """Monitor socket events"""
    EVENT_MAP = {
        zmq.EVENT_CONNECTED: "CONNECTED",
        zmq.EVENT_CONNECT_DELAYED: "CONNECT_DELAYED",
        zmq.EVENT_CONNECT_RETRIED: "CONNECT_RETRIED",
        zmq.EVENT_LISTENING: "LISTENING",
        zmq.EVENT_BIND_FAILED: "BIND_FAILED",
        zmq.EVENT_ACCEPTED: "ACCEPTED",
        zmq.EVENT_ACCEPT_FAILED: "ACCEPT_FAILED",
        zmq.EVENT_CLOSED: "CLOSED",
        zmq.EVENT_CLOSE_FAILED: "CLOSE_FAILED",
        zmq.EVENT_DISCONNECTED: "DISCONNECTED",
    }
    while True:
        event = zmq.utils.monitor.recv_monitor_message(monitor_socket)
        event_type = event['event']
        event_name = EVENT_MAP.get(event_type, f"UNKNOWN({event_type})")
        print(f"Event: {event_name}, Address: {event['endpoint']}")
context = zmq.Context()
socket = context.socket(zmq.REQ)
# Enable monitoring
monitor = socket.get_monitor_socket()
# Start monitoring in a thread
import threading
monitor_thread = threading.Thread(target=monitor_socket, args=(socket, monitor))
monitor_thread.daemon = True
monitor_thread.start()
socket.connect("tcp://localhost:5555")
Metrics Collection
Collect and expose metrics:
#!/usr/bin/env python3
import zmq
import time
import json
context = zmq.Context()
# Main socket
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
# Metrics socket
metrics = context.socket(zmq.PUB)
metrics.bind("tcp://*:5565")
message_count = 0
start_time = time.time()
while True:
    message = socket.recv_string()
    message_count += 1
    socket.send_string(f"Reply {message_count}")
    # Publish metrics every 100 messages
    if message_count % 100 == 0:
        elapsed = time.time() - start_time
        rate = message_count / elapsed
        metrics_data = json.dumps({
            "total_messages": message_count,
            "messages_per_second": round(rate, 2),
            "uptime_seconds": round(elapsed, 2)
        })
        metrics.send_string(f"metrics {metrics_data}")
Troubleshooting
Common Issues
Messages Not Received (Slow Joiner)
# Problem: Subscriber misses first messages
# Solution: Add synchronization or delay
import zmq
import time
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:5556")
# Wait for subscribers to connect
time.sleep(1) # Simple fix
# Or use explicit synchronization:
sync = context.socket(zmq.REP)
sync.bind("tcp://*:5557")
sync.recv() # Wait for subscriber ready signal
sync.send(b"") # Confirm
# Now start publishing
Address Already in Use
# Check what's using the port
sudo lsof -i :5555
sudo netstat -tlnp | grep 5555
# Kill the process if needed
sudo kill -9 <PID>
# Or use SO_REUSEADDR equivalent in ZMQ
# (ZMQ handles this automatically for TCP)
Memory Leaks
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
try:
    # Your code here
    socket.connect("tcp://localhost:5555")
    # ...
finally:
    # Always clean up!
    socket.close()
    context.term()
# Or use context managers (Python 3):
with zmq.Context() as context:
    with context.socket(zmq.REQ) as socket:
        socket.connect("tcp://localhost:5555")
        # Socket auto-closes when block exits
High Latency
# Nagle's algorithm: libzmq already disables it (TCP_NODELAY) on its TCP
# connections, and there is no zmq.TCP_NODELAY socket option to set
# Use IPC instead of TCP for local communication
socket.bind("ipc:///tmp/mysocket")
# Use inproc for same-process communication
socket.bind("inproc://myendpoint")
# Reduce high water mark for low-latency
socket.setsockopt(zmq.SNDHWM, 1)
socket.setsockopt(zmq.RCVHWM, 1)
Debug Logging
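Trace what your ZeroMQ application is doing:
# libzmq does not document a ZMQ_DEBUG environment variable; for connection
# events use socket monitoring (see Socket Monitoring), and for message flow
# use your application's own logging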
# Run your application
python3 your_app.py
# For even more detail (Linux)
strace -e trace=network python3 your_app.py 2>&1 | head -100