How to use zmq inside a function (in a non-blocking manner) to obtain the function's state when requested by a client?

Issue

How to use ZMQ in a non-blocking manner to “serve” the status of a long running job when the status is requested by a client?

The below code illustrates how the long running task could be temporarily “interrupted” to send the current status.

The task is long running because there are many urls to process, and not because each url takes a long time to process. This would mean that the server could respond to the client with the current status almost instantly.

I have been unable to implement this logic in a non-blocking manner: using the zmq.NOBLOCK flag raises "Again: Resource temporarily unavailable", while omitting the flag makes the server block and wait to receive a message.

How to achieve such logic/behaviour? I am open to using either C++ or Python.

Server code:

import zmq

# Socket details
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

# List of many urls
urls = ['http://google.com','http://yahoo.com']

def process(url):
    """Sample function"""
    pass

processed_urls = []
for url in urls:

    # If a message has been received by a client respond to the message
    # The response should be the current status.
    if socket.recv(zmq.NOBLOCK):
        msg = b"Processed the following urls %s" % str(processed_urls).encode()
        socket.send(msg, zmq.NOBLOCK)

    # Continue processing the urls
    process(url)
    processed_urls.append(url)

Solution

First of all – NON-BLOCKING is a double-edged sword. There are two worlds, each of which CAN and sometimes DOES block.

  1. The GIL-side and/or process-side "blocking" can appear ( a numpy example is below, but the same holds for any sync-blocking call that has no easily achievable non-blocking workaround ), while some external process, or the global application architecture, may still need to get ( at least ) some responding & hand-shaking behaviour even from such knowingly "blocked" Python code-areas.

  2. The second world is your ( potentially )-blocking ZeroMQ call. Setting zmq.CONFLATE may additionally help you in PUSH-like URL-reporting from the long-job-running client to the server. Set CONFLATE on both the client and the server side of the reporting socket.

Wherever I can, I advocate strictly non-blocking designs. Even the school-book examples of ZeroMQ code ought to be realistic and fair enough not to block. We live in the 3rd Millennium, and blocking code is a performance- and resource-usage-devastating state, principally outside of one's domain of control in professional-grade distributed-systems' design.


A principal scaffolding:

####################################################################
### NEED TO VIEW aHealthSTATUS FROM AN EXTERNAL_UNIVERSE:
### ( A LIGHTWEIGHT EXCULPATED MONITOR TO OBSERVE THE HEALTH OF THE EXECUTION ENVIRONMENT FROM OUTSIDE OF THE VM-JAIL, FROM AN OUTER HYPERVISOR SPACE )
### ( + using signal.signal() )

import signal, os
#-------------------------------------------------------------------
# .SET  ZeroMQ INFRASTRUCTURE:

#-------------------------------------------------------------------
# .DEF  SIG_handler(s)

def SIG_handler_based_HealthREPORTER( SIGnum, aFrame ):
    print( 'SIG_handler called to report state with signal', SIGnum )
    #---------------------------------------------------------------
    # ZeroMQ .send( .SIG/.MSG )
    
    pass    # yes, all the needed magic comes right here
    
    #-------------------------------------------------------------------
    # FINALLY:
    
    raise OSError( "Had to send a HealthREPORT" )                   # ??? do we indeed need this circus to be always played around, except in a DEMO-mode?

#-------------------------------------------------------------------
# .ASSOC SIG_handler:

signal.signal( signal.SIGALRM, SIG_handler_based_HealthREPORTER )   # .SET { SIGALRM: <aHandler> }-assoc

#-------------------------------------------------------------------
# .SET 1[sec]-delay + 1[sec]-interval

signal.setitimer( signal.ITIMER_REAL, 1, 1 )                        # .SET REAL-TIME Interval-based WatchDog -- Decrements interval timer in real time, and delivers SIGALRM upon expiration.


# ------------------------------------------------------------------
# FINALLY:


#-------------------------------------------------------------------
# .SET / .DEACTIVATE 
signal.setitimer( signal.ITIMER_REAL, 0 )                           # .SET / DEACTIVATE

#-------------------------------------------------------------------
# .TERM GRACEFULLY ZeroMQ INFRASTRUCTURE


#-------------------------------------------------------------------
# CLEAN EXIT(0)
os._exit( 0 )

Let me share an approach used for a sort of aHealthMONITOR in cases of indeed long, principally-BLOCKING computations.

Let’s take one example of a GIL-"blocking" type of computation:

#######
# SETUP
signal.signal(    signal.SIGALRM, SIG_ALRM_handler_A )          # .ASSOC { SIGALRM: thisHandler }
signal.setitimer( signal.ITIMER_REAL, 10, 5 )                   # .SET   @5 [sec] interval, after first run, starting after 10[sec] initial-delay
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1                         # .RESET .INIT()

The mechanics of SIGALRM + ITIMER_REAL deliver a lovely automation that keeps the external world happy with at least some responsiveness ( as frequent as ~ 0.2 [Hz] in this example, but principally {up-|down-}scalable to any reasonable, system-wide stable rate; testing a 0.5 [GHz] handler on a 1.0 [GHz] VM-system is left for a kind ultimate hacker's consideration, otherwise common sense about reasonable factors of scale and non-blocking/low-latency designs applies ).

The DEMO readouts below show how the involuntary= context switches demonstrate the blocking-indifferent mechanics ( read the numbers: they keep growing, while voluntary= remains the same throughout the whole GIL-blocking part of the process ), so a similarly def-ed SIG_ALRM_handler_XYZ() can provide a solution for your process-state-independent, on-demand reporter.

SIG_ALRM_handler_A(): activated             Wed Oct 19 14:13:14 2016 ------------------------------ pctxsw(voluntary=53151, involuntary=1169)

>>> SIG_ALRM_last_ctx_switch_VOLUNTARY                              53243
>>> SIG_ALRM_last_ctx_switch_FORCED                                  1169

>>> [ np.math.factorial( 2**f ) for f in range(20) ][:5]            # too fast to notice @5[sec]
[1, 2, 24, 40320, 20922789888000]

#########
# COMPUTE
# len(str([np.math.factorial(2**f) for f in range(20)][-1]))    # .RUN   A "FAT"-BLOCKING CHUNK OF A regex/numpy/C/FORTRAN-calculus

>>> len( str( [ np.math.factorial( 2**f ) for f in range(20) ][-1] ) )
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1234)  INSPECT processes ... ev. add a Stateful-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1257)  INSPECT processes ... ev. add a Stateful-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1282)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1305)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1330)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1352)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1377)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1400)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1425)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1448)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1473)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1496)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1521)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1543)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1568)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1591)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1616)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1639)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1664)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1687)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1713)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1740)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1767)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1790)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1812)  INSPECT processes ... ev. add a StateFul-self-Introspection
2771010

This handler was used in that process-context:

########################################################################
### SIGALRM_handler_          
###

import psutil, resource, os, time
        
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1
SIG_ALRM_last_ctx_switch_FORCED    = -1

def SIG_ALRM_handler_A( aSigNUM, aFrame ):                              # SIG_ALRM fired evenly even during [ np.math.factorial( 2**f ) for f in range( 20 ) ] C-based processing =======================================
    #
    # onEntry_ROTATE_SigHandlers() -- MAY set another sub-sampled SIG_ALRM_handler_B() ... { last: 0, 0: handler_A, 1: handler_B, 2: handler_C }
    #
    # onEntry_SEQ of calls of regular, hierarchically timed MONITORS ( just the SNAPSHOT-DATA ACQUISITION Code-SPRINTs, handle later due to possible TimeDOMAIN overlaps )
    # 
    aProcess         =   psutil.Process( os.getpid() )
    aProcessCpuPCT   =         aProcess.cpu_percent( interval = 0 )     # EVENLY-TIME-STEPPED
    aCtxSwitchNUMs   =         aProcess.num_ctx_switches()              # THIS PROCESS ( may inspect other per-incident later ... on anomaly )
    
    aVolCtxSwitchCNT = aCtxSwitchNUMs.voluntary
    aForcedSwitchCNT = aCtxSwitchNUMs.involuntary
    
    global SIG_ALRM_last_ctx_switch_VOLUNTARY
    global SIG_ALRM_last_ctx_switch_FORCED
    
    if (     SIG_ALRM_last_ctx_switch_VOLUNTARY != -1 ):                # .INIT VALUE ALREADY CHANGED -> not the first call
        #----------
        # .ON_TICK: must process delta(s)
        if ( SIG_ALRM_last_ctx_switch_VOLUNTARY == aVolCtxSwitchCNT ):
            #
            # AN INDIRECT INDICATION OF A LONG-RUNNING WORKLOAD OUTSIDE GIL-STEPPING ( regex / C-lib / FORTRAN / numpy-block et al )
            #                                                                                 |||||              vvv
            # SIG_:  Wed Oct 19 12:24:32 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=315)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:37 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=323)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:42 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=331)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:47 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=338)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:52 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=346)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:57 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=353)  ~~~  0.0
            # ...                                                                             |||||              ^^^
            # 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]
            # >>>                                                                             |||||              |||
            #                                                                                 vvvvv              |||
            # SIG_:  Wed Oct 19 12:26:17 2016 ------------------------------ pctxsw(voluntary=49983, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:22 2016 ------------------------------ pctxsw(voluntary=49984, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:27 2016 ------------------------------ pctxsw(voluntary=49985, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:32 2016 ------------------------------ pctxsw(voluntary=49986, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:37 2016 ------------------------------ pctxsw(voluntary=49987, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:42 2016 ------------------------------ pctxsw(voluntary=49988, involuntary=502)  ~~~  0.0                
            print(   "SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: ", time.ctime(), 10 * "-",  aProcess.num_ctx_switches(), "{0:_>60s}".format( str( aProcess.threads() ) ),          " INSPECT processes ... ev. add a StateFul-self-Introspection" )
    else:
        #----------
        # .ON_INIT: may report .INIT()
        print(   "SIG_ALRM_handler_A(): activated            ", time.ctime(), 30 * "-",  aProcess.num_ctx_switches() )
    
    ##########
    # FINALLY:
    
    SIG_ALRM_last_ctx_switch_VOLUNTARY = aVolCtxSwitchCNT               # .STO ACTUALs
    SIG_ALRM_last_ctx_switch_FORCED    = aForcedSwitchCNT               # .STO ACTUALs
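The voluntary/involuntary counters that the handler compares come from psutil and can be inspected directly; a minimal check of the mechanics ( assumes psutil is installed; exact numbers are system-dependent ):

```python
import os
import psutil

proc   = psutil.Process( os.getpid() )
before = proc.num_ctx_switches()

# burn CPU without yielding: involuntary switches may grow,
# while voluntary ones should stay ( nearly ) flat
x = 0
for i in range( 10**6 ):
    x += i

after = proc.num_ctx_switches()
print( "voluntary:  ", before.voluntary,   "->", after.voluntary )
print( "involuntary:", before.involuntary, "->", after.involuntary )
```

A jump in involuntary= with a flat voluntary= is exactly the signature the handler above treats as "A SUSPECT CPU-LOAD".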

Answered By – user3666197

This Answer collected from stackoverflow, is licensed under cc by-sa 2.5 , cc by-sa 3.0 and cc by-sa 4.0
