RabbitMQ, Celery and Django – connection to broker lost. Trying to re-establish the connection

Issue

Celery disconnects from RabbitMQ each time a task is passed to the broker; however, the task does eventually succeed.

My questions are:

  1. How can I solve this issue?
  2. What improvements can you suggest for my Celery/RabbitMQ configuration?

Celery version: 5.1.2
RabbitMQ version: 3.9.0
Erlang version: 24.0.4

RabbitMQ error (sorry for the length of the log):

 ** Generic server <0.11908.0> terminating
 ** Last message in was {'$gen_cast',
                            {method,{'basic.ack',1,false},none,noflow}}
 ** When Server state == {ch,
                          {conf,running,rabbit_framing_amqp_0_9_1,1,
                           <0.11899.0>,<0.11906.0>,<0.11899.0>,
                           <<"someIPAddress:45610 -> someIPAddress:5672">>,
                           undefined,
                           {user,<<"someadmin">>,
                            [administrator],
                            [{rabbit_auth_backend_internal,none}]},
                           <<"backoffice">>,<<"celery">>,<0.11900.0>,
                           [{<<"consumer_cancel_notify">>,bool,true},
                            {<<"connection.blocked">>,bool,true},
                            {<<"authentication_failure_close">>,bool,true}],
                           none,0,134217728,1800000,#{},1000000000},
                          {lstate,<0.11907.0>,true},
                          none,2,
                          {1,
                           {[{pending_ack,1,<<"None4">>,1627738474140,
                              {resource,<<"backoffice">>,queue,<<"celery">>},
                              2097}],
                            []}},
                          {state,#{},erlang},
                          #{<<"None4">> =>
                             {{amqqueue,
                               {resource,<<"backoffice">>,queue,<<"celery">>},
                               true,false,none,[],<0.471.0>,[],[],[],undefined,
                               undefined,[],[],live,0,[],<<"backoffice">>,
                               #{user => <<"someadmin">>},
                               rabbit_classic_queue,#{}},
                              {false,0,false,[]}}},
                          #{{resource,<<"backoffice">>,queue,<<"celery">>} =>
                             {1,{<<"None4">>,nil,nil}}},
                          {state,none,5000,undefined},
                          false,1,
                          {rabbit_confirms,undefined,#{}},
                          [],[],none,flow,[],
                          {rabbit_queue_type,
                           #{{resource,<<"backoffice">>,queue,<<"celery">>} =>
                              {ctx,rabbit_classic_queue,
                               {resource,<<"backoffice">>,queue,<<"celery">>},
                               {rabbit_classic_queue,<0.471.0>,
                                {resource,<<"backoffice">>,queue,<<"celery">>},
                                #{}}}},
                           #{<0.471.0> =>
                              {resource,<<"backoffice">>,queue,<<"celery">>}}},
                          #Ref<0.4203289403.2328100865.106387>,false}
 ** Reason for termination ==
 ** {function_clause,
        [{rabbit_channel,'-notify_limiter/2-fun-0-',
             [{pending_ack,1,<<"None4">>,1627738474140,
                  {resource,<<"backoffice">>,queue,<<"celery">>},
                  2097},
              0],
             [{file,"src/rabbit_channel.erl"},{line,2124}]},
         {lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
         {rabbit_channel,notify_limiter,2,
             [{file,"src/rabbit_channel.erl"},{line,2124}]},
         {rabbit_channel,ack,2,[{file,"src/rabbit_channel.erl"},{line,2057}]},
         {rabbit_channel,handle_method,3,
             [{file,"src/rabbit_channel.erl"},{line,1343}]},
         {rabbit_channel,handle_cast,2,
             [{file,"src/rabbit_channel.erl"},{line,644}]},
         {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},
         {proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}

   crasher:
     initial call: rabbit_channel:init/1
     pid: <0.11908.0>
     registered_name: []
     exception exit: {function_clause,
                         [{rabbit_channel,'-notify_limiter/2-fun-0-',
                              [{pending_ack,1,<<"None4">>,1627738474140,
                                   {resource,<<"backoffice">>,queue,
                                       <<"celery">>},
                                   2097},
                               0],
                              [{file,"src/rabbit_channel.erl"},{line,2124}]},
                          {lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
                          {rabbit_channel,notify_limiter,2,
                              [{file,"src/rabbit_channel.erl"},{line,2124}]},
                          {rabbit_channel,ack,2,
                              [{file,"src/rabbit_channel.erl"},{line,2057}]},
                          {rabbit_channel,handle_method,3,
                              [{file,"src/rabbit_channel.erl"},{line,1343}]},
                          {rabbit_channel,handle_cast,2,
                              [{file,"src/rabbit_channel.erl"},{line,644}]},
                          {gen_server2,handle_msg,2,
                              [{file,"src/gen_server2.erl"},{line,1067}]},
                          {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,236}]}]}
       in function  gen_server2:terminate/3 (src/gen_server2.erl, line 1183)
     ancestors: [<0.11905.0>,<0.11903.0>,<0.11898.0>,<0.11897.0>,<0.508.0>,
                   <0.507.0>,<0.506.0>,<0.504.0>,<0.503.0>,rabbit_sup,
                   <0.224.0>]
     message_queue_len: 0
     messages: []
     links: [<0.11905.0>]
     dictionary: [{channel_operation_timeout,15000},
                   {process_name,
                       {rabbit_channel,
                           {<<"someIPAddress:45610 -> someIPAddress:5672">>,
                            1}}},
                   {rand_seed,
                       {#{jump => #Fun<rand.3.92093067>,
                          max => 288230376151711743,
                          next => #Fun<rand.5.92093067>,type => exsplus},
                        [262257290895536220|242201045588130196]}},
                   {{xtype_to_module,direct},rabbit_exchange_type_direct},
                   {permission_cache_can_expire,false},
                   {msg_size_for_gc,115}]
     trap_exit: true
     status: running
     heap_size: 28690
     stack_size: 29
     reductions: 67935
   neighbours:

 Error on AMQP connection <0.11899.0> (someIPAddress:45610 -> someIPAddress:5672, vhost: 'backoffice', user: 'someadmin', state: running), channel 1:
  {function_clause,
     [{rabbit_channel,'-notify_limiter/2-fun-0-',
          [{pending_ack,1,<<"None4">>,1627738474140,
               {resource,<<"backoffice">>,queue,<<"celery">>},
               2097},
           0],
          [{file,"src/rabbit_channel.erl"},{line,2124}]},
      {lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
      {rabbit_channel,notify_limiter,2,
          [{file,"src/rabbit_channel.erl"},{line,2124}]},
      {rabbit_channel,ack,2,[{file,"src/rabbit_channel.erl"},{line,2057}]},
      {rabbit_channel,handle_method,3,
          [{file,"src/rabbit_channel.erl"},{line,1343}]},
      {rabbit_channel,handle_cast,2,
          [{file,"src/rabbit_channel.erl"},{line,644}]},
      {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},
      {proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}
     supervisor: {<0.11905.0>,rabbit_channel_sup}
     errorContext: child_terminated
     reason: {function_clause,
                 [{rabbit_channel,'-notify_limiter/2-fun-0-',
                      [{pending_ack,1,<<"None4">>,1627738474140,
                           {resource,<<"backoffice">>,queue,<<"celery">>},
                           2097},
                       0],
                      [{file,"src/rabbit_channel.erl"},{line,2124}]},
                  {lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
                  {rabbit_channel,notify_limiter,2,
                      [{file,"src/rabbit_channel.erl"},{line,2124}]},
                  {rabbit_channel,ack,2,
                      [{file,"src/rabbit_channel.erl"},{line,2057}]},
                  {rabbit_channel,handle_method,3,
                      [{file,"src/rabbit_channel.erl"},{line,1343}]},
                  {rabbit_channel,handle_cast,2,
                      [{file,"src/rabbit_channel.erl"},{line,644}]},
                  {gen_server2,handle_msg,2,
                      [{file,"src/gen_server2.erl"},{line,1067}]},
                  {proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}
     offender: [{pid,<0.11908.0>},
                {id,channel},
                {mfargs,
                    {rabbit_channel,start_link,
                        [1,<0.11899.0>,<0.11906.0>,<0.11899.0>,
                         <<"someIPAddress:45610 -> someIPAddress:5672">>,
                         rabbit_framing_amqp_0_9_1,
                         {user,<<"someadmin">>,
                             [administrator],
                             [{rabbit_auth_backend_internal,none}]},
                         <<"backoffice">>,
                         [{<<"consumer_cancel_notify">>,bool,true},
                          {<<"connection.blocked">>,bool,true},
                          {<<"authentication_failure_close">>,bool,true}],
                         <0.11900.0>,<0.11907.0>]}},
                {restart_type,intrinsic},
                {shutdown,70000},
                {child_type,worker}]
 Non-AMQP exit reason '{function_clause,
                        [{rabbit_channel,'-notify_limiter/2-fun-0-',
                          [{pending_ack,1,<<"None4">>,1627738474140,
                            {resource,<<"backoffice">>,queue,<<"celery">>},
                            2097},
                           0],
                          [{file,"src/rabbit_channel.erl"},{line,2124}]},
                         {lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
                         {rabbit_channel,notify_limiter,2,
                          [{file,"src/rabbit_channel.erl"},{line,2124}]},
                         {rabbit_channel,ack,2,
                          [{file,"src/rabbit_channel.erl"},{line,2057}]},
                         {rabbit_channel,handle_method,3,
                          [{file,"src/rabbit_channel.erl"},{line,1343}]},
                         {rabbit_channel,handle_cast,2,
                          [{file,"src/rabbit_channel.erl"},{line,644}]},
                         {gen_server2,handle_msg,2,
                          [{file,"src/gen_server2.erl"},{line,1067}]},
                         {proc_lib,wake_up,3,
                          [{file,"proc_lib.erl"},{line,236}]}]}'
     supervisor: {<0.11905.0>,rabbit_channel_sup}
     errorContext: shutdown
     reason: reached_max_restart_intensity
     offender: [{pid,<0.11908.0>},
                {id,channel},
                {mfargs,
                    {rabbit_channel,start_link,
                        [1,<0.11899.0>,<0.11906.0>,<0.11899.0>,
                         <<"someIPAddress:45610 -> someIPAddress:5672">>,
                         rabbit_framing_amqp_0_9_1,
                         {user,<<"someadmin">>,
                             [administrator],
                             [{rabbit_auth_backend_internal,none}]},
                         <<"backoffice">>,
                         [{<<"consumer_cancel_notify">>,bool,true},
                          {<<"connection.blocked">>,bool,true},
                          {<<"authentication_failure_close">>,bool,true}],
                         <0.11900.0>,<0.11907.0>]}},
                {restart_type,intrinsic},
                {shutdown,70000},
                {child_type,worker}]
 closing AMQP connection <0.11899.0> (someIPAddress:45610 -> someIPAddress:5672, vhost: 'backoffice', user: 'someadmin')
 accepting AMQP connection <0.14133.0> (someIPAddress:57452 -> someIPAddress:5672)
 connection <0.14133.0> (someIPAddress:57452 -> someIPAddress:5672): user 'someadmin' authenticated and granted access to vhost 'backoffice'

Celery log:

INFO/MainProcess] Task subscribe_task[aae43c55-3396-45f3-8bea-d01a66983835] received
DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7fbd03f32af0> (args:('subscribe_task', 'aae43c55-3396-45f3-8bea-d01a66983835', {'lang': 'py', 'task': 'subscribe_task', 'id': 'aae43c55-3396-45f3-8bea-d01a66983835', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'aae43c55-3396-45f3-8bea-d01a66983835', 'parent_id': None, 'argsrepr': "(140, 'Subscribe Confirm Email Address')", 'kwargsrepr': '{}', 'origin': 'gen20577@webserver', 'ignore_result': True, 'reply_to': '91cf548f-4e42-3870-b27f-2cc1fd3f7074', 'correlation_id': 'aae43c55-3396-45f3-8bea-d01a66983835', 'hostname': 'worker@webserver', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [140, 'Subscribe Confirm Email Address'], 'kwargs': {}}, '[[140, "Subscribe Confirm Email Address"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
DEBUG/MainProcess] Closed channel #1
DEBUG/MainProcess] Closed channel #2
WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
    c.loop(*c.loop_args())
  File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/worker/loops.py", line 81, in asynloop
    next(loop)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 361, in create_loop
    cb(*cbargs)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/kombu/transport/base.py", line 235, in on_readable
    reader(loop)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/kombu/transport/base.py", line 217, in _read
    drain_events(timeout=0)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
    while not self.blocking_read(timeout):
  File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
    return self.on_inbound_frame(frame)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 665, in _on_close
    raise error_for_code(reply_code, reply_text,
amqp.exceptions.InternalError: (0, 0): (541) INTERNAL_ERROR
DEBUG/MainProcess] | Consumer: Restarting event loop...
DEBUG/MainProcess] | Consumer: Restarting Control...
DEBUG/MainProcess] | Consumer: Restarting Tasks...
DEBUG/MainProcess] Canceling task consumer...
DEBUG/MainProcess] | Consumer: Restarting Connection...
DEBUG/MainProcess] | Consumer: Starting Connection
DEBUG/MainProcess] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@webserver', 'copyright': 'Copyright (c) 2007-2021 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.0.4', 'product': 'RabbitMQ', 'version': '3.9.0'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
INFO/MainProcess] Connected to amqp://someAdmin:**@webserver:5672/backoffice

Celery Service config:

[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking
User=DJANGO_USER
Group=DJANGO_USER
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/backoffice

ExecStart=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi start $CELERYD_NODES \
   --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
   --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait $CELERYD_NODES \
    --pidfile=${CELERYD_PID_FILE} --loglevel="${CELERYD_LOG_LEVEL}"'
ExecReload=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi restart $CELERYD_NODES \
    --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
    --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
Restart=always

[Install]
WantedBy=multi-user.target

Celery conf.d:

CELERYD_NODES="worker"
CELERY_BIN="/opt/backoffice/venv/bin/celery"
CELERY_APP="backoffice"
CELERYD_CHDIR="/opt/backoffice/"
CELERYD_MULTI="multi"
CELERYD_OPTS="--time-limit=300 --without-heartbeat --without-gossip --without-mingle"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="DEBUG"
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"
CELERYBEAT_DB_FILE="/var/cache/backoffice/celerybeat/celerybeat-schedule.db"

Django celery_config.py:

from django.conf import settings
broker_url = settings.RABBITMQ_BROKER
worker_send_task_events = False
task_ignore_result = True
task_time_limit = 60
task_soft_time_limit = 50
task_acks_late = True
worker_prefetch_multiplier = 10
worker_cancel_long_running_tasks_on_connection_loss = True

Solution

Thanks Xentux, you set me on the right path. Whilst looking for the downgrade, I saw that there is a patch release for RabbitMQ (3.9.1). RabbitMQ is working again without errors, at least for the moment.
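
For anyone wanting to check whether their own broker is on the release discussed here, the following is a minimal sketch rather than an official check. It reads the same server properties that the Celery debug log prints in its "Start from server" line ('product', 'version'). The helper names and the AFFECTED_VERSIONS set are hypothetical, and treating only 3.9.0 as affected is an assumption based on this answer, not on the RabbitMQ release notes.

```python
import re

# Assumption: only 3.9.0 is treated as affected, because the answer reports
# that the 3.9.1 patch release made the errors stop.
AFFECTED_VERSIONS = {(3, 9, 0)}


def parse_version(version):
    """Turn a version string such as '3.9.0' into a tuple of ints."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def broker_needs_upgrade(server_properties):
    """Return True if the broker reports RabbitMQ at an affected version."""
    if server_properties.get("product") != "RabbitMQ":
        return False
    return parse_version(server_properties.get("version", "")) in AFFECTED_VERSIONS


if __name__ == "__main__":
    # Values copied from the "Start from server" line in the Celery log above.
    props = {"product": "RabbitMQ", "version": "3.9.0",
             "platform": "Erlang/OTP 24.0.4"}
    print(broker_needs_upgrade(props))
```

The function takes a plain dict so it can be tried without a running broker. How you obtain that dict from a live connection depends on your client library and is left out here.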

Answered By – user12231454

This answer, collected from Stack Overflow, is licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.
