Skip to content

Commit d533348

Browse files
authored
Heartbeat tests (#25)
1 parent c7b9c26 commit d533348

File tree

3 files changed

+139
-27
lines changed

3 files changed

+139
-27
lines changed

src/Provider/Amqp/AmqpQueueProvider.php

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public function consume(callable $callback)
272272
}
273273
catch(AMQPHeartbeatMissedException $e)
274274
{
275-
$this->_disconnect(self::CONN_CONSUME);
275+
$this->disconnect(self::CONN_CONSUME);
276276
$retry = true;
277277
}
278278
catch(AMQPProtocolChannelException $e)
@@ -300,10 +300,17 @@ public function consume(callable $callback)
300300
// replace callback for this consumer
301301
$channel->callbacks[$consumerId] = $this->_fixedConsumerCallback;
302302
}
303+
304+
// consumers bound, wait for message
303305
try
304306
{
305307
$channel->wait(null, false, $this->_getWaitTime());
306308
}
309+
catch(AMQPHeartbeatMissedException $e)
310+
{
311+
$this->disconnect(self::CONN_CONSUME);
312+
return false;
313+
}
307314
catch(AMQPTimeoutException $e)
308315
{
309316
return false;
@@ -399,14 +406,30 @@ public static function create(
399406

400407
public function ack($deliveryTag)
401408
{
402-
$this->_getChannel(self::CONN_CONSUME)
403-
->basic_ack($deliveryTag, false);
409+
try
410+
{
411+
$this->_getChannel(self::CONN_CONSUME)
412+
->basic_ack($deliveryTag, false);
413+
}
414+
catch(AMQPHeartbeatMissedException $e)
415+
{
416+
$this->disconnect(self::CONN_CONSUME);
417+
$this->ack($deliveryTag);
418+
}
404419
}
405420

406421
public function nack($deliveryTag, $requeueFailures = false)
407422
{
408-
$this->_getChannel(self::CONN_CONSUME)
409-
->basic_reject($deliveryTag, $requeueFailures);
423+
try
424+
{
425+
$this->_getChannel(self::CONN_CONSUME)
426+
->basic_reject($deliveryTag, $requeueFailures);
427+
}
428+
catch(AMQPHeartbeatMissedException $e)
429+
{
430+
$this->disconnect(self::CONN_CONSUME);
431+
$this->nack($deliveryTag, $requeueFailures);
432+
}
410433
}
411434

412435
public function batchAck(array $tagResults, $requeueFailures = false)
@@ -556,7 +579,7 @@ protected function _getConnection($connectionMode)
556579
}
557580
catch(Exception $e)
558581
{
559-
$this->_log('AMQP host failed to connect (' . $host . ')');
582+
$this->_log('AMQP host failed to connect [' . $e->getMessage() . '] (' . $host . ')');
560583
array_shift($this->_hosts);
561584
}
562585
$this->_persistentDefault = ValueAs::bool($config->getItem('persistent', false));
@@ -574,6 +597,7 @@ protected function _getConnection($connectionMode)
574597
}
575598
catch(AMQPRuntimeException $e)
576599
{
600+
$this->_log('Unable to start heartbeat sender. ' . $e->getMessage());
577601
}
578602

