@@ -270,6 +270,7 @@ do is to write your own CSV receiver::
270270
271271 use App\Message\NewOrder;
272272 use Symfony\Component\Messenger\Envelope;
273+ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
273274 use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
274275 use Symfony\Component\Serializer\SerializerInterface;
275276
@@ -286,17 +287,23 @@ do is to write your own CSV receiver::
286287
287288 public function get(): iterable
288289 {
289- $ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv');
290-
291- foreach ($ordersFromCsv as $orderFromCsv) {
292- $order = new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount']);
293-
294- $envelope = new Envelope($order);
290+ // Receive the envelope according to your transport ($yourEnvelope here),
291+ // in most cases, using a connection is the easiest solution.
292+ if (null === $yourEnvelope) {
293+ return [];
294+ }
295295
296- $handler($envelope);
296+ try {
297+ $envelope = $this->serializer->decode([
298+ 'body' => $yourEnvelope['body'],
299+ 'headers' => $yourEnvelope['headers'],
300+ ]);
301+ } catch (MessageDecodingFailedException $exception) {
302+ $this->connection->reject($yourEnvelope['id']);
303+ throw $exception;
297304 }
298305
299- return [$envelope] ;
306+ return [$envelope->with(new CustomStamp($yourEnvelope['id']) ;
300307 }
301308
302309 public function ack(Envelope $envelope): void
@@ -306,7 +313,8 @@ do is to write your own CSV receiver::
306313
307314 public function reject(Envelope $envelope): void
308315 {
309- // Reject the message if needed
316+ // In the case of a custom connection
317+ $this->connection->reject($this->findCustomStamp($envelope)->getId());
310318 }
311319 }
312320
0 commit comments