Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions datadog-remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::fetch::{
};
use crate::{RemoteConfigCapabilities, RemoteConfigProduct, Target};
use futures_util::future::Shared;
use futures_util::FutureExt;
use futures_util::{pin_mut, select, FutureExt};
use libdd_common::MutexExt;
use manual_future::ManualFuture;
use serde::{Deserialize, Serialize};
Expand All @@ -20,7 +20,6 @@ use std::ops::Add;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use tokio::select;
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing::{debug, error, trace};
Expand Down Expand Up @@ -606,7 +605,10 @@ where
}
}),
)
.shared();
.shared()
.fuse();

pin_mut!(fetcher_fut);

loop {
{
Expand All @@ -628,9 +630,16 @@ where
}
} // unlock mutex

let sleep_fut = tokio::time::sleep(Duration::from_nanos(
fetcher.interval.load(Ordering::Relaxed),
))
.fuse();

pin_mut!(sleep_fut);

select! {
_ = tokio::time::sleep(Duration::from_nanos(fetcher.interval.load(Ordering::Relaxed))) => {},
_ = fetcher_fut.clone() => {
_ = sleep_fut => {},
_ = &mut fetcher_fut => {
break;
}
}
Expand Down
14 changes: 9 additions & 5 deletions datadog-remote-config/src/fetch/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use std::ops::Add;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::select;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::error;

Expand Down Expand Up @@ -349,9 +348,14 @@ impl SharedFetcher {
}
}

select! {
_ = self.cancellation.cancelled() => { break; }
_ = sleep(Duration::from_nanos(self.interval.load(Ordering::Relaxed))) => {}
if timeout(
Duration::from_nanos(self.interval.load(Ordering::Relaxed)),
self.cancellation.cancelled(),
)
.await
.is_ok()
{
break;
}
}

