Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions Command/ScheduleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace JMS\JobQueueBundle\Command;


use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use Doctrine\ORM\Query;
Expand Down Expand Up @@ -37,8 +38,10 @@ protected function configure()
$this
->setDescription('Schedules jobs at defined intervals')
->addOption('max-runtime', null, InputOption::VALUE_REQUIRED, 'The maximum runtime of this command.', 3600)
->addOption('min-job-interval', null, InputOption::VALUE_REQUIRED, 'The minimum time between schedules jobs in seconds.', 5)
;
->addOption(
'min-job-interval', null, InputOption::VALUE_REQUIRED,
'The minimum time between schedules jobs in seconds.', 5
);
}

protected function execute(InputInterface $input, OutputInterface $output)
Expand All @@ -63,7 +66,8 @@ protected function execute(InputInterface $input, OutputInterface $output)
return 0;
}

$jobsLastRunAt = $this->populateJobsLastRunAt($this->registry->getManagerForClass(CronJob::class), $jobSchedulers);
$jobsLastRunAt =
$this->populateJobsLastRunAt($this->registry->getManagerForClass(CronJob::class), $jobSchedulers);

$startedAt = time();
while (true) {
Expand All @@ -86,23 +90,23 @@ protected function execute(InputInterface $input, OutputInterface $output)
}

/**
* @param JobScheduler[] $jobSchedulers
* @param \DateTime[] $jobsLastRunAt
* @param JobScheduler[] $jobSchedulers
* @param \DateTime[] $jobsLastRunAt
*/
private function scheduleJobs(OutputInterface $output, array $jobSchedulers, array &$jobsLastRunAt)
{
foreach ($jobSchedulers as $name => $scheduler) {
$lastRunAt = $jobsLastRunAt[$name];

if ( ! $scheduler->shouldSchedule($name, $lastRunAt)) {
if (!$scheduler->shouldSchedule($name, $lastRunAt)) {
continue;
}

list($success, $newLastRunAt) = $this->acquireLock($name, $lastRunAt);
$jobsLastRunAt[$name] = $newLastRunAt;

if ($success) {
$output->writeln('Scheduling command '.$name);
$output->writeln('Scheduling command ' . $name);
$job = $scheduler->createJob($name, $lastRunAt);
$em = $this->registry->getManagerForClass(Job::class);
$em->persist($job);
Expand All @@ -117,31 +121,36 @@ private function acquireLock($commandName, \DateTime $lastRunAt)
$em = $this->registry->getManagerForClass(CronJob::class);
$con = $em->getConnection();

if (!$con->ping()) {
$con->close();
$con->connect();
}

$now = new \DateTime();
$affectedRows = $con->executeUpdate(
"UPDATE jms_cron_jobs SET lastRunAt = :now WHERE command = :command AND lastRunAt = :lastRunAt",
array(
[
'now' => $now,
'command' => $commandName,
'lastRunAt' => $lastRunAt,
),
array(
],
[
'now' => 'datetime',
'lastRunAt' => 'datetime',
)
]
);

if ($affectedRows > 0) {
return array(true, $now);
return [true, $now];
}

/** @var CronJob $cronJob */
$cronJob = $em->createQuery("SELECT j FROM ".CronJob::class." j WHERE j.command = :command")
$cronJob = $em->createQuery("SELECT j FROM " . CronJob::class . " j WHERE j.command = :command")
->setParameter('command', $commandName)
->setHint(Query::HINT_REFRESH, true)
->getSingleResult();

return array(false, $cronJob->getLastRunAt());
return [false, $cronJob->getLastRunAt()];
}

private function populateJobSchedulers()
Expand All @@ -156,7 +165,7 @@ private function populateJobSchedulers()

foreach ($this->cronCommands as $command) {
/** @var CronCommand $command */
if ( ! $command instanceof Command) {
if (!$command instanceof Command) {
throw new \RuntimeException('CronCommand should only be used on Symfony commands.');
}

Expand All @@ -168,15 +177,15 @@ private function populateJobSchedulers()

private function populateJobsLastRunAt(EntityManager $em, array $jobSchedulers)
{
$jobsLastRunAt = array();
$jobsLastRunAt = [];

foreach ($em->getRepository(CronJob::class)->findAll() as $job) {
/** @var CronJob $job */
$jobsLastRunAt[$job->getCommand()] = $job->getLastRunAt();
}

foreach (array_keys($jobSchedulers) as $name) {
if ( ! isset($jobsLastRunAt[$name])) {
if (!isset($jobsLastRunAt[$name])) {
$job = new CronJob($name);
$em->persist($job);
$jobsLastRunAt[$name] = $job->getLastRunAt();
Expand Down