Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{collections::HashSet, env};

use anyhow::{anyhow, Result};
use log::error;
use redis_work_queue::{Item, KeyPrefix, WorkQueue};
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use std::{collections::HashSet, env, thread, time};

use crate::{
app::{RedisJob, SimpleRiderChange},
Expand Down Expand Up @@ -46,8 +45,7 @@ pub async fn main() -> Result<()> {
env::var("PINGS_REMOVE_ROUTE").expect("PINGS_REMOVE_ROUTE must be set"),
)?;

work_loop(queue, db_pool, pings).await?;
Ok(())
work_loop(queue, db_pool, pings).await
}

async fn get_simple_data(
Expand Down Expand Up @@ -263,27 +261,36 @@ pub async fn work_loop(
db_pool: Pool<Postgres>,
pings: PingClient,
) -> Result<()> {
let mut queue_connect_failure = 0;
let three_sec = time::Duration::from_secs(3);
loop {
// Wait for a job with no timeout and a lease time of 5 seconds.
let job: Item = match queue.get_job().await {
Ok(job) => job,
Err(err) => {
error!("{}", err);
error!("Failed to Get Job: {}", err);
queue_connect_failure += 1;
thread::sleep(three_sec);
if queue_connect_failure >= 3 {
error!("Failed to Fetch Job 3+ Times! Failing...");
return Err(anyhow!("Fetch Job Failed 3 Times. Is Redis Running?"));
}
continue;
}
};
queue_connect_failure = 0;
match work(&job, &db_pool, &pings, &mut queue).await {
// Mark successful jobs as complete
Ok(()) => {
queue.complete(&job).await?;
}
// Drop a job that should be retried - it will be returned to the work queue after
// the (5 second) lease expires.
Err(err) if err.should_retry => error!("{}", err.msg),
Err(err) if err.should_retry => error!("Job Failed: {}, Retrying", err.msg),
// Errors that shouldn't cause a retry should mark the job as complete so it isn't
// tried again.
Err(err) => {
error!("{}", err.msg);
error!("Job Failed: {}, Not Retrying", err.msg);
queue.complete(&job).await?;
}
}
Expand Down
Loading