Skip to content
Merged
181 changes: 164 additions & 17 deletions flow-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ impl FlowFilter {
.map(NonZero::get)
.zip(t.dst_port().map(NonZero::get))
});
let log_str = format_packet_addrs_ports(&src_ip, &dst_ip, ports);

// For Display
let tuple = FlowTuple::new(src_ip, dst_ip, ports);

if let Some(dst_vpcd) = tablesr.lookup(src_vpcd, &src_ip, &dst_ip, ports) {
debug!("{nfi}: Flow allowed: {log_str}, setting packet dst_vpcd to {dst_vpcd}");
debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}",);
packet.meta_mut().dst_vpcd = Some(dst_vpcd);
} else {
debug!("{nfi}: Flow not allowed, dropping packet: {log_str}");
debug!("{nfi}: Flow {tuple} is NOT allowed, dropping packet",);
packet.done(DoneReason::Filtered);
}
}
Expand All @@ -105,16 +107,45 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for FlowFilter {
}
}

fn format_packet_addrs_ports(
src_addr: &IpAddr,
dst_addr: &IpAddr,
ports: Option<(u16, u16)>,
) -> String {
format!(
"src={src_addr}{}, dst={dst_addr}{}",
ports.map_or(String::new(), |p| format!(":{}", p.0)),
ports.map_or(String::new(), |p| format!(":{}", p.1))
)
// Only used for Display
struct OptPort(Option<u16>);
impl std::fmt::Display for OptPort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(port) = self.0 {
write!(f, ":{port}")?;
}
Ok(())
}
}

// Only used for Display
struct FlowTuple {
src_addr: IpAddr,
dst_addr: IpAddr,
src_port: OptPort,
dst_port: OptPort,
}

impl FlowTuple {
fn new(src_addr: IpAddr, dst_addr: IpAddr, ports: Option<(u16, u16)>) -> Self {
let ports = ports.unzip();
Self {
src_addr,
dst_addr,
src_port: OptPort(ports.0),
dst_port: OptPort(ports.1),
}
}
}

impl std::fmt::Display for FlowTuple {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"src={}{}, dst={}{}",
self.src_addr, self.src_port, self.dst_addr, self.dst_port
)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -595,6 +626,122 @@ mod tests {
assert_eq!(packets[0].meta().dst_vpcd, None);
}

#[traced_test]
#[test]
fn test_flow_filter_table_check_send_from_default() {
let vni1 = Vni::new_checked(100).unwrap();
let vni2 = Vni::new_checked(200).unwrap();

let mut vpc_table = VpcTable::new();
vpc_table
.add(Vpc::new("vpc1", "VPC01", vni1.as_u32()).unwrap())
.unwrap();
vpc_table
.add(Vpc::new("vpc2", "VPC02", vni2.as_u32()).unwrap())
.unwrap();

let mut peering_table = VpcPeeringTable::new();
peering_table
.add(VpcPeering::new(
"vpc1-to-vpc2",
VpcManifest {
name: "vpc1".to_string(),
exposes: vec![VpcExpose::empty().set_default()],
},
VpcManifest {
name: "vpc2".to_string(),
exposes: vec![VpcExpose::empty().ip("5.0.0.0/24".into())],
},
None,
))
.unwrap();

let mut overlay = Overlay::new(vpc_table, peering_table);
// Validation is necessary to build overlay.vpc_table's peerings from peering_table
overlay.validate().unwrap();

let table = FlowFilterTable::build_from_overlay(&overlay).unwrap();

let mut writer = FlowFilterTableWriter::new();
writer.update_flow_filter_table(table);

let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader());

// Test with a packet

let packet = create_test_packet(
Some(vni1),
"99.99.99.99".parse().unwrap(), // From "default" expose, use any address
"5.0.0.8".parse().unwrap(),
);

let packets = flow_filter
.process([packet].into_iter())
.collect::<Vec<_>>();

assert_eq!(packets.len(), 1);
assert!(!packets[0].is_done(), "{:?}", packets[0].get_done());
assert_eq!(packets[0].meta().dst_vpcd, Some(vni2.into()));
}