Expand Down
20 changes: 15 additions & 5 deletions datadog-sidecar/src/self_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::config::Config;
use crate::log;
use crate::service::SidecarServer;
use crate::watchdog::WatchdogHandle;
use futures::{pin_mut, select, FutureExt};
use libdd_common::{tag, tag::Tag, MutexExt};
use libdd_telemetry::data::metrics::{MetricNamespace, MetricType};
use libdd_telemetry::metrics::ContextKey;
Expand All @@ -13,7 +14,6 @@ use libdd_telemetry::worker::{
use manual_future::ManualFuture;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::task::JoinHandle;

struct MetricData<'a> {
Expand Down Expand Up @@ -160,9 +160,14 @@ pub fn self_telemetry(server: SidecarServer, watchdog_handle: WatchdogHandle) ->
tokio::spawn(async move {
let submission_interval = tokio::time::interval(Duration::from_secs(60));

let shutdown_fut = watchdog_handle.wait_for_shutdown().fuse();
let config_fut = future.fuse();

pin_mut!(shutdown_fut, config_fut);

select! {
_ = watchdog_handle.wait_for_shutdown() => { },
config = future => {
_ = shutdown_fut => { },
config = config_fut => {
let worker_cfg = SelfTelemetry { submission_interval, watchdog_handle, config, server };
worker_cfg.spawn_worker().await
},
Expand Down Expand Up @@ -272,13 +277,18 @@ impl SelfTelemetry {
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await;
loop {
let tick_fut = self.submission_interval.tick().fuse();
let shutdown_fut = self.watchdog_handle.wait_for_shutdown().fuse();

pin_mut!(tick_fut, shutdown_fut);

select! {
_ = self.submission_interval.tick() => {
_ = tick_fut => {
metrics.collect_and_send().await;
let _ = worker.send_msg(TelemetryActions::Lifecycle(LifecycleAction::FlushMetricAggr)).await;
let _ = worker.send_msg(TelemetryActions::Lifecycle(LifecycleAction::FlushData)).await;
},
_ = self.watchdog_handle.wait_for_shutdown() => {
_ = shutdown_fut => {
metrics.collect_and_send().await;
let _ = worker.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Stop)).await;
let _ = join_handle.await;
Expand Down
4 changes: 2 additions & 2 deletions datadog-sidecar/src/service/agent_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::FutureExt;
use http::uri::PathAndQuery;
use libdd_common::{Endpoint, MutexExt};
use libdd_data_pipeline::agent_info::schema::AgentInfoStruct;
use libdd_data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
use libdd_data_pipeline::agent_info::{fetch_info_with_state_tokio, FetchInfoStatus};
use manual_future::ManualFuture;
use std::ffi::CString;
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -101,7 +101,7 @@ impl AgentInfoFetcher {
parts.path_and_query = Some(PathAndQuery::from_static("/info"));
fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap();
loop {
let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await;
let fetched = fetch_info_with_state_tokio(&fetch_endpoint, state.as_deref()).await;
let mut complete_fut = None;
{
let mut infos_guard = agent_infos.0.lock_or_panic();
Expand Down
11 changes: 8 additions & 3 deletions datadog-sidecar/src/service/debugger_diagnostics_bookkeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
use datadog_live_debugger::debugger_defs::{
DebuggerData, DebuggerPayload, Diagnostics, ProbeStatus,
};
use futures::{pin_mut, select, FutureExt};
use libdd_common::MutexExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::select;
use tokio_util::sync::CancellationToken;

pub struct DebuggerDiagnosticsBookkeeper {
Expand Down Expand Up @@ -40,16 +40,21 @@ impl DebuggerDiagnosticsBookkeeper {
tokio::spawn(async move {
let mut interval = tokio::time::interval(MAX_TIME_BEFORE_REMOVAL / 2);
loop {
let tick_fut = interval.tick().fuse();
let cancel_fut = cancel.cancelled().fuse();

pin_mut!(tick_fut, cancel_fut);

select! {
_ = interval.tick() => {
_ = tick_fut => {
active.lock_or_panic().retain(|_, active| {
active.active_probes.retain(|_, status| {
status.last_update.elapsed() < MAX_TIME_BEFORE_REMOVAL
});
!active.active_probes.is_empty()
});
},
_ = cancel.cancelled() => {
_ = cancel_fut => {
break;
},
}
Expand Down
11 changes: 8 additions & 3 deletions datadog-sidecar/src/service/exception_hash_rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::primary_sidecar_identifier;
use datadog_ipc::rate_limiter::{ShmLimiter, ShmLimiterMemory};
use futures::{pin_mut, select, FutureExt};
use libdd_common::{rate_limiter::Limiter, MutexExt};
use std::ffi::CString;
use std::io;
Expand Down Expand Up @@ -39,9 +40,13 @@ impl ManagedExceptionHashRateLimiter {
}
}

tokio::select! {
_ = do_loop() => {}
_ = recv => { }
let loop_fut = do_loop().fuse();
let recv_fut = recv.fuse();
pin_mut!(loop_fut, recv_fut);

select! {
_ = loop_fut => {}
_ = recv_fut => {}
}
});

Expand Down
16 changes: 11 additions & 5 deletions datadog-sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::TraceSendData;
use crate::agent_remote_config::AgentRemoteConfigWriter;
use datadog_ipc::platform::NamedShmHandle;
use futures::future::join_all;
use futures::{pin_mut, select, FutureExt};
use http_body_util::BodyExt;
use libdd_common::hyper_migration::new_default_client;
use libdd_common::{Endpoint, HttpClient, MutexExt};
Expand All @@ -19,7 +20,6 @@ use std::ops::DerefMut;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::select;
use tokio::sync::mpsc;
use tokio::task::{JoinError, JoinHandle};
use tracing::{debug, error, info};
Expand Down Expand Up @@ -277,11 +277,17 @@ impl TraceFlusher {
tokio::spawn(async move {
loop {
let mut flush_done_sender = None;
let sleep_fut = tokio::time::sleep(Duration::from_millis(
self.interval_ms.load(Ordering::Relaxed),
))
.fuse();
let force_flush_fut = (&mut force_flush).fuse();

pin_mut!(sleep_fut, force_flush_fut);

select! {
_ = tokio::time::sleep(Duration::from_millis(
self.interval_ms.load(Ordering::Relaxed),
)) => {},
sender = force_flush => { flush_done_sender = sender; },
_ = sleep_fut => {},
sender = force_flush_fut => { flush_done_sender = sender; },
}

debug!(
Expand Down
19 changes: 11 additions & 8 deletions datadog-sidecar/src/unix.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData};

use std::ffi::CString;
use std::os::unix::net::UnixListener as StdUnixListener;

use crate::config::Config;
use crate::enter_listener_loop;
use futures::{pin_mut, select, FutureExt};
use nix::fcntl::{fcntl, OFlag, F_GETFL, F_SETFL};
use nix::sys::socket::{shutdown, Shutdown};
use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData};
use std::ffi::CString;
use std::io;
use std::os::fd::RawFd;
use std::os::unix::net::UnixListener as StdUnixListener;
use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
use std::time::Instant;
use tokio::net::{UnixListener, UnixStream};
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tracing::{error, info};

Expand Down Expand Up @@ -102,12 +100,17 @@ async fn accept_socket_loop(
#[allow(clippy::unwrap_used)]
let mut termsig = signal(SignalKind::terminate()).unwrap();
loop {
let termsig_fut = termsig.recv().fuse();
let accept_fut = listener.accept().fuse();

pin_mut!(termsig_fut, accept_fut);

select! {
_ = termsig.recv() => {
_ = termsig_fut => {
stop_listening(listener.as_raw_fd());
break;
}
accept = listener.accept() => {
accept = accept_fut => {
if let Ok((socket, _)) = accept {
handler(socket);
} else {
Expand Down
17 changes: 11 additions & 6 deletions datadog-sidecar/src/watchdog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use futures::{
future::{BoxFuture, Shared},
FutureExt,
pin_mut, select, FutureExt,
};
use std::{
sync::{
Expand All @@ -13,7 +13,7 @@ use std::{
};

use crate::service::SidecarServer;
use tokio::{select, sync::mpsc::Receiver};
use tokio::sync::mpsc::Receiver;
use tracing::error;

pub struct Watchdog {
Expand All @@ -29,8 +29,8 @@ pub struct WatchdogHandle {
}

impl WatchdogHandle {
pub async fn wait_for_shutdown(&self) {
self.handle.clone().await;
pub fn wait_for_shutdown(&self) -> Shared<BoxFuture<'static, Option<()>>> {
self.handle.clone()
}
}

Expand Down Expand Up @@ -83,8 +83,13 @@ impl Watchdog {
mem_usage_bytes.store(0, Ordering::Relaxed);

loop {
let tick_fut = self.interval.tick().fuse();
let shutdown_fut = self.shutdown_receiver.recv().fuse();

pin_mut!(tick_fut, shutdown_fut);

select! {
_ = self.interval.tick() => {
_ = tick_fut => {
still_alive.fetch_add(1, Ordering::Relaxed);

let current_mem_usage_bytes = memory_stats::memory_stats()
Expand All @@ -106,7 +111,7 @@ impl Watchdog {
}

},
_ = self.shutdown_receiver.recv() => {
_ = shutdown_fut => {
still_alive.store(SHUTDOWN, Ordering::Relaxed);
return
},
Expand Down
Loading
Loading