diff --git a/src/worker.rs b/src/worker.rs index cb7bf55..e334f41 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,9 +1,8 @@ -use std::{collections::HashSet, env}; - use anyhow::{anyhow, Result}; use log::error; use redis_work_queue::{Item, KeyPrefix, WorkQueue}; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use std::{collections::HashSet, env, thread, time}; use crate::{ app::{RedisJob, SimpleRiderChange}, @@ -46,8 +45,7 @@ pub async fn main() -> Result<()> { env::var("PINGS_REMOVE_ROUTE").expect("PINGS_REMOVE_ROUTE must be set"), )?; - work_loop(queue, db_pool, pings).await?; - Ok(()) + work_loop(queue, db_pool, pings).await } async fn get_simple_data( @@ -263,15 +261,24 @@ pub async fn work_loop( db_pool: Pool, pings: PingClient, ) -> Result<()> { + let mut queue_connect_failure = 0; + let three_sec = time::Duration::from_secs(3); loop { // Wait for a job with no timeout and a lease time of 5 seconds. let job: Item = match queue.get_job().await { Ok(job) => job, Err(err) => { - error!("{}", err); + error!("Failed to Get Job: {}", err); + queue_connect_failure += 1; + thread::sleep(three_sec); + if queue_connect_failure >= 3 { + error!("Failed to Fetch Job 3+ Times! Failing..."); + return Err(anyhow!("Fetch Job Failed 3 Times. Is Redis Running?")); + } continue; } }; + queue_connect_failure = 0; match work(&job, &db_pool, &pings, &mut queue).await { // Mark successful jobs as complete Ok(()) => { @@ -279,11 +286,11 @@ pub async fn work_loop( } // Drop a job that should be retried - it will be returned to the work queue after // the (5 second) lease expires. - Err(err) if err.should_retry => error!("{}", err.msg), + Err(err) if err.should_retry => error!("Job Failed: {}, Retrying", err.msg), // Errors that shouldn't cause a retry should mark the job as complete so it isn't // tried again. Err(err) => { - error!("{}", err.msg); + error!("Job Failed: {}, Not Retrying", err.msg); queue.complete(&job).await?; } }