Skip to content

Commit c7b9c26

Browse files
authored
retry on missed heartbeats (#24)
1 parent 0c21b0b commit c7b9c26

File tree

1 file changed

+56
-42
lines changed

1 file changed

+56
-42
lines changed

src/Provider/Amqp/AmqpQueueProvider.php

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use PhpAmqpLib\Connection\AbstractConnection;
1212
use PhpAmqpLib\Connection\AMQPStreamConnection;
1313
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;
14+
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
1415
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
1516
use PhpAmqpLib\Exception\AMQPRuntimeException;
1617
use PhpAmqpLib\Exception\AMQPTimeoutException;
@@ -145,63 +146,71 @@ public function pushBatch(array $batch, $persistent = null)
145146

146147
while($needRetry)
147148
{
148-
$needRetry = false;
149-
150-
$this->_refreshConnection(self::CONN_PUSH);
151-
$ch = $this->_getChannel(self::CONN_PUSH);
152-
153-
if($needDeclare)
149+
try
154150
{
155-
$this->_log("Auto-declaring exchange and queue");
156-
$declareAttempts++;
157-
$this->declareExchange();
158-
$this->declareQueue();
159-
$this->bindQueue();
160-
}
151+
$needRetry = false;
161152

162-
$exchangeName = $this->_getExchangeName();
163-
$routingKey = $this->_getRoutingKey();
153+
$this->_refreshConnection(self::CONN_PUSH);
154+
$ch = $this->_getChannel(self::CONN_PUSH);
164155

165-
if($mandatory && $returnCallback)
166-
{
167-
$ch->set_return_listener($returnCallback);
168-
}
156+
if($needDeclare)
157+
{
158+
$this->_log("Auto-declaring exchange and queue");
159+
$declareAttempts++;
160+
$this->declareExchange();
161+
$this->declareQueue();
162+
$this->bindQueue();
163+
}
169164

170-
foreach($batch as $data)
171-
{
172-
$ch->batch_basic_publish(
173-
$this->_getMessage($data, $persistent),
174-
$exchangeName,
175-
$routingKey,
176-
$mandatory
177-
);
178-
}
165+
$exchangeName = $this->_getExchangeName();
166+
$routingKey = $this->_getRoutingKey();
179167

180-
$ch->publish_batch();
168+
if($mandatory && $returnCallback)
169+
{
170+
$ch->set_return_listener($returnCallback);
171+
}
181172

182-
if($publishConfirm || $mandatory)
183-
{
184-
try
173+
foreach($batch as $data)
185174
{
186-
$ch->wait_for_pending_acks_returns($this->_getPushTimeout());
175+
$ch->batch_basic_publish(
176+
$this->_getMessage($data, $persistent),
177+
$exchangeName,
178+
$routingKey,
179+
$mandatory
180+
);
187181
}
188-
catch(Exception $e)
182+
183+
$ch->publish_batch();
184+
185+
if($publishConfirm || $mandatory)
189186
{
190-
$this->disconnect(self::CONN_PUSH);
191-
if($autoDeclare
192-
&& ($declareAttempts < $declareRetryLimit)
193-
&& ($e->getCode() == 404)
194-
)
187+
try
195188
{
196-
$needRetry = true;
197-
$needDeclare = true;
189+
$ch->wait_for_pending_acks_returns($this->_getPushTimeout());
198190
}
199-
else
191+
catch(Exception $e)
200192
{
201-
throw $e;
193+
$this->disconnect(self::CONN_PUSH);
194+
if($autoDeclare
195+
&& ($declareAttempts < $declareRetryLimit)
196+
&& ($e->getCode() == 404)
197+
)
198+
{
199+
$needRetry = true;
200+
$needDeclare = true;
201+
}
202+
else
203+
{
204+
throw $e;
205+
}
202206
}
203207
}
204208
}
209+
catch(AMQPHeartbeatMissedException $e)
210+
{
211+
$this->disconnect(self::CONN_PUSH);
212+
$needRetry = true;
213+
}
205214
}
206215
return $this;
207216
}
@@ -261,6 +270,11 @@ public function consume(callable $callback)
261270
$this->_fixedConsumerCallback
262271
);
263272
}
273+
catch(AMQPHeartbeatMissedException $e)
274+
{
275+
$this->_disconnect(self::CONN_CONSUME);
276+
$retry = true;
277+
}
264278
catch(AMQPProtocolChannelException $e)
265279
{
266280
if(($e->getCode() == 404)

0 commit comments

Comments
 (0)