Skip to content

Commit f45d0bc

Browse files
committed
add publish confirm
1 parent 767fb4c commit f45d0bc

2 files changed

Lines changed: 84 additions & 10 deletions

File tree

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
}
1818
],
1919
"require": {
20-
"php": ">=7.2"
20+
"php": ">=7.2",
21+
"ext-libxml": "*",
22+
"ext-amqp": "*"
2123
},
2224
"autoload-dev": {
2325
"classmap": [

src/Protocols/Amqp.php

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
use AMQPEnvelope;
99
use AMQPExchange;
1010
use AMQPQueue;
11+
use AMQPQueueException;
12+
use AMQPException;
13+
use AMQPBasicProperties;
1114
use Tochka\MQAdapter\Exceptions\MqAdapterException;
1215

1316
/**
@@ -16,14 +19,25 @@
1619
*/
1720
class Amqp
1821
{
22+
public const CONNECT_TYPE_PUBLISH = 'publish';
23+
public const CONNECT_TYPE_CONSUME = 'consume';
24+
public const PUBLISH_CONFIRM_WAIT = 0.4;
25+
1926
protected $hosts;
2027
protected $login;
2128
protected $password;
2229
protected $settings;
30+
/**
31+
* @var AMQPConnection $connection
32+
*/
2333
protected $connection;
34+
/**
35+
* @var AMQPChannel $channel
36+
*/
2437
protected $channel;
2538
protected $queues;
2639
protected $currentQueue;
40+
protected $connectType = self::CONNECT_TYPE_CONSUME;
2741

2842
/**
2943
* AmqpAdapter constructor.
@@ -41,6 +55,11 @@ public function __construct($hosts, $login, $password, $settings)
4155
$this->settings = $settings;
4256
}
4357

58+
public function setConnectTypePublish(): void
59+
{
60+
$this->connectType = self::CONNECT_TYPE_PUBLISH;
61+
}
62+
4463
/**
4564
* При уничтожении объекта - отключаемся
4665
*/
@@ -86,14 +105,25 @@ public function disconnect(): void
86105
*/
87106
public function send(string $destination, string $message, array $settings = []): bool
88107
{
89-
$this->checkConnection();
108+
$this->setConnectTypePublish();
90109

91-
$exchange = new AMQPExchange($this->channel);
110+
$this->checkConnection();
92111

93-
$result = $exchange->publish($message, $destination, AMQP_NOPARAM, $settings);
112+
$this->setPublisherConfirms();
94113

114+
$exchange = new AMQPExchange($this->channel);
115+
$exchange->setName($destination);
116+
$result = $exchange->publish($message, '', AMQP_NOPARAM, $settings);
95117
if (!$result) {
96-
throw new \AMQPException('Failed to publish content.');
118+
throw new AMQPException('Failed to publish content.');
119+
}
120+
121+
try {
122+
$this->channel->waitForConfirm(self::PUBLISH_CONFIRM_WAIT);
123+
} catch (AMQPException $e) {
124+
if ((int)$e->getCode() !== 0) {
125+
throw $e;
126+
}
97127
}
98128

99129
return $result;
@@ -102,7 +132,8 @@ public function send(string $destination, string $message, array $settings = [])
102132
/**
103133
* Вычитывает и возвращает новые сообщения (если они есть)
104134
*
105-
* @return bool|AMQPEnvelope
135+
* @return AMQPEnvelope|array
136+
* @throws AMQPQueueException
106137
*/
107138
public function getNextMessage()
108139
{
@@ -114,8 +145,15 @@ public function getNextMessage()
114145

115146
$this->currentQueue = $queue;
116147

117-
/** @var AMQPEnvelope $message */
118-
$message = $queue->get(AMQP_NOPARAM);
148+
try {
149+
/** @var AMQPEnvelope $message */
150+
$message = $queue->get(AMQP_NOPARAM);
151+
} catch (AMQPQueueException $e) {
152+
if ((int)$e->getCode() === 404) {
153+
sleep(3);
154+
}
155+
throw $e;
156+
}
119157

120158
if (is_bool($message)) {
121159
return $message;
@@ -163,7 +201,7 @@ public function subscribeAll(): void
163201
}
164202
}
165203

166-
public function unsubscribe($queueName)
204+
public function unsubscribe($queueName): void
167205
{
168206
if (empty($this->queues[$queueName])) {
169207
return;
@@ -200,7 +238,7 @@ public function __sleep()
200238
*
201239
* @return array
202240
*/
203-
protected function adaptMessage(AMQPEnvelope $message)
241+
protected function adaptMessage(AMQPEnvelope $message): array
204242
{
205243
$headers = $message->getHeaders();
206244
$headers['destination'] = $this->currentQueue->getName();
@@ -234,6 +272,10 @@ protected function hasErrors(): bool
234272

235273
/**
236274
* Выполняет переподключение
275+
*
276+
* @return bool
277+
* @throws MqAdapterException
278+
* @throws \AMQPConnectionException
237279
*/
238280
protected function reconnect(): bool
239281
{
@@ -249,6 +291,9 @@ protected function reconnect(): bool
249291

250292
$this->connection->reconnect();
251293
$this->channel = new AMQPChannel($this->connection);
294+
if ($this->connectType === self::CONNECT_TYPE_PUBLISH) {
295+
$this->setPublisherConfirms();
296+
}
252297

253298
if (isset($queues)) {
254299
$this->clearSubscribes();
@@ -263,6 +308,29 @@ protected function reconnect(): bool
263308
return $this->connect();
264309
}
265310

311+
/**
312+
* Включаем механизм подтверждения публикации сообщения
313+
*/
314+
private function setPublisherConfirms(): void
315+
{
316+
317+
$this->channel->setPrefetchCount(1);
318+
$this->channel->confirmSelect();
319+
$this->channel->setConfirmCallback(function ($delivery_tag, $multiple) {
320+
echo 'Message acked: ' . $delivery_tag . '/' . ($multiple ? 'multiple' : 'noMultiple'), PHP_EOL;
321+
322+
return true;
323+
}, function ($delivery_tag, $multiple, $requeue) {
324+
throw new MqAdapterException('Message nacked: ' . $delivery_tag . '/' . ($multiple ? 'multiple' : 'noMultiple') . '/' . $requeue);
325+
});
326+
$this->channel->setReturnCallback(
327+
function ($reply_code, $reply_text, $exchange, $routing_key, AMQPBasicProperties $properties, $body) {
328+
echo 'Message returned: ', $reply_code, '-', $reply_text, ', message body:', $body, PHP_EOL;
329+
}
330+
);
331+
332+
}
333+
266334
/**
267335
* @return bool
268336
* @throws MqAdapterException
@@ -306,6 +374,10 @@ protected function connect(): bool
306374

307375
$this->channel = new AMQPChannel($link);
308376

377+
if ($this->connectType === self::CONNECT_TYPE_PUBLISH) {
378+
$this->setPublisherConfirms();
379+
}
380+
309381
return true;
310382
}
311383
}

0 commit comments

Comments
 (0)