#[traced_test]
#[test]
fn test_flow_filter_table_check_default_to_default() {
let vni1 = Vni::new_checked(100).unwrap();
let vni2 = Vni::new_checked(200).unwrap();

let mut vpc_table = VpcTable::new();
vpc_table
.add(Vpc::new("vpc1", "VPC01", vni1.as_u32()).unwrap())
.unwrap();
vpc_table
.add(Vpc::new("vpc2", "VPC02", vni2.as_u32()).unwrap())
.unwrap();

let mut peering_table = VpcPeeringTable::new();
peering_table
.add(VpcPeering::new(
"vpc1-to-vpc2",
VpcManifest {
name: "vpc1".to_string(),
exposes: vec![VpcExpose::empty().set_default()],
},
VpcManifest {
name: "vpc2".to_string(),
exposes: vec![VpcExpose::empty().set_default()],
},
None,
))
.unwrap();

let mut overlay = Overlay::new(vpc_table, peering_table);
// Validation is necessary to build overlay.vpc_table's peerings from peering_table
overlay.validate().unwrap();

let table = FlowFilterTable::build_from_overlay(&overlay).unwrap();

let mut writer = FlowFilterTableWriter::new();
writer.update_flow_filter_table(table);

let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader());

// Test with packets

let packet = create_test_packet(
Some(vni1),
"99.99.99.99".parse().unwrap(),
"77.77.77.77".parse().unwrap(),
);

let packets = flow_filter
.process([packet].into_iter())
.collect::<Vec<_>>();

assert_eq!(packets.len(), 1);
assert!(!packets[0].is_done(), "{:?}", packets[0].get_done());
assert_eq!(packets[0].meta().dst_vpcd, Some(vni2.into()));
}

#[test]
fn test_flow_filter_batch_processing() {
// Setup table
Expand Down Expand Up @@ -652,10 +799,10 @@ mod tests {
let src_addr = "10.0.0.1".parse().unwrap();
let dst_addr = "20.0.0.2".parse().unwrap();

let result = format_packet_addrs_ports(&src_addr, &dst_addr, Some((8080, 443)));
assert_eq!(result, "src=10.0.0.1:8080, dst=20.0.0.2:443");
let result = FlowTuple::new(src_addr, dst_addr, Some((8080, 443)));
assert_eq!(result.to_string(), "src=10.0.0.1:8080, dst=20.0.0.2:443");

let result_no_ports = format_packet_addrs_ports(&src_addr, &dst_addr, None);
assert_eq!(result_no_ports, "src=10.0.0.1, dst=20.0.0.2");
let result_no_ports = FlowTuple::new(src_addr, dst_addr, None);
assert_eq!(result_no_ports.to_string(), "src=10.0.0.1, dst=20.0.0.2");
}
}
64 changes: 43 additions & 21 deletions flow-filter/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,51 @@ impl FlowFilterTable {
) -> Result<(), ConfigError> {
let local_vpcd = VpcDiscriminant::VNI(vpc.vni);
let dst_vpcd = VpcDiscriminant::VNI(overlay.vpc_table.get_remote_vni(peering));
let local_prefixes = peering.local.exposes.iter().flat_map(|expose| &expose.ips);

for remote in &peering.remote.exposes {
if remote.default {
for local_prefix in local_prefixes.clone() {
self.insert_default_remote(
dst_vpcd,
local_vpcd,
local_prefix.prefix(),
local_prefix.ports(),
)?;

for remote_expose in &peering.remote.exposes {
if remote_expose.default {
for local_expose in &peering.local.exposes {
if local_expose.default {
// Both the local and remote expose are default exposes
self.insert_default_source_to_default_remote(local_vpcd, dst_vpcd)?;
} else {
// Only the remote expose is a default expose
for local_prefix in &local_expose.ips {
self.insert_default_remote(
local_vpcd,
dst_vpcd,
local_prefix.prefix(),
local_prefix.ports(),
)?;
}
}
}
} else {
for local_prefix in local_prefixes.clone() {
for remote_prefix in remote.public_ips() {
self.insert(
local_vpcd,
dst_vpcd,
local_prefix.prefix(),
local_prefix.ports(),
remote_prefix.prefix(),
remote_prefix.ports(),
)?;
for local_expose in &peering.local.exposes {
if local_expose.default {
// Only the local expose is a default expose
for remote_prefix in remote_expose.public_ips() {
self.insert_default_source(
local_vpcd,
dst_vpcd,
remote_prefix.prefix(),
remote_prefix.ports(),
)?;
}
} else {
// No default expose
for local_prefix in &local_expose.ips {
for remote_prefix in remote_expose.public_ips() {
self.insert(
local_vpcd,
dst_vpcd,
local_prefix.prefix(),
local_prefix.ports(),
remote_prefix.prefix(),
remote_prefix.ports(),
)?;
}
}
}
}
}
Expand Down
Loading
Loading