- Renamed `Socket#_attach` → `#attach_endpoints` and `#_init_engine` → `#init_engine`. Both are now public so plugin gems can call them without reaching into private API. Internal callers updated.
- Routing registry exposed via `Routing.registry`. omq.rb's `freeze_for_ractors!` no longer reaches in via `instance_variable_get`.
- Test helper deadlock. The `Kernel#Async` override in `test_helper.rb` was wrapping every `Async do` block in a `with_timeout`, including the reactor thread's own root task. With a 1s timeout the reactor task died mid-suite and subsequent `Reactor.run` calls hung forever. The override now only wraps blocks running on the main thread.
- The `wait_connected` test helper uses `Async::Barrier` for a parallel fork-join across all sockets instead of a sequential `Async {}` array.
- `examples/zguide/03_pipeline.rb` flake. The example sent 20 tasks to 3 PUSH workers and asserted that all three got some — but PUSH work-stealing on inproc lets the first pump fiber to wake grab a whole batch (256 messages) before yielding, so worker-0 always took everything. Fixed by waiting on each worker's `peer_connected` promise via `Async::Barrier` and bumping the burst above one pump's batch cap.
- Documented work-stealing as a deviation from libzmq. README routing tables now say "Work-stealing" instead of "Round-robin" for PUSH/REQ/DEALER/SCATTER/CLIENT, with a callout explaining the burst-vs-steady distribution behavior. DESIGN.md's "Per-socket HWM" section gained a user-visible-consequence note covering the same.
- Lifecycle boundary docs. `ConnectionLifecycle` and `SocketLifecycle` now carry explicit class-level comments delimiting their scopes (per-connection arc vs. per-socket state) and referencing each other.
- API doc fill-in. Added missing YARD comments on `RecvPump::FAIRNESS_MESSAGES`/`FAIRNESS_BYTES`, `RecvPump#start_with_transform`/`#start_direct`, several `FanOut` send-pump methods, and the TCP/IPC `apply_buffer_sizes` helpers.
- `Engine#drain_send_queues` flagged with a TODO. The 1 ms busy-poll is non-trivial to fix cleanly (it needs a "queue fully drained" signal threaded through every routing strategy), so it's marked rather than reworked here.
- Linger drain missed in-flight messages. `RoundRobin#send_queues_drained?` now tracks an `@in_flight` counter for messages dequeued by pump fibers but not yet written. Previously, linger could tear down connections while pumps still held unwritten batches, dropping messages silently.
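The drained check above hinges on counting messages between dequeue and write. A minimal sketch of the idea — class name and internals are hypothetical, not the library's implementation:

```ruby
# Sketch: a message is "in flight" from the moment a pump dequeues it until
# the write completes, so the drained check must consult both the queue and
# the in-flight count.
class SendQueueDrain
  def initialize
    @queue = []        # stand-in for the bounded send queue
    @in_flight = 0     # messages dequeued by a pump but not yet written
  end

  def enqueue(msg) = @queue << msg

  # Pump side: dequeue a batch, bump the counter, write, then clear it.
  def pump_once(&write)
    batch = @queue.slice!(0, 256)
    @in_flight += batch.size
    write.call(batch)
  ensure
    @in_flight -= batch.size
  end

  # Linger must wait for both the queue *and* any in-flight batch.
  def send_queues_drained? = @queue.empty? && @in_flight.zero?
end
```

While `write.call` runs, `send_queues_drained?` reports `false` even though the queue itself is already empty — exactly the window the old check missed.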
- Message parts coerced via `#to_s`. `#frozen_binary` now calls `#to_s` instead of `#to_str`, so `nil` becomes an empty frame and integers/symbols are converted automatically. A cached `EMPTY_PART` avoids allocations for nil parts.
- Reduced allocations on hot paths. `freeze_message` short-circuits when all parts are already frozen binary (zero-alloc fast path). `write_batch` passes the batch directly instead of `.map`-ing through `transform_send` — only REQ overrides the transform, and it never batches. Up to +55% throughput on small messages (PUSH/PULL IPC 64B).
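The coercion rule above is small enough to sketch in full — `frozen_binary` and `EMPTY_PART` are named in the changelog, but this body is an illustrative guess, not the library's code:

```ruby
# Sketch: coerce any message part to a frozen binary string.
# nil maps to a cached empty frame; #to_s handles integers and symbols.
EMPTY_PART = "".b.freeze

def frozen_binary(part)
  return EMPTY_PART if part.nil?
  # Fast path: already a frozen binary string — reuse as-is, zero alloc.
  return part if part.is_a?(String) && part.frozen? && part.encoding == Encoding::BINARY
  part.to_s.b.freeze
end
```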
- Silence Async warning on handshake timeout. `spawn_connection` now rescues `Async::TimeoutError` so a timed-out ZMTP handshake doesn't emit an "unhandled exception" warning from the Async task.
- Handshake timeout. `ConnectionLifecycle#handshake!` now wraps the ZMTP greeting/handshake exchange in a timeout (the reconnect interval, floor 0.5s). Prevents a hang when a non-ZMQ service accepts the TCP connection but never sends a ZMTP greeting (e.g. macOS AirPlay Receiver on port 5000). On timeout the connection tears down with `reconnect: true`, so the retry loop picks up.
- Connect timeout at the transport level. `TCP.connect` now uses `::Socket.tcp(host, port, connect_timeout:)` instead of `TCPSocket.new`. The timeout is derived from the reconnect interval (floor 0.5s). Fixes a hang on macOS where a non-blocking IPv6 `connect(2)` to `::1` via kqueue never delivers `ECONNREFUSED` when nothing is listening — `TCPSocket.new` blocked the fiber indefinitely because `Async::Task#with_timeout` cannot interrupt C-level blocking calls. `::Socket.tcp` uses the kernel-level `connect_timeout:`, which works regardless of the scheduler.
- Connect timeout in the reconnect loop. Each connect attempt is now capped at the reconnect interval (floor 0.5s) via `Async::Task#with_timeout`. Fixes a hang on macOS where a non-blocking IPv6 `connect(2)` to `::1` via kqueue never delivers `ECONNREFUSED` when nothing is listening — the fiber would block indefinitely, stalling the entire reconnect loop.
- Extracted `Reconnect#retry_loop`. The reconnect retry loop is now a separate private method, keeping `#run` focused on task spawning and error handling.
- Reconnect after handshake failure. When a peer RST'd a TCP connection mid-ZMTP-handshake (e.g. a `LINGER 0` close against an in-flight connect), `ConnectionLifecycle#handshake!` called `transition!(:closed)` directly, bypassing `tear_down!` and its `maybe_reconnect` call. `spawn_connection`'s `ensure close!` then saw the state already `:closed` and did nothing — the endpoint died silently with no reconnect ever scheduled. Now the handshake rescue goes through `tear_down!(reconnect: true)`, emitting `:disconnected` and scheduling a reconnect like any other connection loss.
- Reconnect sleeps are wall-clock quantized. `Engine::Reconnect` now sleeps until the next `delay`-sized grid tick instead of `delay` from now (same math as `Async::Loop.quantized`). Multiple clients reconnecting with the same interval wake up at the same instant, collapsing staggered retries into aligned waves — easier to reason about for observability and cache-warmup, and a server coming back up sees one batch of accepts instead of a smear. Wall-clock (not monotonic) on purpose: the grid has to line up across processes. Anti-jitter by design. Exponential backoff still works: each iteration quantizes to its own (growing) interval's grid, and clients at the same backoff stage still align with each other.
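The grid math above fits in one expression. A sketch with a hypothetical name (`next_grid_tick` is not the library's method):

```ruby
# Sketch of grid-quantized reconnect delay (same idea as Async::Loop.quantized):
# instead of sleeping `delay` from now, sleep until the next wall-clock
# multiple of `delay`, so independent processes wake on the same tick.
def next_grid_tick(now, delay)
  ((now / delay).floor + 1) * delay
end

# Two clients that lost their connection at different instants still
# wake together on the shared 0.5 s grid:
next_grid_tick(100.30, 0.5)  # => 100.5
next_grid_tick(100.41, 0.5)  # => 100.5
```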
- `Readable#receive` no longer prefetches a batch. Each `#receive` call dequeues exactly one message from the engine recv queue. The per-socket prefetch buffer (`@recv_buffer` + `@recv_mutex`) and `dequeue_recv_batch` are gone, along with `Readable::RECV_BATCH_SIZE`. Simpler code; a ~5–10% inproc microbench regression is accepted (tcp/ipc unchanged — wire I/O dominates dispatch overhead).
- Socket-level `Async::Barrier` and cascading teardown. `SocketLifecycle` now owns an `Async::Barrier` that tracks every socket-scoped task — connection supervisors, pumps, accept loops, reconnect loops, heartbeat, maintenance. `Engine#close` and the new `Engine#stop` stop this single barrier and every descendant unwinds in one call, so the ordering of `:disconnected`/`all_peers_gone`/`maybe_reconnect` side effects no longer depends on which pump happens to observe the disconnect first.
- `Socket#stop` — immediate hard stop that skips the linger drain and goes straight to the barrier cascade. Complements `#close` for crash-path cleanup.
- `parent:` kwarg on `Socket#bind`/`Socket#connect`. Accepts any object responding to `#async` (`Async::Task`, `Async::Barrier`, `Async::Semaphore`). The socket-level barrier is constructed with the caller's parent, so every task spawned under the socket lives under the caller's Async tree — the standard Async idiom for letting callers coordinate teardown of internal tasks with their own work.
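The `parent:` contract is purely duck-typed — anything with `#async` works. A self-contained sketch (`CollectingParent` and `build_socket_tasks` are hypothetical stand-ins; in practice the parent would be an `Async::Task`, `Async::Barrier`, or `Async::Semaphore`):

```ruby
# Sketch: any object responding to #async can own the socket's tasks.
class CollectingParent
  attr_reader :spawned

  def initialize
    @spawned = []
  end

  def async(&block)
    @spawned << block   # a real parent would schedule a task here
    block.call
  end
end

# Hypothetical internal helper: spawn socket-scoped work under the parent.
def build_socket_tasks(parent:)
  raise ArgumentError, "parent must respond to #async" unless parent.respond_to?(:async)
  parent.async { :accept_loop }
  parent.async { :reconnect_loop }
end

parent = CollectingParent.new
build_socket_tasks(parent: parent)
parent.spawned.size  # => 2
```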
- macOS: PUSH fails to reconnect after the peer rebinds (and analogous races on any platform where the send pump observes the disconnect before the recv pump does). The send pump's `rescue EPIPE` called `connection_lost(conn)` → `tear_down!` → `routing.connection_removed` → `.stop` on `@conn_send_tasks[conn]` — which was the currently-running send pump. `Task#stop` on self raises `Async::Cancel` synchronously and unwinds through `tear_down!` mid-sequence, before the `:disconnected` emission and `maybe_reconnect`, leaving the socket stuck with no reconnect scheduled. Root-caused from a `ruby -d` trace showing `EPIPE` at `buffered.rb:112` immediately followed by `Async::Cancel` at `task.rb:358` ("Cancelling current task!"). Fix: introduce a per-connection `Async::Barrier` and a supervisor task placed on the socket barrier (not the per-conn one) that blocks on `@barrier.wait { |t| t.wait; break }` and runs `lost!` in its `ensure`. Pumps now just exit on `EPIPE`/`EOFError`/ZMTP errors — they never initiate teardown from inside themselves, so `Task#stop`-on-self is structurally impossible. All three shutdown paths (peer disconnect, `#close`, `#stop`) converge on the same ordered `tear_down!` sequence.
- `DESIGN.md` synced with post-barrier-refactor reality. Rewrote the Task tree and Engine lifecycle sections to reflect the socket-level `Async::Barrier`, the per-connection nested barrier, the supervisor pattern, `Socket#stop`, and the user-provided `parent:` kwarg. Added a new Cancellation safety subsection documenting that wire writes in protocol-zmtp are wrapped in `Async::Task#defer_cancel` so cascade teardown during a mid-frame write can't desync the peer's framer.
- IPC connect to an existing `SOCK_DGRAM` socket file now surfaces as a connect-time failure with backoff retry instead of crashing the pump. `Errno::EPROTOTYPE` added to `CONNECTION_FAILED` (not `CONNECTION_LOST` — it's a connect() error, not an established-connection drop). Consistent with how `ECONNREFUSED` is treated for TCP: the endpoint is misconfigured or not ready, the socket keeps trying, and the user sees `:connect_retried` monitor events.
- Work-stealing send pump fairness. `RoundRobin#start_conn_send_pump` had no fiber yield between batches. `write_batch` typically completes without yielding when the kernel TCP buffer absorbs the whole batch, so the first pump to wake could drain a pre-filled send queue in one continuous run — starving peer pumps until the queue was empty. This was visible as a flaky `push_pull_test.rb` `#test_0002 distributes messages across multiple PULL peers` on CI, where the second peer received zero messages. Added `Async::Task.current.yield` at the bottom of the pump loop; effectively free when there is no other work, and it guarantees peers actually get a turn when the queue stays non-empty.
- The `disconnect` test no longer assumes strict round-robin. The test asserted that `push.send("to ep1")` followed by `pull1.receive` returns that exact message — only true with libzmq-style strict per-peer round-robin, not OMQ's work-stealing. It was passing by accident because the first-started pump consistently dequeued first. Rewritten to only assert the actual `#disconnect` semantics: after `disconnect("ep1")`, subsequent messages reach ep2 and ep1 receives nothing.
- Depend on `protocol-zmtp ~> 0.4`. Picks up the batched `Connection#write_messages` used by the work-stealing send pumps and the zero-alloc frame-header path on the unencrypted hot send path.
- PUB/XPUB/RADIO fan-out now honors `on_mute`. Per-subscriber send queues were hardcoded to `:block`, so a slow subscriber would back-pressure the publisher despite PUB/XPUB/RADIO defaulting to `on_mute: :drop_newest`. Fan-out now builds each subscriber's queue with the socket's `on_mute` strategy — slow subscribers silently drop their own messages without stalling the publisher or other subscribers.
- Consolidate connection lifecycle into `Engine::ConnectionLifecycle`. One object per connection owns the full arc: handshake → ready → closed. Replaces the scattered callback pattern where `Engine`, `ConnectionSetup`, and `#close_connections_at` each held partial responsibility for registration, monitor emission, routing add/remove, and reconnect scheduling. Side-effect order (`:handshake_succeeded` before `connection_added`, `connection_removed` before `:disconnected`) is now encoded as sequential statements in two methods instead of implicitly across multiple files. Teardown is idempotent via an explicit 4-state transition table — racing pumps can no longer double-fire `:disconnected` or double-call `routing.connection_removed`. `ConnectionSetup` is absorbed and removed. `ConnectionRecord` collapses away — `@connections` now stores lifecycles directly.
- Consolidate socket-level state into `Engine::SocketLifecycle`. Six ivars (`@state`, `@peer_connected`, `@all_peers_gone`, `@reconnect_enabled`, `@parent_task`, `@on_io_thread`) move into one cohesive object with an explicit 4-state transition table (:new → :open → :closing → :closed). `Engine#closed?`, `#peer_connected`, `#all_peers_gone`, and `#parent_task` remain as delegators — public API unchanged. Parallels `ConnectionLifecycle` in naming and shape. Pure refactor, no behavior change.
- Revert to per-socket HWM with work-stealing send pumps. One shared bounded send queue per socket, drained by N per-connection send pumps that race to dequeue. Slow peers' pumps simply stop pulling; fast peers absorb the load. Strictly better PUSH semantics than libzmq's strict per-pipe round-robin (a known footgun where one slow worker stalls the whole pipeline). Removes `StagingQueue`, the per-connection queue maps, the double-drain race in `add_*`, the disconnect-prepend ordering pretense, and the `@cycle`/next-connection machinery. See `DESIGN.md` "Per-socket HWM (not per-connection)" for the full reasoning.
- The `RoundRobin` batch cap is now dual: 256 messages OR 512 KB, whichever hits first (previously 64 messages). The old cap was too aggressive for large messages — with 64 KB payloads it forced a flush every ~4 MB, capping multi-peer push_pull throughput at ~50 % of what the network could handle. The dual cap lets large-message workloads batch ~8 messages per cycle while small-message workloads still yield quickly enough to keep other work-stealing pumps fair. push_pull +5–40 % across transports and sizes; router_dealer +5–15 %.
- Send pumps batched under a single mutex. RoundRobin, ConnSendPump, and Pair now drain batches through `Protocol::ZMTP::Connection#write_messages`, collapsing N lock acquire/release pairs into one per batch. The size==1 path still uses `send_message` (write + flush in one lock) to avoid an extra round-trip at low throughput. push_pull inproc +18–28 %, tcp/ipc flat to +17 %.
- `disconnect(endpoint)` now emits `:disconnected` on the monitor queue. Previously silent because `close_connections_at` bypassed `connection_lost`.
- PUSH/PULL round-robin test. Previously asserted strict 1-msg-per-peer distribution — a libzmq-ism OMQ never promised — and was silently "passing" with 0 assertions and a 10 s Async-block timeout that masked a hang. The new test verifies both peers receive nonzero load over TCP.
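The dual batch cap described above can be sketched as a simple dequeue loop — constant names and the helper are hypothetical, not the library's internals:

```ruby
# Sketch of the dual batch cap: stop a batch at 256 messages OR 512 KB,
# whichever comes first.
MAX_BATCH_MESSAGES = 256
MAX_BATCH_BYTES    = 512 * 1024

def take_batch(queue)
  batch = []
  bytes = 0
  while (msg = queue.first)
    break if batch.size >= MAX_BATCH_MESSAGES
    # A single oversized message still ships alone (batch must not starve).
    break if !batch.empty? && bytes + msg.bytesize > MAX_BATCH_BYTES
    batch << queue.shift
    bytes += msg.bytesize
  end
  batch
end

# 64 KB payloads: ~8 messages per batch instead of the old 64-message cap.
take_batch(Array.new(20) { "x" * 65_536 }).size  # => 8
```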
- Report throughput in bytes/s alongside msgs/s.
- Regenerated the `bench/README.md` PUSH/PULL and REQ/REP tables: push_pull throughput up 5–40 %, req_rep round-trip latency down 5–15 %.
- `max_message_size` now defaults to `nil` (unlimited) — the previous default of 1 MiB moved into omq-cli.
- Benchmark suite: calibration-driven measurement. Each cell auto-sizes `n` from a prime burst + doubling warmup, then runs `ROUNDS=3` timed rounds of `ROUND_DURATION=1.0 s` (override via `OMQ_BENCH_TARGET`) and reports the fastest. The full suite runs in ~3 min.
- Benchmark suite: dropped the `curve` transport and the `pair`/`dealer_dealer` pattern scripts from the default loop. Files stay in place for ad-hoc runs. `bench/push_pull/omq.rb` now runs `peer_counts: [1, 3]`.
- `bench/report.rb --update-readme` regenerates the PUSH/PULL and REQ/REP tables in `bench/README.md` from the latest run in `results.jsonl`, between `<!-- BEGIN … -->`/`<!-- END … -->` markers.
- Lazy routing initialization — the routing strategy is now created on first use (bind, connect, send, or receive) instead of eagerly in the constructor. This allows socket option setters (`send_hwm=`, `recv_hwm=`) to take effect before internal queue sizing.
- Prefetch byte limit — `dequeue_recv_batch` now stops at 1 MB total, not just 64 messages. Prevents large messages from filling the prefetch buffer with hundreds of megabytes.
- Bound staging queue `@head` — `StagingQueue#prepend` now drops messages when at capacity, preventing unbounded growth during reconnect cycles.
- Bound monitor queue — `Socket#monitor` uses a `LimitedQueue(64)` instead of an unbounded queue, preventing memory growth when verbose monitoring can't keep up with the message rate.
- Auto-freeze on bind/connect — `#bind` and `#connect` now call `OMQ.freeze_for_ractors!` automatically, freezing `CONNECTION_LOST`, `CONNECTION_FAILED`, and `Engine.transports`. This replaces the internal `#freeze_error_lists!` method, which only froze the error lists.
- Drop `Ractor.make_shareable` — `freeze_for_ractors!` now uses plain `.freeze` instead of `Ractor.make_shareable`, removing the Ractor dependency from the core freeze path.
- Freeze routing registry — `freeze_for_ractors!` now freezes `Routing.@registry` so draft socket types (SCATTER, GATHER, etc.) can be created inside Ractors.
- Add `OMQ.freeze_for_ractors!` — freezes `CONNECTION_LOST`, `CONNECTION_FAILED`, and `Engine.transports` so OMQ sockets can be created inside bare Ractors. Call once before spawning Ractors.
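Why plain `.freeze` suffices: a frozen array whose elements are classes is already Ractor-shareable, since class objects are shareable by definition. A sketch with stand-in constants (not OMQ's actual lists):

```ruby
# Sketch: freezing an array of exception classes makes it shareable
# across Ractors without Ractor.make_shareable.
CONNECTION_LOST   = [IOError, Errno::ECONNRESET, Errno::EPIPE]
CONNECTION_FAILED = [Errno::ECONNREFUSED]

def freeze_for_ractors!
  CONNECTION_LOST.freeze
  CONNECTION_FAILED.freeze
end

freeze_for_ractors!
Ractor.shareable?(CONNECTION_LOST)  # => true
```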
- Fix pipe FIFO ordering — messages from sequential source batches could interleave when a connection dropped and reconnected. `FairQueue` now moves orphaned per-connection queues to a priority drain list, ensuring all buffered messages from a disconnected peer are consumed before any new peer's messages.
- Fix lost messages on disconnect — `RoundRobin#remove_round_robin_send_connection` now drains the per-connection send queue back to staging before closing it, and the send pump re-stages its in-flight batch on `CONNECTION_LOST`. Previously, messages in the per-connection queue or mid-batch were silently dropped.
- Fix `next_connection` deadlock — when the round-robin cycle exhausted with connections still present, a new unresolved `Async::Promise` was created unconditionally, blocking the sender forever. Now a new promise is only created when `@connections` is actually empty.
- Fix staging drain race — `add_round_robin_send_connection` now appends to `@connections` after draining staging (not before), preventing the pipe loop from bypassing staging during the drain. A second drain pass catches any message that squeezed in during the first.
- Fix `handshake_succeeded` event ordering — the monitor event is now emitted before `connection_added` (which may yield during drain), so it always appears before any `message_sent` events on that connection.
- Fix send pump `Async::Stop` preventing reconnect — `remove_round_robin_send_connection` no longer calls `task.stop` on the send pump. Instead it closes the queue and lets the pump detect nil, avoiding the `Async::Stop` propagation that prevented `maybe_reconnect` from running.
- Add `StagingQueue` — a bounded FIFO queue with `#prepend` for re-staging failed messages at the front. Replaces the raw `Async::LimitedQueue` in the `RoundRobin` and `Pair` routing strategies.
- Add `SingleFrame` mixin to core — moved from 5 duplicate copies across RFC gems to `OMQ::SingleFrame`, eliminating method redefinition warnings.
- Add `SO_SNDBUF`/`SO_RCVBUF` socket options — `Options#sndbuf` and `Options#rcvbuf` set kernel buffer sizes on TCP and IPC sockets (both accepted and connected).
- Add verbose monitor events — `Socket#monitor(verbose: true)` emits `:message_sent` and `:message_received` events via `Engine#emit_verbose_monitor_event`. Allocation-free when verbose is off.
- Add `OMQ::DEBUG` flag — when `OMQ_DEBUG` is set, transport accept loops print unexpected exceptions to stderr.
- Fix `Pair` re-staging on disconnect — `Pair#connection_removed` now drains the per-connection send queue back to staging, and the send pump re-stages its batch on `CONNECTION_LOST`.
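The FIFO-ordering fix above (orphaned queues drained before live ones) can be sketched with plain arrays — the class name comes from the changelog, but the internals here are an illustrative guess:

```ruby
# Sketch of the FairQueue ordering fix: when a connection drops, its
# per-connection queue moves to a priority drain list that is emptied,
# in order, before any live queue is served.
class FairQueueSketch
  def initialize
    @queues = {}   # connection => buffered messages
    @drain  = []   # orphaned queues, consumed first
  end

  def enqueue(conn, msg)
    (@queues[conn] ||= []) << msg
  end

  def connection_removed(conn)
    q = @queues.delete(conn)
    @drain << q if q && !q.empty?
  end

  def dequeue
    until @drain.empty?
      q = @drain.first
      return q.shift unless q.empty?
      @drain.shift
    end
    # Fall back to the live queues (round-robin fairness details elided).
    @queues.each_value { |q| return q.shift unless q.empty? }
    nil
  end
end
```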
- Fix PUSH send queue deadlock on disconnect — when a peer disconnected while a fiber was blocked on a full per-connection send queue (low `send_hwm`), the fiber hung forever. Now the queue is closed on disconnect, raising `ClosedError`, which re-routes the message to staging. Also reorders `add_round_robin_send_connection` to start the send pump before draining staging, preventing deadlock with small queues.
- Fix reconnect backoff for plain Numeric — `#next_delay` incorrectly doubled the delay even when `reconnect_interval` was a plain Numeric. Now only a Range triggers exponential backoff; a fixed Numeric returns the same interval every retry.
- Default `reconnect_interval` changed to `0.1..1.0` — uses exponential backoff (100 ms → 1 s cap) by default instead of a fixed 100 ms.
- Fix per-connection task tree — recv pump, heartbeat, and reaper tasks were spawned under `@parent_task` (socket-level) instead of the connection task. When `@parent_task` finished before a late connection completed its handshake, `spawn_pump_task` raised `Async::Task::FinishedError`. Now uses `Async::Task.current` so per-connection subtasks are children of their connection task, matching the DESIGN.md task tree.
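The Range-vs-Numeric backoff rule above can be sketched directly — the method name comes from the changelog, but this signature and body are hypothetical:

```ruby
# Sketch of the backoff rule: a Range means exponential backoff from its
# min doubling up to its max; a plain Numeric is a fixed interval.
def next_delay(reconnect_interval, previous = nil)
  case reconnect_interval
  when Range
    return reconnect_interval.min if previous.nil?
    [previous * 2, reconnect_interval.max].min
  else
    reconnect_interval                 # fixed interval, never doubled
  end
end

next_delay(0.1..1.0)        # => 0.1  (first attempt)
next_delay(0.1..1.0, 0.4)   # => 0.8  (doubled)
next_delay(0.1..1.0, 0.8)   # => 1.0  (capped at max)
next_delay(0.25, 0.25)      # => 0.25 (plain Numeric: constant)
```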
- Fix recv pump crash with connection wrappers — `start_direct` called `msg.sum(&:bytesize)` unconditionally, crashing when a `connection_wrapper` (e.g. omq-ractor's `MarshalConnection`) returns deserialized Ruby objects. Byte counting now uses `conn.instance_of?(Protocol::ZMTP::Connection)` to skip non-ZMTP connections (inproc, Ractor bridges).
- Remove the TLS transport dependency from the Gemfile.
- YARD documentation on all public methods and classes.
- Code style: expand `else X` one-liners, enforce two blank lines between methods and constants.
- Benchmarks: add a per-run timeout (default 30s, `OMQ_BENCH_TIMEOUT` env var) and abort if a group produces no results.
- Add `Engine::Maintenance` — spawns a periodic `Async::Loop.quantized` timer that calls the mechanism's `#maintenance` callback (if defined). Enables automatic cookie key rotation for the CurveZMQ and BLAKE3ZMQ server mechanisms.
- YJIT: remove redundant `is_a?` guards in the recv pump — the non-transform branch no longer type-checks every message; `conn.receive_message` always returns `Array<String>`.
- YJIT: `FanOut#subscribed?` fast path for subscribe-all — connections subscribed to `""` are tracked in a `@subscribe_all` Set, short-circuiting the per-message prefix scan with an O(1) lookup.
- YJIT: remove safe navigation in hot enqueue paths — `&.enqueue` calls in `FanOut#fan_out_enqueue` and `RoundRobin#enqueue_round_robin` replaced with direct calls; queues are guaranteed to exist for live connections.
- Fix PUB/SUB fan-out over inproc and IPC — restore the `respond_to?(:write_wire)` guard in `FanOut#start_conn_send_pump` so DirectPipe connections use `#write_message` instead of the wire-optimized path. Add `DirectPipe#encrypted?` (returns `false`) for the mechanism query.
- Code audit: never-instantiated classes — `RecvPump`, `ConnectionSetup`, and `Reconnect` refactored from class-method namespaces to proper instances that capture shared state. `Heartbeat`, `Maintenance`, and `ConnSendPump` changed from classes to modules (single `self.` method, never instantiated).
- `Engine` internals: `ConnectionRecord` + lifecycle state — three parallel per-connection ivars (the `@connections` Array, `@connection_endpoints`, and `@connection_promises`) replaced by a single `@connections` Hash keyed by connection, with values `ConnectionRecord = Data.define(:endpoint, :done)`. `@connected_endpoints` renamed to `@dialed` (Set). The `@closed`/`@closing` booleans replaced by a `@state` symbol (:open/:closing/:closed). Net: −4 instance variables. `@connections` in the `FanOut`, `Sub`, and `XSub` routing strategies changed from Array to Set — O(1) `#delete` on peer disconnect; semantics already required uniqueness.
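The subscribe-all fast path mentioned above is a classic two-tier lookup. A self-contained sketch — `@subscribe_all` is the ivar named in the changelog, the surrounding class is hypothetical:

```ruby
# Sketch: connections subscribed to "" go in a Set checked O(1) before
# any per-prefix scan.
require "set"

class SubscriptionIndex
  def initialize
    @subscribe_all = Set.new                          # conns with the "" subscription
    @prefixes      = Hash.new { |h, k| h[k] = [] }    # conn => topic prefixes
  end

  def subscribe(conn, prefix)
    prefix.empty? ? @subscribe_all << conn : @prefixes[conn] << prefix
  end

  def subscribed?(conn, topic)
    return true if @subscribe_all.include?(conn)      # O(1), no prefix scan
    @prefixes[conn].any? { |p| topic.start_with?(p) }
  end
end
```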
- FanOut send queues no longer drop messages — per-connection send queues in `FanOut` (PUB/XPUB/RADIO) used `DropQueue` (`Thread::SizedQueue`), which never blocked the publisher fiber. When burst-sending beyond `send_hwm`, the sender ran without yielding and messages were silently dropped. Switched to `Async::LimitedQueue` (`:block`) so the publisher yields when a per-connection queue is full, giving the send pump fiber a chance to drain it.
- Benchmark suite redesign — replaced the ASCII plots (unicode_plot) with JSONL result storage and a colored terminal regression report. Results are appended to `bench/results.jsonl` (gitignored, machine-local). New commands: `ruby bench/run_all.rb` (run all patterns), `ruby bench/report.rb` (compare last runs, highlight regressions/improvements).
- Per-peer HWM — send and receive high-water marks now apply per connected peer (RFC 28/29/30). Each peer gets its own bounded send queue and its own bounded recv queue. A slow or muted peer no longer steals capacity from other peers. `FairQueue` + `SignalingQueue` aggregate per-connection recv queues with fair round-robin delivery; the `RoundRobin` and `FanOut` mixins maintain per-connection send queues with dedicated send pump fibers. PUSH/DEALER/PAIR buffer messages in a staging queue when no peers are connected yet, draining into the first peer's queue on connect.
- `FairQueue` — new aggregator class (`lib/omq/routing/fair_queue.rb`) that fair-queues across per-connection bounded queues. Pending messages from a disconnected peer are drained before the queue is discarded.
- `Socket.bind`/`Socket.connect` class-method fix — now pass the endpoint via the `@`/`>` prefix into the constructor so any post-attach initialization in subclasses (e.g. XSUB's `subscribe:` kwarg) runs after the connection is established.
- QoS infrastructure — `Options#qos` attribute (default 0) and inproc command queue support for QoS-enabled connections. The omq-qos gem activates delivery guarantees via prepends.
- REQ send/recv ordering — REQ sockets now enforce strict send/recv/send/recv alternation. Calling `#send` twice without a `#receive` in between raises `SocketError`.
- DirectPipe command frame support — `DirectPipe#receive_message` accepts a block for command frames, matching the `Protocol::ZMTP::Connection` interface. Enables inproc transports to handle ACK/NACK and other command-level protocols.
- `send_pump_idle?` visibility — moved above `private` in `RoundRobin` and `FanOut` so `Engine#drain_send_queues` can call it during socket close.
- `Socket#monitor` — observe connection lifecycle events via a block-based API. Returns an `Async::Task` that yields `MonitorEvent` (`Data.define`) instances for `:listening`, `:accepted`, `:connected`, `:connect_delayed`, `:connect_retried`, `:handshake_succeeded`, `:handshake_failed`, `:accept_failed`, `:bind_failed`, `:disconnected`, `:closed`, and `:monitor_stopped`. Event types align with libzmq's `zmq_socket_monitor` where applicable. Pattern-matchable, zero overhead when no monitor is attached.
- Pluggable transport registry — `Engine.transports` is a scheme → module hash. Built-in transports (`tcp`, `ipc`, `inproc`) are registered at load time. External gems register via `OMQ::Engine.transports["scheme"] = MyTransport`. Each transport implements `.bind(endpoint, engine)` → Listener, `.connect(endpoint, engine)`, and optionally `.validate_endpoint!(endpoint)`. Listeners implement `#start_accept_loops(parent_task, &on_accepted)`, `#stop`, `#endpoint`, and optionally `#port`.
- Mutable error lists — `CONNECTION_LOST` and `CONNECTION_FAILED` are no longer frozen at load time. Transport plugins can append error classes (e.g. `OpenSSL::SSL::SSLError`) before the first `#bind`/`#connect`, which freezes both arrays.
- `on_mute` option — controls behavior when a socket enters the mute state (HWM full). PUB, XPUB, and RADIO default to `on_mute: :drop_newest` — slow subscribers are skipped in the fan-out rather than blocking the publisher. SUB, XSUB, and DISH accept `on_mute: :drop_newest` or `:drop_oldest` to drop messages on the receive side instead of applying backpressure. All other socket types default to `:block` (the existing behavior).
- `DropQueue` — a bounded queue with `:drop_newest` (tail drop) and `:drop_oldest` (head drop) strategies. Used by recv queues when `on_mute` is a drop strategy.
- `Routing.build_queue` — factory method for building send/recv queues based on HWM and mute strategy. Supports an HWM of `0` or `nil` for unbounded queues.
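The two drop strategies behind `on_mute` can be sketched with a plain array — the class name comes from the changelog, but this non-blocking stand-in is hypothetical:

```ruby
# Sketch of the two DropQueue strategies on a bounded buffer.
class DropQueueSketch
  def initialize(limit, strategy)
    @limit, @strategy, @items = limit, strategy, []
  end

  def enqueue(msg)
    if @items.size < @limit
      @items << msg
    elsif @strategy == :drop_oldest
      @items.shift          # head drop: make room, keep the newest
      @items << msg
    end                     # :drop_newest — tail drop: discard msg silently
  end

  def dequeue = @items.shift
end

q = DropQueueSketch.new(2, :drop_oldest)
%w[a b c].each { |m| q.enqueue(m) }
[q.dequeue, q.dequeue]  # => ["b", "c"]
```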
- `max_message_size` defaults to 1 MiB — frames exceeding this limit cause the connection to be dropped before the body is read from the wire, preventing a malicious peer from causing arbitrary memory allocation. Set `socket.max_message_size = nil` to restore the previous unlimited behavior.
- Accept loops moved into Listeners — `TCP::Listener` and `IPC::Listener` now own their accept loop logic via `#start_accept_loops(parent_task, &on_accepted)`. Engine delegates via a duck-type check, which lets external transports define custom accept behavior without modifying Engine. `Engine#transport_for` uses a registry lookup instead of `case`/`when`. `Engine#validate_endpoint!` delegates to the transport module. `Engine#bind` reads `listener.port` instead of parsing the endpoint string.
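The `Socket#monitor` events described above are `Data.define` instances and therefore pattern-matchable. A sketch with a stand-in event class — the real `MonitorEvent` fields are library-defined, so the shape here is an assumption:

```ruby
# Sketch: consuming monitor-style events with Ruby pattern matching.
MonitorEvent = Data.define(:type, :endpoint)

def describe(event)
  case event
  in { type: :connected, endpoint: }
    "connected to #{endpoint}"
  in { type: :disconnected, endpoint: }
    "lost #{endpoint}"
  in { type: }
    type.to_s
  end
end

describe(MonitorEvent.new(type: :connected, endpoint: "tcp://127.0.0.1:5555"))
# => "connected to tcp://127.0.0.1:5555"
```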
- Draft socket types extracted — `RADIO`, `DISH`, `CLIENT`, `SERVER`, `SCATTER`, `GATHER`, `CHANNEL`, and `PEER` are no longer bundled with omq. Use the omq-draft gem and require the relevant entry point (`omq/draft/radiodish`, `omq/draft/clientserver`, etc.).
- UDP transport extracted — `udp://` endpoints are provided by omq-draft (via `require "omq/draft/radiodish"`). No longer registered by default.
- `Routing.for` plugin registry — the draft socket type removal added `Routing.register(socket_type, strategy_class)` for external gems to register routing strategies. Unknown types fall through the built-in `case` to this registry before raising `ArgumentError`.
- TLS transport — extracted to the omq-transport-tls gem (experimental). `require "omq/transport/tls"` to restore `tls+tcp://` support.
- `tls_context`/`tls_context=` removed from `Options` and `Socket` (provided by omq-transport-tls).
- `OpenSSL::SSL::SSLError` removed from `CONNECTION_LOST` (added back by omq-transport-tls).
- TLS benchmark transport removed from `bench_helper.rb` and `plot.rb`.
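The scheme → module registry contract described earlier is plain duck typing. A self-contained sketch — `MemTransport`, `TRANSPORTS`, and `transport_for` are hypothetical stand-ins for the `Engine.transports` machinery:

```ruby
# Sketch: a scheme => module registry with the .bind/.connect/
# .validate_endpoint! contract from the changelog.
TRANSPORTS = {}

module MemTransport
  Listener = Struct.new(:endpoint) do
    def port = nil   # optional in the contract
  end

  def self.bind(endpoint, _engine)    = Listener.new(endpoint)
  def self.connect(endpoint, _engine) = [:connection, endpoint]

  def self.validate_endpoint!(endpoint)
    raise ArgumentError, "bad endpoint" unless endpoint.start_with?("mem://")
  end
end

TRANSPORTS["mem"] = MemTransport

def transport_for(endpoint)
  scheme = endpoint[/\A[a-z+]+(?=:\/\/)/]
  TRANSPORTS.fetch(scheme) { raise ArgumentError, "no transport for #{scheme}" }
end

transport_for("mem://bus").bind("mem://bus", nil).endpoint  # => "mem://bus"
```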
- `backend:` kwarg — all socket types accept `backend: :ffi` to use the libzmq FFI backend (via the omq-ffi gem). Default is `:ruby` (pure Ruby ZMTP). Enables interop testing and access to libzmq-specific features without changing the socket API.
- TLS transport (`tls+tcp://`) — TLS v1.3 on top of TCP using Ruby's stdlib `openssl`. Set `socket.tls_context` to an `OpenSSL::SSL::SSLContext` before bind/connect. Per-socket (not per-endpoint), frozen on first use. SNI is set automatically from the endpoint hostname. Bad TLS handshakes are dropped without killing the accept loop. `OpenSSL::SSL::SSLError` added to `CONNECTION_LOST` for automatic reconnection on TLS failures. Accompanied by a draft RFC (`rfc/zmtp-tls.md`) defining the transport mapping for ZMTP 3.1 over TLS.
- PUB/RADIO fan-out pre-encoding — ZMTP frames are encoded once per message and written as raw wire bytes to all non-CURVE subscribers. Eliminates redundant `Frame.new` + `#to_wire` calls during fan-out. CURVE connections (which encrypt at the ZMTP level) still encode per-connection. TLS, NULL, and PLAIN all benefit, since TLS encrypts below ZMTP. Requires protocol-zmtp's `Frame.encode_message` and `Connection#write_wire`.
- CURVE benchmarks — all per-pattern benchmarks now include CURVE (via rbnacl) alongside the inproc, ipc, tcp, and tls transports.
- Engine `connection_wrapper` hook — an optional proc on Engine that wraps new connections (both inproc and tcp/ipc) at creation time. Used by the omq-ractor gem for per-connection serialization (Marshal for tcp/ipc, `Ractor.make_shareable` for inproc).
- Queue-style interface — readable sockets gain `#dequeue(timeout:)`, `#pop`, `#wait`, and `#each`; writable sockets gain `#enqueue` and `#push`. Inspired by `Async::Queue`. `#wait` blocks indefinitely (ignores `read_timeout`); `#each` returns gracefully on timeout.
- Recv pump fairness — each connection yields to the fiber scheduler after 64 messages or 1 MB (whichever comes first). Prevents a fast or large-message connection from starving slower peers when the consumer keeps up. Byte counting gracefully handles non-string messages (e.g. deserialized objects from connection wrappers).
- Per-pattern benchmark suite — `bench/{push_pull,req_rep,router_dealer,dealer_dealer,pub_sub,pair}/omq.rb` with shared helpers (`bench_helper.rb`) and UnicodePlot braille line charts (`plot.rb`). Each benchmark measures throughput (msg/s) and bandwidth (MB/s) across transports (inproc, ipc, tcp, tls, curve), message sizes (64 B–64 KB), and peer counts (1, 3). Plots are written to per-directory `README.md` files for easy diffing across versions.
- SUB/XSUB `prefix:` kwarg renamed to `subscribe:` — aligns with ZeroMQ conventions. `subscribe: nil` (no subscription) remains the default; pass `subscribe: ''` to subscribe to everything, or `subscribe: 'topic.'` for a prefix filter.
- Scenario benchmarks moved to `bench/scenarios/` — broker, draft_types, flush_batching, hwm_backpressure, large_messages, multiframe, pubsub_fanout, ractors_vs_async, ractors_vs_fork, reconnect_storm, and reqrep_throughput moved from the `bench/` top level.
- Old flat benchmarks — `bench/throughput.rb`, `bench/latency.rb`, `bench/pipeline_mbps.rb`, and `bench/run_all.sh` replaced by the per-pattern benchmarks.
- `bench/cli/` — CLI-specific benchmarks (fib pipeline, latency, throughput shell scripts) moved to the omq-cli repository.
- Auto-close sockets via Async task tree — all engine tasks (accept loops, connection tasks, send/recv pumps, heartbeats, reconnect loops, reapers) now live under the caller's Async task. When the `Async` block exits, tasks are stopped and `ensure` blocks close IO resources. Explicit `Socket#close` is no longer required (but remains available and idempotent).
- Non-Async usage — sockets work outside `Async do…end`. A shared IO thread hosts the task tree; all blocking operations (bind, connect, send, receive, close) are dispatched to it transparently via `Reactor.run`. The IO thread shuts down cleanly at process exit, respecting the longest linger across all sockets.
- Recv prefetching — `#receive` internally drains up to 64 messages per queue dequeue, buffering the excess behind a Mutex. Subsequent calls return from the buffer without touching the queue. Thread-safe on JRuby. TCP 64 B pipelined: 30k → 221k msg/s (7x).
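The prefetching idea — drain a batch from the shared queue once, serve later receives from a mutex-guarded local buffer — can be sketched with stdlib primitives. `PrefetchingReceiver` and its names are illustrative assumptions, not OMQ's classes:

```ruby
# Sketch of recv prefetching: one queue dequeue drains up to BATCH messages
# into a local buffer; later receives hit the buffer only. Illustrative model.
class PrefetchingReceiver
  BATCH = 64

  def initialize(queue)
    @queue  = queue       # Thread::Queue shared with the recv pump
    @buffer = []
    @mutex  = Mutex.new   # guards the buffer (thread-safe on JRuby too)
  end

  def receive
    @mutex.synchronize do
      return @buffer.shift unless @buffer.empty?
      @buffer << @queue.pop # block for the first message
      # Opportunistically drain the rest without blocking (single consumer,
      # so the empty? check guards the non-blocking pop adequately here).
      @buffer << @queue.pop(true) while @buffer.size < BATCH && !@queue.empty?
      @buffer.shift
    end
  end
end

q = Thread::Queue.new
10.times { |i| q << "msg#{i}" }
rx    = PrefetchingReceiver.new(q)
first = rx.receive               # drains all 10 into the buffer, returns "msg0"
rest  = 9.times.map { rx.receive } # served from the buffer, queue untouched
```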
- Transports are pure IO — TCP and IPC transports no longer spawn tasks. They create server sockets and return them; Engine owns the accept loops.
- Reactor simplified — `spawn_pump` and `PumpHandle` removed. Reactor exposes `root_task` (the shared IO thread's root Async task) and `run` (cross-thread dispatch). `stop!` respects the max linger.
- Flatten `OMQ::ZMTP` namespace into `OMQ` — with the ZMTP protocol layer extracted to protocol-zmtp, the `ZMTP` sub-namespace no longer makes sense. Engine, routing, transport, and mixins now live directly under `OMQ::`. Protocol-zmtp types are referenced as `Protocol::ZMTP::*`.
- Direct pipe bypass for single-peer inproc — PAIR, CHANNEL, and single-peer RoundRobin types (PUSH, REQ, DEALER, CLIENT, SCATTER) enqueue directly into the receiver's recv queue, skipping the `send_queue` and send pump entirely. Inproc PUSH/PULL: 200k → 980k msg/s (5x).
- Uncapped send queue drain — the send pump drains the entire queue per cycle instead of capping at 64 messages. `IO::Stream` auto-flushes at 64 KB, so writes hit the wire naturally under load. IPC latency −12%, TCP latency −10%.
- Remove `.b` allocations from PUB/SUB subscription matching — `FanOut#subscribed?` no longer creates temporary binary strings per comparison; both topic and prefix are guaranteed binary at rest.
- Reuse the `written` Set and `latest` Hash across batches in all send pumps (fan-out, round-robin, router, server, peer, rep, radio), eliminating per-batch object allocation.
- O(1) `connection_removed` for identity-routed sockets — Router, Server, and Peer now maintain a reverse index instead of scanning.
- `freeze_message` fast path — skip `.b.freeze` when the string is already a frozen binary string.
- Pre-frozen empty frame constants for REQ/REP delimiter frames.
- Reapers no longer crash on inproc DirectPipe — PUSH and SCATTER reapers are skipped for DirectPipe connections that have no receive queue (a latent bug previously masked by transient task error swallowing).
- `send_pump_idle?` made public on all routing strategies — was accidentally private, crashing `Engine#drain_send_queues` with linger > 0.
- CLI extracted into omq-cli gem — the `omq` executable, all CLI code (`lib/omq/cli/`), tests, and `CLI.md` have moved to the omq-cli gem. `gem install omq` no longer provides the `omq` command — use `gem install omq-cli`. The `OMQ.outgoing`/`OMQ.incoming` registration API moved to omq-cli. Library-only users are unaffected (these were CLI-specific).
- Gemspec is library-only — no `exe/`, no `bindir`, no `executables`.
- README — restored title, replaced the inline CLI section with a pointer to omq-cli, fixed ZMTP attribution for protocol-zmtp.
- DESIGN.md — acknowledged protocol-zmtp, clarified transient task / linger interaction, removed ZMTP wire protocol section (now in protocol-zmtp), simplified inproc description, removed CLI section.
- CURVE mechanism moved to protocol-zmtp — `OMQ::ZMTP::Mechanism::Curve` is now `Protocol::ZMTP::Mechanism::Curve` with a required `crypto:` parameter. Pass `crypto: RbNaCl` (libsodium) or `crypto: Nuckle` (pure Ruby). The omq-curve and omq-kurve gems are superseded.

  ```ruby
  # Before (omq-curve)
  require "omq/curve"
  rep.mechanism = OMQ::Curve.server(pub, sec)

  # After (protocol-zmtp + any NaCl backend)
  require "protocol/zmtp/mechanism/curve"
  require "nuckle" # or: require "rbnacl"
  rep.mechanism = Protocol::ZMTP::Mechanism::Curve.server(pub, sec, crypto: Nuckle)
  ```
- Protocol layer extracted into protocol-zmtp gem — Codec (Frame, Greeting, Command), Connection, Mechanism::Null, Mechanism::Curve, ValidPeers, and Z85 now live in the protocol-zmtp gem. OMQ re-exports them under `OMQ::ZMTP::` for backwards compatibility. protocol-zmtp has zero runtime dependencies.
- Unified CURVE mechanism — one implementation with a pluggable `crypto:` backend replaces the two near-identical copies in omq-curve (RbNaCl) and omq-kurve (Nuckle). 1,088 → 467 lines (57% reduction).
- Heartbeat ownership — `Connection#start_heartbeat` removed. Connection tracks timestamps only; the engine drives the PING/PONG loop.
- CI no longer needs libsodium — CURVE tests use nuckle (pure Ruby) by default. Cross-backend interop tests run when rbnacl is available.
- `-e` is now `--recv-eval` — evaluates incoming messages only. Send-only sockets (PUSH, PUB, SCATTER, RADIO) must use `-E`/`--send-eval` instead of `-e`.
- `-E`/`--send-eval` — eval Ruby for each outgoing message. REQ can now transform requests independently from replies. ROUTER/SERVER/PEER: `-E` does dynamic routing (first element = identity), mutually exclusive with `--target`.
- `OMQ.outgoing`/`OMQ.incoming` — registration API for script handlers loaded via `-r`. Blocks receive message parts as a block argument (`|msg|`). Setup via closures, teardown via `at_exit`. CLI flags override registered handlers.
- CLI.md — comprehensive CLI documentation.
- GETTING_STARTED.md — renamed from `ZGUIDE_SUMMARY.md` for discoverability.
- Multi-peer pipe with `--in`/`--out` — modal switches that assign subsequent `-b`/`-c` flags to the PULL (input) or PUSH (output) side. Enables fan-in, fan-out, and mixed bind/connect per side. Backward compatible — without `--in`/`--out`, the positional 2-endpoint syntax works as before.
- YJIT recv pump — replaced the lambda/proc `transform:` parameter in `Engine#start_recv_pump` with block captures. The no-transform path (PUSH/PULL, PUB/SUB) is now branch-free. ~2.5x YJIT speedup on inproc, ~2x on ipc/tcp.
- Frozen array from `recv_msg_raw` — ROUTER/SERVER receiver crashed with `FrozenError` when shifting the identity off frozen message arrays. `#recv_msg_raw` now dups the array.
- CLI error path — use `Kernel#exit` instead of `Process.exit!`
- Dual-stack TCP bind — `TCP.bind` resolves the hostname via `Addrinfo.getaddrinfo` and binds to all returned addresses. `tcp://localhost:PORT` now listens on both `127.0.0.1` and `::1`.
- Eager DNS validation on connect — `Engine#connect` resolves TCP hostnames upfront via `Addrinfo.getaddrinfo`. Unresolvable hostnames raise `Socket::ResolutionError` immediately instead of failing silently in the background reconnect loop.
- `Socket::ResolutionError` in `CONNECTION_FAILED` — DNS failures during reconnect are now retried with backoff (DNS may recover or change), matching libzmq behavior.
- CLI catches `SocketDeadError` and `Socket::ResolutionError` — prints the error and exits with code 1 instead of silently exiting 0.
- CLI endpoint shorthand — `tcp://:PORT` expands to `tcp://localhost:PORT` (loopback, safe default). `tcp://*:PORT` expands to `tcp://0.0.0.0:PORT` (all interfaces, explicit opt-in).
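The two expansion rules above can be sketched as a small helper. `expand_endpoint` is a hypothetical name for illustration, not the CLI's actual function:

```ruby
# Illustrative helper mirroring the CLI shorthand rules; not omq-cli's code.
def expand_endpoint(url)
  case url
  when %r{\Atcp://:(\d+)\z}   then "tcp://localhost:#{$1}" # loopback, safe default
  when %r{\Atcp://\*:(\d+)\z} then "tcp://0.0.0.0:#{$1}"   # explicit all-interfaces
  else url                                                 # everything else untouched
  end
end

expand_endpoint("tcp://:5555")      # => "tcp://localhost:5555"
expand_endpoint("tcp://*:5555")     # => "tcp://0.0.0.0:5555"
expand_endpoint("ipc:///tmp/sock")  # => "ipc:///tmp/sock"
```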
- `tcp://*:PORT` failed on macOS — `*` is not a resolvable hostname. Connects now use `localhost` by default; `*` only expands to `0.0.0.0` for explicit all-interface binding.
- `Socket` constant resolution inside the `OMQ` namespace — bare `Socket` resolved to `OMQ::Socket` instead of `::Socket`, causing `NameError` for `Socket::ResolutionError` and `Socket::AI_PASSIVE`.
- `self << msg` in REP `-e` caused double-send — `self << $F` returns the socket, which `eval_expr` tried to coerce via `to_str`. Now detected via `result.equal?(@sock)` and returned as a `SENT` sentinel. REP skips the auto-send when the eval already sent the reply.
- `eval_expr` called `to_str` on non-string results — non-string, non-array return values from `-e` still fail with a clear `NoMethodError` on `to_str` (unchanged), but socket self-references are handled first.
- Gemspec summary — highlights the CLI's composable pipeline capabilities (pipe, filter, transform, formats, Ractor parallelism).
- README CLI section — added `pipe`, `--transient`, `-P`/`--parallel`, `BEGIN{}`/`END{}` blocks, the `$_` variable, and the `--marshal` format.
- Flaky memory leak tests on CI — replaced global `ObjectSpace` counting with `WeakRef` tracking of specific objects, retrying GC until collected. No longer depends on GC generational timing.
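The `WeakRef`-with-GC-retry technique can be sketched with stdlib only. `assert_collected` is a hypothetical helper for illustration, not the project's test code; note that a conservatively scanned stack may keep the object alive, which is exactly why the retry loop is bounded:

```ruby
require "weakref"

# Illustrative leak check: track one specific object via WeakRef and retry GC
# a bounded number of times until the weak reference dies. Not OMQ's helper.
def assert_collected(weak, attempts: 10)
  attempts.times do
    GC.start
    return true unless weak.weakref_alive?
  end
  false # still reachable (a real leak, or a conservative-GC false positive)
end

held = Object.new
weak = WeakRef.new(held)
assert_collected(weak, attempts: 2) # => false while `held` is still referenced
```

Compared with counting `ObjectSpace.each_object`, this tracks the one object under test, so unrelated allocations and generational GC timing cannot flake the assertion.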
- `pipe` in CLI help and examples — added `pipe` to the help banner as a virtual socket type (PULL → eval → PUSH) and added examples showing single-worker, `-P` Ractor, and `--transient` usage.
- Pipeline benchmarks run from any directory — `pipeline.sh` and `pipeline_ractors.sh` now derive absolute paths from the script location instead of assuming the working directory is the project root.
- `OMQ::SocketDeadError` — raised on `#send`/`#receive` after an internal pump task crashes. The original exception is available via `#cause`. The socket is permanently bricked.
- `Engine#spawn_pump_task` — replaces bare `parent_task.async(transient: true)` in all 10 routing strategies. Catches unexpected exceptions and forwards them via `signal_fatal_error` so blocked `#send`/`#receive` callers see the real error instead of deadlocking.
- `Socket#close_read` — pushes a nil sentinel into the recv queue, causing a blocked `#receive` to return nil. Used by `--transient` to drain remaining messages before exit instead of killing the task.
- `send_pump_idle?` on all routing classes — tracks whether the send pump has an in-flight batch. `Engine#drain_send_queues` now waits for both `send_queue.empty?` and `send_pump_idle?`, preventing message loss during linger close.
- Grace period after `peer_connected` — senders that bind or connect to multiple endpoints sleep one `reconnect_interval` (100 ms) after the first peer handshake, giving latecomers time to connect before messages start flowing.
- `-P`/`--parallel [N]` for `omq pipe` — spawns N Ractor workers (default: nproc) in a single process for true CPU parallelism. Each Ractor runs its own Async reactor with independent PULL/PUSH sockets. `$F` in `-e` expressions is transparently rewritten for Ractor isolation.
- `BEGIN{}`/`END{}` blocks in `-e` expressions — like awk, run setup before the message loop and teardown after. Supports nested braces. Example: `-e 'BEGIN{ @sum = 0 } @sum += Integer($_); next END{ puts @sum }'`
- `--reconnect-ivl` — set the reconnect interval from the CLI; accepts a fixed value (`0.5`) or a range for exponential backoff (`0.1..2`).
- `--transient` — exit when all peers disconnect (after at least one message has been sent/received). Useful for pipeline sinks and workers.
- `--examples` — annotated usage examples, paged via `$PAGER` or `less`. `--help` now shows help + examples (paged); `-h` shows help only.
- `-r` relative paths — `-r./lib.rb` and `-r../lib.rb` resolve via `File.expand_path` instead of `$LOAD_PATH`.
- `peer_connected`/`all_peers_gone` — `Async::Promise` hooks on `Socket` for connection lifecycle tracking.
- `reconnect_enabled=` — disable auto-reconnect per socket.
- Pipeline benchmark — 4-worker fib pipeline via the `omq` CLI (`bench/cli/pipeline.sh`). ~300–1800 msg/s depending on N.
- DESIGN.md — architecture overview covering task trees, send pump batching, the ZMTP wire protocol, transports, and the fallacies of distributed computing.
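The `peer_connected`/`all_peers_gone` lifecycle promises can be modeled with a thread-safe one-shot latch. `Latch` and `PeerTracker` are illustrative stand-ins, not OMQ's `Async::Promise`-based implementation:

```ruby
# Toy model of the lifecycle hooks: peer_connected resolves once on the first
# handshake; all_peers_gone resolves when the peer count returns to zero after
# at least one connect. Illustrative only — not OMQ's implementation.
class Latch
  def initialize
    @q = Thread::Queue.new
    @resolved = false
  end

  def resolve
    return if @resolved
    @resolved = true
    @q << true
  end

  def wait = @resolved || @q.pop # returns immediately once resolved
  def resolved? = @resolved
end

class PeerTracker
  attr_reader :peer_connected, :all_peers_gone

  def initialize
    @peers = 0
    @ever_connected = false
    @peer_connected = Latch.new
    @all_peers_gone = Latch.new
  end

  def peer_added
    @peers += 1
    @ever_connected = true
    @peer_connected.resolve
  end

  def peer_removed
    @peers -= 1
    @all_peers_gone.resolve if @ever_connected && @peers.zero?
  end
end

t = PeerTracker.new
t.peer_added
t.peer_connected.wait          # a sender would block here before first send
t.peer_removed
t.all_peers_gone.resolved?     # => true; --transient would now drain and exit
```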
- Draft socket types in omqcat — CLIENT, SERVER, RADIO, DISH, SCATTER,
GATHER, CHANNEL, and PEER are now supported in the CLI tool.
- `-j`/`--join GROUP` for DISH (like `--subscribe` for SUB)
- `-g`/`--group GROUP` for RADIO publishing
- `--target` extended to SERVER and PEER (accepts `0x` hex for binary routing IDs)
- `--echo` and `-e` on SERVER/PEER reply to the originating client via `send_to`
- CLIENT uses a request-reply loop (send then receive)
- Unified `--timeout` — replaces `--recv-timeout`/`--send-timeout` with a single `-t`/`--timeout` flag that applies to both directions.
- `--linger` — configurable drain time on close (default 5 s).
- Exit codes — 0 = success, 1 = error, 2 = timeout.
- CLI unit tests — 74 tests covering Formatter, routing helpers, validation, and option parsing.
- Quantized `--interval` — uses `Async::Loop.quantized` for wall-clock-aligned, start-to-start timing (no drift).
- `-e` as data source — eval expressions can generate messages without `--data`, `--file`, or stdin. E.g. `omq pub -e 'Time.now.to_s' -i 1`.
- `$_` in eval — set to the first frame of `$F` inside `-e` expressions, following Ruby convention.
- `wait_for_peer` — connecting sockets wait for the first peer handshake before sending. Replaces the need for a manual `--delay` on PUB, PUSH, etc.
- `OMQ_DEV` env var — unified dev-mode flag for loading local omq and omq-curve source via `require_relative` (replaces `DEV_ENV`).
- `--marshal`/`-M` — Ruby Marshal stream format. Sends any Ruby object over the wire; the receiver deserializes and prints `inspect` output. E.g. `omq pub -e 'Time.now' -M` / `omq sub -M`.
- `-e` single-shot — eval runs once and exits when no other data source is present. Supports `self << msg` for direct socket sends.
- `subscriber_joined` — an `Async::Promise` on PUB/XPUB that resolves when the first subscription arrives. CLI PUB waits for it before sending.
- `#to_str` enforcement — message parts must be string-like; passing integers or symbols raises `NoMethodError` instead of silently coercing.
- `-e` error handling — eval errors abort with exit code 3.
- `--raw` outputs ZMTP frames — flags + length + body per frame, suitable for `hexdump -C`. Compression remains transparent.
- ROUTER `router_mandatory` by default — CLI ROUTER rejects sends to unknown identities and waits for the first peer before sending.
- `--timeout` applies to `wait_for_peer` — `-t` now bounds the initial connection wait via `Async::TimeoutError`.
- Received messages are always frozen — `Connection#receive_message` (TCP/IPC) now returns a frozen array of frozen strings, matching the inproc fast-path. REP and REQ recv transforms rewritten to avoid in-place mutation (`Array#shift` → slicing).
- CLI refactored into 16 files — the 1162-line `cli.rb` monolith is decomposed into `CLI::Config` (a frozen `Data.define`), `CLI::Formatter`, `CLI::BaseRunner` (shared infrastructure), and one runner class per socket type combo (PushRunner, PullRunner, ReqRunner, RepRunner, etc.). Each runner models its behavior as a single `#run_loop` override.
- `--transient` uses `close_read` instead of `task.stop` — recv-only and bidirectional sockets drain their recv queue via the nil sentinel before exiting, preventing message loss on disconnect. Send-only sockets still use `task.stop`.
- Pipeline benchmark — natural startup order (producer → workers → sink); workers use `--transient -t 1` (the timeout covers workers that connect after the producer is already gone). Verified correct at 5M messages (56k msg/s sustained, zero message loss).
- Renamed `omqcat` → `omq` — the CLI executable is now `omq`, matching the gem name.
- Per-connection task subtrees — each connection gets an isolated Async task whose children (heartbeat, recv pump, reaper) are cleaned up automatically when the connection dies. No reparenting.
- Flat task tree — the send pump is spawned at socket level (singleton), not inside connection subtrees. Accept loops use `defer_stop` to prevent socket leaks on stop.
- `compile_expr` — `-e` expressions compiled once as a proc, `instance_exec` per message (was `instance_eval` per message).
- Close lifecycle — stop listeners before drain only when connections exist; keep listeners open with zero connections so late-arriving peers can receive queued messages during linger.
- Reconnect guard — the `@closing` flag suppresses reconnect during close.
- Task annotations — all pump tasks carry descriptive annotations (send pump, recv pump, reaper, heartbeat, reconnect, tcp/ipc accept).
- Rename monitor → reaper — clearer name for PUSH/SCATTER dead-peer detection tasks.
- Extracted `OMQ::CLI` module — `exe/omq` is a thin wrapper; the bulk of the CLI lives in `lib/omq/cli.rb` (loaded via `require "omq/cli"`, not auto-loaded by `require "omq"`).
- `Formatter` class for encode/decode/compress/decompress
- `Runner` is stateful with `@sock`, cleaner method signatures
- Quoted format uses `String#dump`/`undump` — fixes a backslash escaping bug; proper round-tripping of all byte values.
- Hex routing IDs — binary identities display as `0xdeadbeef` instead of lossy Z85 encoding. `--target 0x...` decodes hex on input.
- Compression-safe routing — routing ID and delimiter frames are no longer compressed/decompressed in ROUTER, SERVER, and PEER loops.
- `require_relative` in CLI — `exe/omq` loads the local source tree instead of the installed gem.
- `output` skips nil — `-e` returning nil no longer prints a blank line.
- Removed `#count_reached?` — inlined for clarity.
- System tests overhauled — `test/omqcat` → `test/cli`, all IPC via the abstract namespace, `set -eu`, stderr captured, no sleeps (except ROUTER `--target`), under 10 s.
- Inproc DEALER→REP broker deadlock — `Writable#send` freezes the message array, but the REP recv transform mutated it in-place via `Array#shift`. On the inproc fast-path the frozen array passed through the DEALER send pump unchanged, causing a `FrozenError` that silently killed the send pump task and deadlocked the broker.
- Pump errors swallowed silently — all send/recv pump tasks ran as `transient: true` Async tasks, so unexpected exceptions (bugs) were logged but never surfaced to the caller. The socket would deadlock instead of raising. Now `Engine#signal_fatal_error` stores the error and unblocks the recv queue; subsequent `#send`/`#receive` calls re-raise it as `SocketDeadError`. Expected errors (`Async::Stop`, `ProtocolError`, `CONNECTION_LOST`) are still handled normally.
- Pipe `--transient` drains too early — `all_peers_gone` fired while `pull.receive` was blocked, hanging the worker forever. Now the transient monitor pushes a nil sentinel via `close_read`, which unblocks the blocked dequeue and lets the loop drain naturally.
- Linger drain missed in-flight batches — `drain_send_queues` only checked `send_queue.empty?`, but the send pump may have already dequeued messages into a local batch. Now it also checks `send_pump_idle?`.
- Socket option delegators not Ractor-safe — `define_method` with a block captured state from the main Ractor, causing `Ractor::IsolationError` when calling setters like `recv_timeout=`. Replaced with `Forwardable`.
- Pipe endpoint ordering — `omq pipe -b url1 -c url2` assigned PULL to `url2` and PUSH to `url1` (backwards) because connects were concatenated before binds. Now uses the ordered `Config#endpoints`.
- Linger drain kills reconnect tasks — `Engine#close` set `@closed = true` before draining send queues, causing reconnect tasks to bail immediately. Messages queued before any peer connected were silently dropped. Now `@closed` is set after draining, so reconnection continues during the linger period.
- 3–4x throughput under burst load — send pumps now batch writes before flushing. `Connection#write_message` buffers without flushing; `Connection#flush` triggers the syscall. Pumps drain all queued messages per cycle, reducing the flush count from `N_msgs × N_conns` to `N_conns` per batch. PUB/SUB TCP with 10 subscribers: 2.3k → 9.2k msg/s (4x). PUSH/PULL TCP: 24k → 83k msg/s (3.4x). Zero overhead under light load (a batch of 1 takes the same path as before).
- Simplified Reactor IO thread — replaced the `Thread::Queue` + `IO.pipe` wake signal with a single `Async::Queue`. `Thread::Queue#pop` is fiber-scheduler-aware in Ruby 4.0, so the pipe pair was unnecessary.
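The batch-then-flush accounting behind the burst-load speedup can be modeled in a few lines: write every queued message into each connection's buffer, then flush each connection once per batch. The classes here are illustrative counters, not OMQ's pump:

```ruby
# Model of the batched send pump: N_conns flushes per batch instead of
# N_msgs × N_conns. CountingConnection and pump_batch are illustrative only.
class CountingConnection
  attr_reader :flushes, :writes

  def initialize
    @flushes = 0
    @writes  = 0
  end

  def write_message(_msg) = @writes  += 1 # buffer only, no syscall
  def flush               = @flushes += 1 # one syscall per batch
end

def pump_batch(queue, connections)
  until queue.empty?
    msg = queue.shift
    connections.each { |c| c.write_message(msg) }
  end
  connections.each(&:flush) # flush once per connection, after the whole batch
end

conns = Array.new(3) { CountingConnection.new }
pump_batch(Array.new(10) { "m" }, conns)
conns.sum(&:flushes) # => 3  (was 30 when flushing per message)
conns.sum(&:writes)  # => 30
```

With a batch of 1 the flush count is identical to the unbatched path, which matches the "zero overhead under light load" claim above.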
- `router_mandatory` SocketError raised in the send pump — the error killed the pump fiber instead of reaching the caller. Now checked synchronously in `enqueue` before queuing.
- Draft socket types (RFCs 41, 48, 49, 51, 52):
  - `CLIENT`/`SERVER` — thread-safe REQ/REP without envelope, 4-byte routing IDs
  - `RADIO`/`DISH` — group-based pub/sub with exact match, JOIN/LEAVE commands. `radio.publish(group, body)`, `radio.send(body, group:)`, `radio << [group, body]`
  - `SCATTER`/`GATHER` — thread-safe PUSH/PULL
  - `PEER` — bidirectional multi-peer with 4-byte routing IDs
  - `CHANNEL` — thread-safe PAIR
- All draft types enforce single-frame messages (no multipart)
- Reconnect-after-restart tests for all 10 socket type pairings
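The RADIO/DISH group semantics (exact-match delivery, unlike SUB's prefix matching) can be sketched as a toy dispatcher. `Dish` and `Radio` here are illustrative models, not OMQ's classes:

```ruby
# Toy model of RADIO/DISH: a DISH joins groups and only exact group matches
# are delivered. Illustrative only — not OMQ's API or wire behavior.
class Dish
  attr_reader :inbox

  def initialize
    @groups = {}
    @inbox  = []
  end

  def join(group)  = (@groups[group] = true)
  def leave(group) = @groups.delete(group)

  def offer(group, body)
    @inbox << [group, body] if @groups.key?(group) # exact match only
  end
end

class Radio
  def initialize(dishes) = (@dishes = dishes)
  def publish(group, body) = @dishes.each { |d| d.offer(group, body) }
end

dish = Dish.new
dish.join("news")
radio = Radio.new([dish])
radio.publish("news", "hello")       # delivered: exact group match
radio.publish("news.sport", "nope")  # dropped: no prefix matching for groups
dish.inbox # => [["news", "hello"]]
```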
- PUSH/SCATTER silently wrote to dead peers — write-only sockets had no recv pump to detect peer disconnection. Writes succeeded because the kernel send buffer absorbed the data, preventing reconnect from triggering. Added background monitor task per connection.
- PAIR/CHANNEL stale send pump after reconnect — the old send pump kept its captured connection reference and raced with the new send pump, sending to the dead connection. Now stopped in `connection_removed`.
- Send pump dies permanently on connection loss — the `rescue` was outside the loop, so a single `CONNECTION_LOST` killed the pump and all subsequent messages queued but were never sent
- NULL handshake deadlocks with buffered IO — a missing `io.flush` after the greeting and READY writes caused both peers to block on read
- Inproc DirectPipe drops messages when the send pump runs before `direct_recv_queue` is wired — now buffers to `@pending_direct` and drains on assignment
- HWM and timeout options set after construction had no effect because `Async::LimitedQueue` was already allocated with the defaults; they are now constructor kwargs:
  - `send_hwm:`, `send_timeout:` for `PUSH`
  - `recv_hwm:`, `recv_timeout:` for `PULL`
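Why the HWM must be a constructor kwarg in this design: the bounded queue is allocated in `initialize`, so a later setter cannot resize it. A minimal sketch with stdlib `SizedQueue` (the `BoundedSender` class is hypothetical, not OMQ's):

```ruby
# Illustrative only: capacity is fixed when the bounded queue is allocated,
# mirroring why send_hwm: became a constructor kwarg. Not OMQ's class.
class BoundedSender
  def initialize(send_hwm: 1000)
    @queue = SizedQueue.new(send_hwm) # capacity fixed at allocation time
  end

  def try_send(msg)
    @queue.push(msg, true)            # non-blocking push
    true
  rescue ThreadError
    false                             # HWM reached, message rejected
  end
end

s = BoundedSender.new(send_hwm: 2)
s.try_send("a") # => true
s.try_send("b") # => true
s.try_send("c") # => false (queue full at the configured HWM)
```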
- Use `Async::Clock.now` instead of `Process.clock_gettime` internally
- Explicit flush after `send_message`/`send_command` instead of the `minimum_write_size: 0` workaround — enables write buffering (multi-frame messages coalesce into fewer syscalls). +68% inproc throughput (145k → 244k msg/s), −40% inproc latency (15 → 9 µs)
- Require `async ~> 2.38` for `Promise#wait?` (was `~> 2`)
- `--curve-server` flag — generates an ephemeral keypair, prints `OMQ_SERVER_KEY=...` to stderr for easy copy-paste
- `--curve-server-key KEY` flag — CURVE client mode from the CLI
- `--echo` flag for REP — explicit echo mode
- REP reads stdin/`-F` as the reply source (one line per reply, exits at EOF)
- REP without a reply source now aborts with a helpful error message
- CURVE env vars renamed: `OMQ_SERVER_KEY`, `OMQ_SERVER_PUBLIC`, `OMQ_SERVER_SECRET` (was `SERVER_KEY`, `SERVER_PUBLIC`, `SERVER_SECRET`)
- REP with `--echo`/`-D`/`-e` serves forever by default (like a server). Use `-n 1` for one-shot, `-n` to limit exchanges. Stdin/`-F` replies naturally terminate at EOF.
- Hide the warning about the experimental `IO::Buffer` (used by io-stream)
- `omqcat --help` responds in ~90 ms (was ~470 ms) — defer heavy gem loading until after option parsing
- `omqcat` CLI tool — an nngcat-like Swiss army knife for OMQ sockets
- Socket types: req, rep, pub, sub, push, pull, pair, dealer, router
- Formats: ascii (default, tab-separated), quoted, raw, jsonl, msgpack
- `-e`/`--eval` — Ruby code runs inside the socket instance (`$F` = message parts, full socket API available: `self <<`, `send`, `subscribe`, etc.). REP auto-replies with the return value; PAIR/DEALER use `self <<` explicitly
- `-r`/`--require` to load gems for use in `-e`
- `-z`/`--compress` — Zstandard compression per frame (requires zstd-ruby)
- `-D`/`-F` data sources, `-i` interval, `-n` count, `-d` delay
- CURVE encryption via `SERVER_KEY`/`SERVER_PUBLIC` + `SERVER_SECRET` env vars (requires omq-curve)
- `--identity`/`--target` for DEALER/ROUTER patterns
- `tcp://:PORT` shorthand for `tcp://*:PORT` (no shell glob issues)
- 22 system tests via `rake test:cli`
- `ØMQ` alias for `OMQ` — because Ruby can
- Replace `IO::Buffer` with `pack`/`unpack1`/`getbyte`/`byteslice` in the frame, command, and greeting codecs — up to 68% higher throughput for large messages, 21% lower TCP latency
- The `mechanism` option now holds the mechanism instance directly (`Mechanism::Null.new` by default). For CURVE, use `OMQ::Curve.server(pub, sec)` or `OMQ::Curve.client(pub, sec, server_key: k)`.
- Removed the `curve_server`, `curve_server_key`, `curve_public_key`, `curve_secret_key`, and `curve_authenticator` socket options
- Handle `Errno::EPIPE`, `Errno::ECONNRESET`, `Errno::ECONNABORTED`, `Errno::EHOSTUNREACH`, `Errno::ENETUNREACH`, `Errno::ENOTCONN`, and `IO::Stream::ConnectionResetError` in accept loops, connect, reconnect, and recv/send pumps — prevents unhandled exceptions when peers disconnect during handshake or become unreachable
- Use `TCPSocket.new` instead of `Socket.tcp` for reliable cross-host connections with io-stream
- TCP/IPC `#connect` is now non-blocking — returns immediately and establishes the connection in the background, like libzmq
- Consolidated connection error handling via the `ZMTP::CONNECTION_LOST` and `ZMTP::CONNECTION_FAILED` constants
- Removed the `connect_timeout` option (no longer needed since connect is non-blocking)
Initial release. Pure Ruby implementation of ZMTP 3.1 (ZeroMQ) using Async.
- REQ, REP, DEALER, ROUTER
- PUB, SUB, XPUB, XSUB
- PUSH, PULL
- PAIR
- TCP (with ephemeral port support and IPv6)
- IPC (Unix domain sockets, including Linux abstract namespace)
- inproc (in-process, lock-free direct pipes)
- Buffered I/O via io-stream (read-ahead buffering, automatic TCP_NODELAY)
- Heartbeat (PING/PONG) with configurable interval and timeout
- Automatic reconnection with exponential backoff
- Per-socket send/receive HWM (high-water mark)
- Linger on close (drain send queue before closing)
- `max_message_size` enforcement
- Works inside Async reactors or standalone (shared IO thread)
- Optional CURVE encryption via the protocol-zmtp gem