Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions cli/src/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tonic_reflection::pb::v1::server_reflection_response::MessageResponse;
use agent_api::client::{connect_to_client, connect_to_server_reflection};
use agent_api::requests::{
get_all_features, send_active_connection_request, send_dropped_packets_request,
send_latency_metrics_request, send_tracked_veth_request,
send_latency_metrics_request, send_tracked_veth_request, send_veth_tracked_hashmap_req,
};

use crate::errors::CliError;
Expand Down Expand Up @@ -304,25 +304,24 @@ pub async fn monitor_tracked_veth() -> Result<(), CliError> {
"Connecting to cortexflow Client".white()
);
match connect_to_client().await {
Ok(client) => match send_tracked_veth_request(client).await {
Ok(client) => match send_veth_tracked_hashmap_req(client).await {
Ok(response) => {
let veth_response = response.into_inner();
if veth_response.tot_monitored_veth == 0 {
println!("{} {} ", "=====>".blue().bold(), "No tracked veth found");
Ok(())
} else {
println!(
"{} {} {} {} ",
"=====>".blue().bold(),
"Found:",
&veth_response.tot_monitored_veth,
"tracked veth"
);
for veth in veth_response.veth_names.iter() {
println!("{} {}", "=====>".blue().bold(), &veth);
}
Ok(())
// if veth_response.tot_monitored_veth == 0 {
// println!("{} {} ", "=====>".blue().bold(), "No tracked veth found");
// Ok(())
// } else {
// println!(
// "{} {} {} {} ",
// "=====>".blue().bold(),
// "Found:",
// &veth_response.tot_monitored_veth,
// "tracked veth"
// );
for veth in veth_response.veths.iter() {
println!("{} {:?}", "=====>".blue().bold(), &veth);
}
Ok(())
}
Err(e) => {
return Err(CliError::AgentError(
Expand Down
3 changes: 2 additions & 1 deletion core/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ aya = "0.13.1"
cortexbrain-common = { path = "../common", features = [
"map-handlers",
"network-structs",
"buffer-reader"
"buffer-reader",
"monitoring-structs"
] }
tonic-reflection = "0.14.0"
tonic-build = "0.14.0"
Expand Down
13 changes: 12 additions & 1 deletion core/api/protos/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ message VethEvent{
uint32 pid = 6; // Process ID
}

//declare agent api
message VethHashMapResponse{ // returns tracked veth from the tracked_veth hashmap
string status = 1;
map<string,string> veths = 2;
}

// Agent Service

service Agent{
// active connections endpoint
rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse);
Expand All @@ -102,10 +108,15 @@ service Agent{
// dropped packets endpoint
rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse);

// TODO: can i combine this 2 endpoints?
// active veth info endpoint
rpc GetTrackedVeth(google.protobuf.Empty) returns (VethResponse);
// get tracked veth from blocklist
rpc GetTrackedVethFromHashMap(google.protobuf.Empty) returns (VethHashMapResponse);
}

// Blocklist

message AddIpToBlocklistRequest{
optional string ip = 1 ;
}
Expand Down
90 changes: 88 additions & 2 deletions core/api/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ pub struct VethEvent {
#[prost(uint32, tag = "6")]
pub pid: u32,
}
/// returns tracked veth from the tracked_veth hashmap
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct VethHashMapResponse {
#[prost(string, tag = "1")]
pub status: ::prost::alloc::string::String,
#[prost(map = "string, string", tag = "2")]
pub veths: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AddIpToBlocklistRequest {
#[prost(string, optional, tag = "1")]
Expand Down Expand Up @@ -192,7 +203,6 @@ pub mod agent_client {
)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
/// declare agent api
#[derive(Debug, Clone)]
pub struct AgentClient<T> {
inner: tonic::client::Grpc<T>,
Expand Down Expand Up @@ -444,6 +454,31 @@ pub mod agent_client {
.insert(GrpcMethod::new("agent.Agent", "GetTrackedVeth"));
self.inner.unary(req, path, codec).await
}
/// get tracked veth from blocklist
pub async fn get_tracked_veth_from_hash_map(
&mut self,
request: impl tonic::IntoRequest<()>,
) -> std::result::Result<
tonic::Response<super::VethHashMapResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/agent.Agent/GetTrackedVethFromHashMap",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("agent.Agent", "GetTrackedVethFromHashMap"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
Expand Down Expand Up @@ -511,8 +546,15 @@ pub mod agent_server {
&self,
request: tonic::Request<()>,
) -> std::result::Result<tonic::Response<super::VethResponse>, tonic::Status>;
/// get tracked veth from blocklist
async fn get_tracked_veth_from_hash_map(
&self,
request: tonic::Request<()>,
) -> std::result::Result<
tonic::Response<super::VethHashMapResponse>,
tonic::Status,
>;
}
/// declare agent api
#[derive(Debug)]
pub struct AgentServer<T> {
inner: Arc<T>,
Expand Down Expand Up @@ -885,6 +927,50 @@ pub mod agent_server {
};
Box::pin(fut)
}
"/agent.Agent/GetTrackedVethFromHashMap" => {
#[allow(non_camel_case_types)]
struct GetTrackedVethFromHashMapSvc<T: Agent>(pub Arc<T>);
impl<T: Agent> tonic::server::UnaryService<()>
for GetTrackedVethFromHashMapSvc<T> {
type Response = super::VethHashMapResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(&mut self, request: tonic::Request<()>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Agent>::get_tracked_veth_from_hash_map(
&inner,
request,
)
.await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = GetTrackedVethFromHashMapSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
let mut response = http::Response::new(
Expand Down
53 changes: 46 additions & 7 deletions core/api/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::Context;
use anyhow::anyhow;
use chrono::Local;
use cortexbrain_common::formatters::{format_ipv4, format_ipv6};
use cortexbrain_common::map_handlers::load_perf_event_array_from_mapdata;
use prost::bytes::BytesMut;
use std::str::FromStr;
use std::sync::Mutex;
use tonic::{Request, Response, Status};
Expand All @@ -28,7 +28,8 @@ use cortexbrain_common::buffer_type::VethLog;
// * contains agent api configuration
use crate::agent::{
ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse, RequestActiveConnections,
RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, VethResponse, agent_server::Agent,
RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, VethHashMapResponse, VethResponse,
agent_server::Agent,
};
use crate::constants::PIN_BLOCKLIST_MAP_PATH;

Expand All @@ -38,6 +39,9 @@ use cortexbrain_common::buffer_type::IpProtocols;
use std::net::Ipv4Addr;
use tracing::warn;

use cortexbrain_common::buffer_type::BufferSize;
use cortexbrain_common::map_handlers::map_manager;

pub struct AgentApi {
//* event_rx is an istance of a mpsc receiver.
//* is used to receive the data from the transmitter (tx)
Expand Down Expand Up @@ -162,6 +166,9 @@ impl Default for AgentApi {
tracked_veth_tx: veth_tx.clone(),
};

// init map manager
//let map_manager = map_manager(maps)?

// For network metrics

//spawn an event readers
Expand All @@ -177,7 +184,7 @@ impl Default for AgentApi {
.open(cpu_id, None)
.expect("Error during the creation of net_events_buf structure");

let buffers = vec![BytesMut::with_capacity(4096); 8];
let buffers = BufferSize::ClassifierNetEvents.set_buffer();
net_events_buffer.push((buf, buffers));
}

Expand Down Expand Up @@ -262,7 +269,7 @@ impl Default for AgentApi {
.open(cpu_id, None)
.expect("Error during the creation of net_metrics_buf structure");

let buffers = vec![BytesMut::with_capacity(4096); 8];
let buffers = BufferSize::NetworkMetricsEvents.set_buffer();
net_metrics_buffer.push((buf, buffers));
}

Expand Down Expand Up @@ -343,7 +350,7 @@ impl Default for AgentApi {
.open(cpu_id, None)
.expect("Error during the creation of time stamp events buf structure");

let buffers = vec![BytesMut::with_capacity(4096); 8];
let buffers = BufferSize::TimeMetricsEvents.set_buffer();
ts_events_buffer.push((buf, buffers));
}

Expand Down Expand Up @@ -421,7 +428,7 @@ impl Default for AgentApi {
.open(cpu_id, None)
.expect("Error during the creation of time stamp events buf structure");

let buffers = vec![BytesMut::with_capacity(4096); 8];
let buffers = BufferSize::VethEvents.set_buffer();
veth_events_buffer.push((buf, buffers));
}

Expand Down Expand Up @@ -560,7 +567,10 @@ impl Agent for AgentApi {
//convert ip from string to [u8;4] type and insert into the bpf map
let u8_4_ip = Ipv4Addr::from_str(&ip).unwrap().octets();
//TODO: convert datetime in a kernel compatible format
blocklist_map.insert(u8_4_ip, u8_4_ip, 0);
blocklist_map
.insert(u8_4_ip, u8_4_ip, 0)
.map_err(|e| anyhow!("Cannot insert address in the blocklist. Reason: {}", e))
.unwrap();
info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
}
let path = std::env::var(PIN_BLOCKLIST_MAP_PATH)
Expand Down Expand Up @@ -774,4 +784,33 @@ impl Agent for AgentApi {

Ok(Response::new(response))
}

async fn get_tracked_veth_from_hash_map(
&self,
request: Request<()>,
) -> Result<Response<VethHashMapResponse>, Status> {
info!("Returning veth hashmap");
//open blocklist map
let mapdata = MapData::from_pin("/sys/fs/bpf/maps/tracked_veth")
.expect("cannot open tracked_veth Mapdata");
let tracked_veth_mapdata = Map::HashMap(mapdata); //load mapdata

let tracked_veth_map: ayaHashMap<MapData, [u8; 16], [u8; 8]> =
ayaHashMap::try_from(tracked_veth_mapdata).unwrap();

//convert the maps with a buffer to match the protobuffer types

let mut converted_tracked_veth_map: HashMap<String, String> = HashMap::new();
for item in tracked_veth_map.iter() {
let (k, v) = item.unwrap();
// convert keys and values from [u8;4] to String
let key = String::from_utf8(k.to_vec()).unwrap();
let value = String::from_utf8(v.to_vec()).unwrap();
converted_tracked_veth_map.insert(key, value);
}
Ok(Response::new(VethHashMapResponse {
status: "success".to_string(),
veths: converted_tracked_veth_map,
}))
}
}
10 changes: 10 additions & 0 deletions core/api/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::agent::LatencyMetricsResponse;
use crate::agent::RequestActiveConnections;
use crate::agent::RmIpFromBlocklistRequest;
use crate::agent::RmIpFromBlocklistResponse;
use crate::agent::VethHashMapResponse;
use crate::agent::VethResponse;
use crate::agent::agent_client::AgentClient;

Expand Down Expand Up @@ -100,3 +101,12 @@ pub async fn send_tracked_veth_request(
let response = client.get_tracked_veth(request).await?;
Ok(response)
}

#[cfg(feature = "client")]
pub async fn send_veth_tracked_hashmap_req(
mut client: AgentClient<Channel>,
) -> Result<Response<VethHashMapResponse>, Error> {
let request = Request::new(());
let response = client.get_tracked_veth_from_hash_map(request).await?;
Ok(response)
}
2 changes: 2 additions & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ opentelemetry-otlp = { version = "0.31.0", features = ["logs", "grpc-tonic"] }
bytemuck = "1.25.0"
bytes = "1.11.0"
bytemuck_derive = "1.10.2"
tokio = "1.49.0"

[features]
map-handlers = []
program-handlers = []
network-structs = []
monitoring-structs = []
buffer-reader = []
experimental = []
Loading