Skip to content

Commit d009b5d

Browse files
authored
support heartbeat via configuration (#21)
1 parent 6ba6a39 commit d009b5d

File tree

2 files changed

+53
-10
lines changed

2 files changed

+53
-10
lines changed

src/Provider/Amqp/AmqpQueueProvider.php

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
use PhpAmqpLib\Channel\AMQPChannel;
1010
use PhpAmqpLib\Connection\AbstractConnection;
1111
use PhpAmqpLib\Connection\AMQPStreamConnection;
12+
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;
1213
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
14+
use PhpAmqpLib\Exception\AMQPRuntimeException;
1315
use PhpAmqpLib\Exception\AMQPTimeoutException;
1416
use PhpAmqpLib\Message\AMQPMessage;
1517
use PhpAmqpLib\Wire\AMQPTable;
@@ -83,6 +85,11 @@ class AmqpQueueProvider extends AbstractQueueProvider
8385
*/
8486
private $_slowPushThreshold = 0;
8587

88+
/**
89+
* @var ?PCNTLHeartbeatSender
90+
*/
91+
protected $_heartbeatSender;
92+
8693
protected function _construct()
8794
{
8895
$this->_fixedConsumerCallback = [$this, 'consumerCallback'];
@@ -116,9 +123,8 @@ public function pushBatch(array $batch, $persistent = null)
116123
$returnCallback = null;
117124
if($mandatory)
118125
{
119-
$returnCallback = function ($replyCode, $replyText, $exchange, $routingKey) use
120-
(&$needRetry, &$needDeclare, &$autoDeclare, $declareAttempts, $declareRetryLimit)
121-
{
126+
$returnCallback = function ($replyCode, $replyText, $exchange, $routingKey)
127+
use (&$needRetry, &$needDeclare, &$autoDeclare, $declareAttempts, $declareRetryLimit) {
122128
if($autoDeclare && ($declareAttempts < $declareRetryLimit) && ($replyCode == 312))
123129
{
124130
$needDeclare = true;
@@ -520,7 +526,17 @@ protected function _getConnection($connectionMode)
520526
$host,
521527
$config->getItem('port', 5672),
522528
$config->getItem('username', 'guest'),
523-
$config->getItem('password', 'guest')
529+
$config->getItem('password', 'guest'),
530+
$config->getItem('vhost', '/'),
531+
false,
532+
'AMQPLAIN',
533+
null,
534+
'en_US',
535+
$config->getItem('connection_timeout', 3),
536+
$config->getItem('read_write_timeout', 3),
537+
null,
538+
(bool)$config->getItem('keepalive', false),
539+
$config->getItem('heartbeat', 0)
524540
);
525541
}
526542
catch(Exception $e)
@@ -534,6 +550,20 @@ protected function _getConnection($connectionMode)
534550
);
535551
$this->_lastConnectTimes[$connectionMode] = time();
536552
}
553+
554+
try
555+
{
556+
if($this->_heartbeatSender)
557+
{
558+
$this->_heartbeatSender->unregister();
559+
}
560+
$this->_heartbeatSender = new PCNTLHeartbeatSender($this->_connections[$connectionMode]);
561+
$this->_heartbeatSender->register();
562+
}
563+
catch(AMQPRuntimeException $e)
564+
{
565+
}
566+
537567
return $this->_connections[$connectionMode];
538568
}
539569

@@ -630,6 +660,12 @@ private function _disconnect($connectionMode)
630660
{
631661
}
632662
$this->_channels[$connectionMode] = null;
663+
664+
if($this->_heartbeatSender)
665+
{
666+
$this->_heartbeatSender->unregister();
667+
$this->_heartbeatSender = null;
668+
}
633669
try
634670
{
635671
if((!empty($this->_connections[$connectionMode]))

tests/Provider/AmqpTest.php

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,17 @@
77

88
class AmqpTest extends \PHPUnit_Framework_TestCase
99
{
10+
protected function _getProvider(string $queue, ?string $exchange = null)
11+
{
12+
$q = AmqpQueueProvider::create($queue, $exchange);
13+
$q->configure(new ConfigSection('', ['heartbeat' => 2]));
14+
return $q;
15+
}
16+
1017
public function testAmqp()
1118
{
12-
$q = AmqpQueueProvider::create('test', 'testexchange')
13-
->declareExchange()
19+
$q = $this->_getProvider('test', 'testexchange');
20+
$q->declareExchange()
1421
->declareQueue()
1522
->bindQueue()
1623
->push('this is a test');
@@ -25,7 +32,7 @@ function ($message, $deliveryTag) use ($q) {
2532

2633
public function testQueueExists()
2734
{
28-
$q = AmqpQueueProvider::create('new_queue');
35+
$q = $this->_getProvider('new_queue');
2936
$this->assertFalse($q->queueExists());
3037
$q->declareQueue();
3138
$this->assertTrue($q->queueExists());
@@ -35,7 +42,7 @@ public function testQueueExists()
3542

3643
public function testExchangeExists()
3744
{
38-
$q = AmqpQueueProvider::create('new_queue_e', 'new_exchange');
45+
$q = $this->_getProvider('new_queue_e', 'new_exchange');
3946
$this->assertFalse($q->exchangeExists());
4047
$q->declareExchange();
4148
$this->assertTrue($q->exchangeExists());
@@ -179,7 +186,7 @@ protected function _getQueue(
179186
$queueName, ConfigSectionInterface $config = null
180187
)
181188
{
182-
$q = AmqpQueueProvider::create($queueName);
189+
$q = $this->_getProvider($queueName);
183190
if($config)
184191
{
185192
$q->configure($config);
@@ -200,7 +207,7 @@ public function testMandatory(
200207
$config, $queueName, $createExchange, $createQueue, $createBinding
201208
)
202209
{
203-
$q = AmqpQueueProvider::create($queueName)->deleteQueueAndExchange();
210+
$q = $this->_getProvider($queueName)->deleteQueueAndExchange();
204211
$q->configure(new ConfigSection('', $config));
205212

206213
if($createExchange)

0 commit comments

Comments
 (0)