Skip to content

Commit 5051c08

Browse files
authored
Merge pull request #67 from utopia-php/fix-redis-job-record-memory-leak
2 parents e551606 + 659a51b commit 5051c08

5 files changed

Lines changed: 16 additions & 9 deletions

File tree

src/Queue/Broker/Redis.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
4545
/**
4646
* Move Job to Jobs and it's PID to the processing list.
4747
*/
48-
$this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage);
48+
$this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage, $queue->jobTtl);
4949
$this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid());
5050

5151
/**

src/Queue/Connection.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ public function listSize(string $key): int;
1919
public function listRange(string $key, int $total, int $offset): array;
2020
public function remove(string $key): bool;
2121
public function move(string $queue, string $destination): bool;
22-
public function set(string $key, string $value): bool;
22+
public function set(string $key, string $value, int $ttl = 0): bool;
2323
public function get(string $key): array|string|null;
24-
public function setArray(string $key, array $value): bool;
24+
public function setArray(string $key, array $value, int $ttl = 0): bool;
2525
public function increment(string $key): int;
2626
public function decrement(string $key): int;
2727
public function ping(): bool;

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,16 @@ public function move(string $queue, string $destination): bool
119119
return $this->getRedis()->move($queue, $destination);
120120
}
121121

122-
public function setArray(string $key, array $value): bool
122+
public function setArray(string $key, array $value, int $ttl = 0): bool
123123
{
124-
return $this->set($key, json_encode($value));
124+
return $this->set($key, json_encode($value), $ttl);
125125
}
126126

127-
public function set(string $key, string $value): bool
127+
public function set(string $key, string $value, int $ttl = 0): bool
128128
{
129+
if ($ttl > 0) {
130+
return $this->getRedis()->setex($key, $ttl, $value);
131+
}
129132
return $this->getRedis()->set($key, $value);
130133
}
131134

src/Queue/Connection/RedisCluster.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,16 @@ public function move(string $queue, string $destination): bool
114114
return false;
115115
}
116116

117-
public function setArray(string $key, array $value): bool
117+
public function setArray(string $key, array $value, int $ttl = 0): bool
118118
{
119-
return $this->set($key, json_encode($value));
119+
return $this->set($key, json_encode($value), $ttl);
120120
}
121121

122-
public function set(string $key, string $value): bool
122+
public function set(string $key, string $value, int $ttl = 0): bool
123123
{
124+
if ($ttl > 0) {
125+
return $this->getRedis()->setex($key, $ttl, $value);
126+
}
124127
return $this->getRedis()->set($key, $value);
125128
}
126129

src/Queue/Queue.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
public function __construct(
88
public string $name,
99
public string $namespace = 'utopia-queue',
10+
public int $jobTtl = 0,
1011
) {
1112
if (empty($this->name)) {
1213
throw new \InvalidArgumentException('Cannot create queue with empty name.');

0 commit comments

Comments
 (0)