579603
return $this->_connections[$connectionMode];
@@ -616,7 +640,7 @@ protected function _getChannel($connectionMode)
616640
catch(Exception $e)
617641
{
618642
$this->_log(
619-
'Error getting AMQP channel (' . $retries . ' retries remaining)'
643+
'Error getting AMQP channel [' . $e->getMessage() . '] (' . $retries . ' retries remaining) '
620644
);
621645
$this->disconnect($connectionMode);
622646
if(!($retries--))
@@ -659,36 +683,57 @@ public function disconnect($connectionMode = null)
659683

660684
private function _disconnect($connectionMode)
661685
{
662-
try
686+
if((!empty($this->_channels[$connectionMode]))
687+
&& ($this->_channels[$connectionMode] instanceof AMQPChannel)
688+
)
663689
{
664-
if((!empty($this->_channels[$connectionMode]))
665-
&& ($this->_channels[$connectionMode] instanceof AMQPChannel)
666-
)
690+
try
691+
{
692+
$this->_channels[$connectionMode]->wait_for_pending_acks_returns($this->_getPushTimeout());
693+
}
694+
catch(\Throwable $e)
695+
{
696+
}
697+
try
698+
{
699+
$this->_channels[$connectionMode]->basic_cancel($this->_getConsumerId());
700+
}
701+
catch(\Throwable $e)
702+
{
703+
}
704+
try
667705
{
668706
$this->_channels[$connectionMode]->close();
669707
}
670-
}
671-
catch(Exception $e)
672-
{
708+
catch(\Throwable $e)
709+
{
710+
}
673711
}
674712
$this->_channels[$connectionMode] = null;
675713

676714
if($this->_heartbeatSender)
677715
{
678-
$this->_heartbeatSender->unregister();
679-
$this->_heartbeatSender = null;
716+
try
717+
{
718+
$this->_heartbeatSender->unregister();
719+
}
720+
catch(\Throwable $e)
721+
{
722+
}
680723
}
681-
try
724+
$this->_heartbeatSender = null;
725+
726+
if((!empty($this->_connections[$connectionMode]))
727+
&& ($this->_connections[$connectionMode] instanceof AbstractConnection)
728+
)
682729
{
683-
if((!empty($this->_connections[$connectionMode]))
684-
&& ($this->_connections[$connectionMode] instanceof AbstractConnection)
685-
)
730+
try
686731
{
687732
$this->_connections[$connectionMode]->close();
688733
}
689-
}
690-
catch(Exception $e)
691-
{
734+
catch(\Throwable $e)
735+
{
736+
}
692737
}
693738
$this->_connections[$connectionMode] = null;
694739
}

tests/Provider/AmqpTest.php

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,42 @@
33

44
use Packaged\Config\ConfigSectionInterface;
55
use Packaged\Config\Provider\ConfigSection;
6-
use Packaged\Queue\Provider\Amqp\AmqpQueueProvider;
6+
use Packaged\Queue\Tests\Provider\Mock\AmqpMockProvider;
77

88
class AmqpTest extends \PHPUnit_Framework_TestCase
99
{
1010
protected function _getProvider(string $queue, ?string $exchange = null)
1111
{
12-
$q = AmqpQueueProvider::create($queue, $exchange);
13-
$q->configure(new ConfigSection('', ['heartbeat' => 2]));
12+
$q = AmqpMockProvider::create($queue, $exchange);
13+
$q->configure(new ConfigSection('', ['heartbeat' => 4, 'read_write_timeout' => 3]));
14+
$q->deleteQueueAndExchange();
1415
return $q;
1516
}
1617

18+
public function testFailHeartbeat()
19+
{
20+
$q = $this->_getProvider('test_heartbeat')->unregisterHeartbeat();
21+
$q->declareExchange()
22+
->declareQueue()
23+
->bindQueue();
24+
$q->push($q->config()->getItem('heartbeat'));
25+
self::assertEquals(0, $q->getDisconnectCount());
26+
$q->consume(
27+
function ($msg, $tag) use ($q) {
28+
29+
$timeLeft = (int)$msg * 3;
30+
while($timeLeft > 0)
31+
{
32+
$timeLeft = sleep($timeLeft);
33+
}
34+
35+
$q->ack($tag);
36+
// expect one reconnect
37+
self::assertEquals(1, $q->getDisconnectCount());
38+
}
39+
);
40+
}
41+
1742
public function testAmqp()
1843
{
1944
$q = $this->_getProvider('test', 'testexchange');
@@ -207,7 +232,7 @@ public function testMandatory(
207232
$config, $queueName, $createExchange, $createQueue, $createBinding
208233
)
209234
{
210-
$q = $this->_getProvider($queueName)->deleteQueueAndExchange();
235+
$q = $this->_getProvider($queueName);
211236
$q->configure(new ConfigSection('', $config));
212237

213238
if($createExchange)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
namespace Packaged\Queue\Tests\Provider\Mock;
4+
5+
use Packaged\Queue\Provider\Amqp\AmqpQueueProvider;
6+
7+
class AmqpMockProvider extends AmqpQueueProvider
8+
{
9+
protected $_disconnectCount = 0;
10+
protected $_unregisterHeartbeat = false;
11+
12+
protected function _getConnection($connectionMode)
13+
{
14+
$conn = parent::_getConnection($connectionMode);
15+
if($this->_unregisterHeartbeat && $this->_heartbeatSender)
16+
{
17+
$this->_heartbeatSender->unregister();
18+
}
19+
return $conn;
20+
}
21+
22+
public function unregisterHeartbeat()
23+
{
24+
$this->_unregisterHeartbeat = true;
25+
if($this->_unregisterHeartbeat && $this->_heartbeatSender)
26+
{
27+
$this->_heartbeatSender->unregister();
28+
}
29+
return $this;
30+
}
31+
32+
public function getDisconnectCount()
33+
{
34+
return $this->_disconnectCount;
35+
}
36+
37+
public function disconnect($connectionMode = null)
38+
{
39+
parent::disconnect($connectionMode);
40+
$this->_disconnectCount++;
41+
}
42+
}

0 commit comments

Comments
 (0)