Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 48 additions & 17 deletions flow-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ impl FlowFilter {
.map(NonZero::get)
.zip(t.dst_port().map(NonZero::get))
});
let log_str = format_packet_addrs_ports(&src_ip, &dst_ip, ports);

// For Display
let tuple = FlowTuple::new(src_ip, dst_ip, ports);

if let Some(dst_vpcd) = tablesr.lookup(src_vpcd, &src_ip, &dst_ip, ports) {
debug!("{nfi}: Flow allowed: {log_str}, setting packet dst_vpcd to {dst_vpcd}");
debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}",);
packet.meta_mut().dst_vpcd = Some(dst_vpcd);
} else {
debug!("{nfi}: Flow not allowed, dropping packet: {log_str}");
debug!("{nfi}: Flow {tuple} is NOT allowed, dropping packet",);
packet.done(DoneReason::Filtered);
}
}
Expand All @@ -105,16 +107,45 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for FlowFilter {
}
}

fn format_packet_addrs_ports(
src_addr: &IpAddr,
dst_addr: &IpAddr,
ports: Option<(u16, u16)>,
) -> String {
format!(
"src={src_addr}{}, dst={dst_addr}{}",
ports.map_or(String::new(), |p| format!(":{}", p.0)),
ports.map_or(String::new(), |p| format!(":{}", p.1))
)
// Only used for Display
struct OptPort(Option<u16>);
impl std::fmt::Display for OptPort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(port) = self.0 {
write!(f, ":{port}")?;
}
Ok(())
}
}

// Only used for Display
struct FlowTuple {
src_addr: IpAddr,
dst_addr: IpAddr,
src_port: OptPort,
dst_port: OptPort,
}

impl FlowTuple {
fn new(src_addr: IpAddr, dst_addr: IpAddr, ports: Option<(u16, u16)>) -> Self {
let ports = ports.unzip();
Self {
src_addr,
dst_addr,
src_port: OptPort(ports.0),
dst_port: OptPort(ports.1),
}
}
}

impl std::fmt::Display for FlowTuple {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"src={}{}, dst={}{}",
self.src_addr, self.src_port, self.dst_addr, self.dst_port
)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -768,10 +799,10 @@ mod tests {
let src_addr = "10.0.0.1".parse().unwrap();
let dst_addr = "20.0.0.2".parse().unwrap();

let result = format_packet_addrs_ports(&src_addr, &dst_addr, Some((8080, 443)));
assert_eq!(result, "src=10.0.0.1:8080, dst=20.0.0.2:443");
let result = FlowTuple::new(src_addr, dst_addr, Some((8080, 443)));
assert_eq!(result.to_string(), "src=10.0.0.1:8080, dst=20.0.0.2:443");

let result_no_ports = format_packet_addrs_ports(&src_addr, &dst_addr, None);
assert_eq!(result_no_ports, "src=10.0.0.1, dst=20.0.0.2");
let result_no_ports = FlowTuple::new(src_addr, dst_addr, None);
assert_eq!(result_no_ports.to_string(), "src=10.0.0.1, dst=20.0.0.2");
}
}
57 changes: 37 additions & 20 deletions flow-filter/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,31 +94,35 @@ impl FlowFilterTable {
ports: Option<(u16, u16)>,
) -> Option<VpcDiscriminant> {
// Get the table related to the source VPC for the packet
let table = self.get_table(src_vpcd)?;
let Some(table) = self.get_table(src_vpcd) else {
debug!("Could not find connections table for VPC {src_vpcd}");
return None;
};

let (src_port, dst_port) = ports.unzip();
// Look for valid connections information in the table that matches the source address and port.
// If nothing matches, use the default source entry, if any.
let src_connection_data = table
.lookup(src_addr, src_port)
.map(|(_, data)| data)
.or(table.default_source_opt.as_ref())?;
let Some(src_connection_data) = table.lookup(src_addr, src_port) else {
debug!("Could not find src connection data for src:{src_addr}, src_port:{src_port:?}");
return None;
};
debug!("Found src_connection_data: {src_connection_data:?}");

// We have a src_connection_data object for our source VPC and source IP, and source port
// ranges associated to this IP: we may need to find the right item for this entry based on
// the source port
let dst_connection_data = src_connection_data.get_remote_prefixes_data(src_port)?;
let Some(dst_connection_data) = src_connection_data.get_remote_prefixes_data(src_port)
else {
debug!("Could not find dst connection data for src:{src_addr}, src_port:{src_port:?}");
return None;
};
debug!("Found dst_connection_data: {dst_connection_data:?}");

// We have a dst_connection_data object for our source VPC, IP, port. From this object, we
// need to retrieve the prefix information associated to our destination IP and port.
let Some(remote_prefix_data) = dst_connection_data.lookup(dst_addr, dst_port) else {
let default_remote_opt = dst_connection_data.default_remote;
debug!(
"No remote prefix information found, looking for default remote: {default_remote_opt:?}"
);
return default_remote_opt;
debug!("Could not find remote prefix data for dst:{dst_addr}, dst_port:{dst_port:?}");
return None;
};
debug!("Found remote_prefix_data: {remote_prefix_data:?}");

Expand Down Expand Up @@ -223,7 +227,17 @@ impl VpcConnectionsTable {
}
}

fn lookup(&self, addr: &IpAddr, port: Option<u16>) -> Option<(Prefix, &SrcConnectionData)> {
fn lookup(&self, addr: &IpAddr, port: Option<u16>) -> Option<&SrcConnectionData> {
let (_, data) = self.trie.lookup(addr, port).unzip();
data.or(self.default_source_opt.as_ref())
}

#[cfg(test)]
fn lookup_with_prefix(
&self,
addr: &IpAddr,
port: Option<u16>,
) -> Option<(Prefix, &SrcConnectionData)> {
self.trie.lookup(addr, port)
}

Expand Down Expand Up @@ -494,7 +508,7 @@ impl ValueWithAssociatedRanges for RemotePrefixPortData {
#[derive(Debug, Clone)]
pub(crate) struct DstConnectionData {
trie: IpPortPrefixTrie<RemotePrefixPortData>,
default_remote: Option<VpcDiscriminant>,
default_remote: Option<RemotePrefixPortData>,
}

impl DstConnectionData {
Expand All @@ -514,12 +528,15 @@ impl DstConnectionData {
fn new_for_default_remote(vpcd: VpcDiscriminant) -> Self {
Self {
trie: IpPortPrefixTrie::new(),
default_remote: Some(vpcd),
default_remote: Some(RemotePrefixPortData::AllPorts(vpcd)),
}
}

fn lookup(&self, addr: &IpAddr, port: Option<u16>) -> Option<&RemotePrefixPortData> {
self.trie.lookup(addr, port).map(|(_, data)| data)
self.trie
.lookup(addr, port)
.map(|(_, data)| data)
.or(self.default_remote.as_ref())
}

fn update(
Expand Down Expand Up @@ -553,7 +570,7 @@ impl DstConnectionData {
"Trying to update default remote with an existing default remote".to_string(),
));
}
self.default_remote = Some(vpcd);
self.default_remote = Some(RemotePrefixPortData::AllPorts(vpcd));
Ok(())
}
}
Expand Down Expand Up @@ -705,13 +722,13 @@ mod tests {
.unwrap();

// Lookup should succeed
let result = table.lookup(&"10.0.0.5".parse().unwrap(), None);
let result = table.lookup_with_prefix(&"10.0.0.5".parse().unwrap(), None);
assert!(result.is_some());
let (prefix, _) = result.unwrap();
assert_eq!(prefix, src_prefix);

// Lookup for non-matching address should fail
let result = table.lookup(&"11.0.0.5".parse().unwrap(), None);
let result = table.lookup_with_prefix(&"11.0.0.5".parse().unwrap(), None);
assert!(result.is_none());
}

Expand All @@ -736,11 +753,11 @@ mod tests {
.unwrap();

// Lookup with matching port
let result = table.lookup(&"10.0.0.5".parse().unwrap(), Some(8085));
let result = table.lookup_with_prefix(&"10.0.0.5".parse().unwrap(), Some(8085));
assert!(result.is_some());

// Lookup with non-matching port
let result = table.lookup(&"10.0.0.5".parse().unwrap(), Some(9000));
let result = table.lookup_with_prefix(&"10.0.0.5".parse().unwrap(), Some(9000));
assert!(result.is_none());
}

Expand Down
13 changes: 4 additions & 9 deletions lpm/src/prefix/with_ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,27 +588,22 @@ impl FromStr for PortRange {

impl Display for PortRange {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}", self.start, self.end)
write!(f, "[{}-{}]", self.start, self.end)
}
}

impl Display for PrefixWithPorts {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} <{}>", self.prefix, self.ports)
write!(f, "{}:{}", self.prefix, self.ports)
}
}

impl Display for PrefixWithOptionalPorts {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PrefixWithOptionalPorts::Prefix(prefix) => write!(f, "({prefix} </>)"),
PrefixWithOptionalPorts::Prefix(prefix) => write!(f, "{prefix}:[all]"),
PrefixWithOptionalPorts::PrefixPorts(prefix_with_ports) => {
write!(
f,
"({} <{}>)",
prefix_with_ports.prefix(),
prefix_with_ports.ports()
)
write!(f, "{prefix_with_ports}")
}
}
}
Expand Down
Loading