Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 264 additions & 0 deletions scapy/contrib/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
# SPDX-License-Identifier: GPL-2.0-or-later
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2024 Lucas Drufva <lucas.drufva@gmail.com>

# scapy.contrib.description = WebSocket
# scapy.contrib.status = loads

# Based on rfc6455

import struct
import base64
import zlib
from hashlib import sha1
from scapy.fields import (BitFieldLenField, Field, BitField, BitEnumField, ConditionalField, XNBytesField)
from scapy.layers.http import HTTPRequest, HTTPResponse, HTTP
from scapy.layers.inet import TCP
from scapy.packet import Packet
from scapy.error import Scapy_Exception
import logging


class PayloadLenField(BitFieldLenField):

def __init__(self, name, default, length_of, size=0, tot_size=0, end_tot_size=0):
# Initialize with length_of (like in BitFieldLenField) and lengthFrom (like in BitLenField)
super().__init__(name, default, size, length_of=length_of, tot_size=tot_size, end_tot_size=end_tot_size)

def getfield(self, pkt, s):
s, _ = s
# Get the 7-bit field (first byte)
length_byte = s[0] & 0x7F
s = s[1:]

if length_byte <= 125:
# 7-bit length
return s, length_byte
elif length_byte == 126:
# 16-bit length
length = struct.unpack("!H", s[:2])[0] # Read 2 bytes
s = s[2:]
return s, length
elif length_byte == 127:
# 64-bit length
length = struct.unpack("!Q", s[:8])[0] # Read 8 bytes
s = s[8:]
return s, length

def addfield(self, pkt, s, val):
p_field, p_val = pkt.getfield_and_val(self.length_of)
val = p_field.i2len(pkt, p_val)

if val <= 125:
self.size = 7
return super().addfield(pkt, s, val)
elif val <= 0xFFFF:
self.size = 7+16
s, _, masked = s
return s + struct.pack("!BH", 126 | masked, val)
elif val <= 0xFFFFFFFFFFFFFFFF:
self.size = 7+64
s, _, masked = s
return s + struct.pack("!BQ", 127 | masked, val)
else:
raise Scapy_Exception("%s: Payload length too large" %
self.__class__.__name__)
Comment on lines +54 to +66
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PayloadLenField.addfield() manually builds the second byte for extended payload lengths by unpacking the in-progress bitfield tuple (s, _, masked = s) and OR-ing masked into 126/127. This drops the partial bitfield state and also sets the mask bit in the wrong position (it should be 0x80, not 0x01), so masked frames with payload length > 125 will be serialized incorrectly. Consider relying on Scapy's bitfield packing for the mask/payloadLen byte and adding separate conditional fields for the 16/64-bit extended length.

Suggested change
self.size = 7
return super().addfield(pkt, s, val)
elif val <= 0xFFFF:
self.size = 7+16
s, _, masked = s
return s + struct.pack("!BH", 126 | masked, val)
elif val <= 0xFFFFFFFFFFFFFFFF:
self.size = 7+64
s, _, masked = s
return s + struct.pack("!BQ", 127 | masked, val)
else:
raise Scapy_Exception("%s: Payload length too large" %
self.__class__.__name__)
# Simple 7-bit length; let BitFieldLenField handle packing.
self.size = 7
return super().addfield(pkt, s, val)
elif val <= 0xFFFF:
# Extended 16-bit length. First pack the MASK + 7-bit length
# indicator (126) using the standard bitfield machinery, then
# append the 16-bit length to the raw bytes.
self.size = 7
s = super().addfield(pkt, s, 126)
raw, bitstate = s
raw += struct.pack("!H", val)
return raw, bitstate
elif val <= 0xFFFFFFFFFFFFFFFF:
# Extended 64-bit length. Same pattern as above, with indicator 127.
self.size = 7
s = super().addfield(pkt, s, 127)
raw, bitstate = s
raw += struct.pack("!Q", val)
return raw, bitstate
else:
raise Scapy_Exception("%s: Payload length too large" %
self.__class__.__name__)

Copilot uses AI. Check for mistakes.



class PayloadField(Field):
"""
Field for handling raw byte payloads with dynamic size.
The length of the payload is described by a preceding PayloadLenField.
"""
__slots__ = ["lengthFrom"]

def __init__(self, name, lengthFrom):
"""
:param name: Field name
:param lengthFrom: Field name that provides the length of the payload
"""
super(PayloadField, self).__init__(name, None)
self.lengthFrom = lengthFrom

def getfield(self, pkt, s):
# Fetch the length from the field that specifies the length
length = getattr(pkt, self.lengthFrom)
payloadData = s[:length]

if pkt.mask:
key = struct.pack("I", pkt.maskingKey)[::-1]
data_int = int.from_bytes(payloadData, 'big')
mask_repeated = key * (len(payloadData) // 4) + key[: len(payloadData) % 4]
mask_int = int.from_bytes(mask_repeated, 'big')
payloadData = (data_int ^ mask_int).to_bytes(len(payloadData), 'big')
Comment on lines +90 to +95
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PayloadField uses struct.pack("I", pkt.maskingKey)[::-1] to derive the masking key bytes. Using native-endian packing plus manual reversing is platform-dependent and hard to reason about. Prefer deterministic network-order packing (e.g. !I) or directly constructing the 4 key bytes to match the on-wire order used by XNBytesField.

Copilot uses AI. Check for mistakes.

if("permessage-deflate" in pkt.extensions):
try:
payloadData = pkt.decoder[0](payloadData + b"\x00\x00\xff\xff")
except Exception:
logging.debug("Failed to decompress payload", payloadData)
Comment on lines +97 to +101
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decompression is attempted whenever "permessage-deflate" in pkt.extensions, regardless of whether the frame is actually compressed (RFC6455 permessage-deflate uses RSV1 to indicate compression). This can silently corrupt payloads if the decoder happens to accept uncompressed data. Consider conditioning decompression on the RSV1 bit (e.g. pkt.rsv), and also handle decoder being unset (currently pkt.decoder[0] will fail).

Suggested change
if("permessage-deflate" in pkt.extensions):
try:
payloadData = pkt.decoder[0](payloadData + b"\x00\x00\xff\xff")
except Exception:
logging.debug("Failed to decompress payload", payloadData)
extensions = getattr(pkt, "extensions", None)
decoder = getattr(pkt, "decoder", None)
# RFC6455 permessage-deflate uses RSV1 to indicate compression.
if (extensions
and "permessage-deflate" in extensions
and getattr(pkt, "rsv", 0) & 0x4
and decoder
and callable(decoder[0])):
try:
payloadData = decoder[0](payloadData + b"\x00\x00\xff\xff")
except zlib.error:
logging.debug("Failed to decompress payload: %r", payloadData)

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception handler calls logging.debug("Failed to decompress payload", payloadData). Passing payloadData as a second argument without a % placeholder will cause a TypeError in Python's logging (it tries to apply % formatting). Use a format placeholder (e.g. %r) or log with exc_info=True so decompression failures don't crash dissection.

Suggested change
logging.debug("Failed to decompress payload", payloadData)
logging.debug("Failed to decompress payload: %r", payloadData)

Copilot uses AI. Check for mistakes.

return s[length:], payloadData

def addfield(self, pkt, s, val):
if pkt.mask:
key = struct.pack("I", pkt.maskingKey)[::-1]
data_int = int.from_bytes(val, 'big')
mask_repeated = key * (len(val) // 4) + key[: len(val) % 4]
mask_int = int.from_bytes(mask_repeated, 'big')
val = (data_int ^ mask_int).to_bytes(len(val), 'big')

return s + bytes(val)

def i2len(self, pkt, val):
# Length of the payload in bytes
return len(val)

class WebSocket(Packet):
__slots__ = ["extensions", "decoder"]

name = "WebSocket"
fields_desc = [
BitField("fin", 0, 1),
BitField("rsv", 0, 3),
BitEnumField("opcode", 0, 4,
{
0x0: "none",
0x1: "text",
0x2: "binary",
0x8: "close",
0x9: "ping",
0xA: "pong",
}),
BitField("mask", 0, 1),
PayloadLenField("payloadLen", 0, length_of="wsPayload", size=1),
ConditionalField(XNBytesField("maskingKey", 0, sz=4), lambda pkt: pkt.mask == 1),
PayloadField("wsPayload", lengthFrom="payloadLen")
]

def __init__(self, pkt=None, extensions=[], decoder=None, *args, **fields):
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WebSocket.__init__ uses a mutable default argument (extensions=[]). This can leak state between instances if the list is mutated. Use None as the default and create a new list/dict per instance.

Suggested change
def __init__(self, pkt=None, extensions=[], decoder=None, *args, **fields):
def __init__(self, pkt=None, extensions=None, decoder=None, *args, **fields):
if extensions is None:
extensions = []

Copilot uses AI. Check for mistakes.
self.extensions = extensions
self.decoder = decoder
super().__init__(_pkt=pkt, *args, **fields)

def extract_padding(self, s):
return '', s
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract_padding() returns '' (a str) rather than b'' (bytes). Packet.extract_padding is expected to return bytes, and returning a string can break dissection and TCP reassembly logic. Return b"" for the payload portion.

Suggested change
return '', s
return b'', s

Copilot uses AI. Check for mistakes.

@classmethod
def tcp_reassemble(cls, data, metadata, session):
# data = the reassembled data from the same request/flow
# metadata = empty dictionary, that can be used to store data
# during TCP reassembly
# session = a dictionary proper to the bidirectional TCP session,
# that can be used to store anything
# [...]
# If the packet is available, return it. Otherwise don't.
# Whenever you return a packet, the buffer will be discarded.


HANDSHAKE_STATE_CLIENT_OPEN = 0
HANDSHAKE_STATE_SERVER_OPEN = 1
HANDSHAKE_STATE_OPEN = 2

if "handshake-state" not in session:
session["handshake-state"] = HANDSHAKE_STATE_CLIENT_OPEN

if "extensions" not in session:
session["extensions"] = {}


if session["handshake-state"] == HANDSHAKE_STATE_CLIENT_OPEN:
http_data = HTTP(data)
if HTTPRequest in http_data:
http_data = http_data[HTTPRequest]
else:
return http_data

if http_data.Method != b"GET":
return HTTP()/http_data

if not http_data.Upgrade or http_data.Upgrade.lower() != b"websocket":
return HTTP()/http_data

if not http_data.Unknown_Headers or b"Sec-WebSocket-Key" not in http_data.Unknown_Headers:
return HTTP()/http_data

session["handshake-key"] = http_data.Unknown_Headers[b"Sec-WebSocket-Key"]

if "original" in metadata:
session["server-port"] = metadata["original"][TCP].dport

session["handshake-state"] = HANDSHAKE_STATE_SERVER_OPEN

return http_data

elif session["handshake-state"] == HANDSHAKE_STATE_SERVER_OPEN:
http_data = HTTP(data)
if HTTPResponse in http_data:
http_data = http_data[HTTPResponse]
else:
return http_data

if not http_data.Upgrade.lower() == b"websocket":
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the SERVER_OPEN handshake branch, http_data.Upgrade.lower() is called without checking that Upgrade is present. For malformed/partial responses this will raise AttributeError and break TCP stream parsing. Mirror the request-side guard (if not http_data.Upgrade or ...).

Suggested change
if not http_data.Upgrade.lower() == b"websocket":
if not http_data.Upgrade or http_data.Upgrade.lower() != b"websocket":

Copilot uses AI. Check for mistakes.
return HTTP()/http_data

# Verify key-accept handshake:
correct_accept = base64.b64encode(sha1(session["handshake-key"] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".encode()).digest())
if not http_data.Unknown_Headers or b"Sec-WebSocket-Accept" not in http_data.Unknown_Headers or http_data.Unknown_Headers[b"Sec-WebSocket-Accept"] != correct_accept:
# TODO: handle or Logg wrong accept key
pass

if b"Sec-WebSocket-Extensions" in http_data.Unknown_Headers:
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if b"Sec-WebSocket-Extensions" in http_data.Unknown_Headers: assumes Unknown_Headers is a dict. When the response is malformed and Unknown_Headers is None (which is possible per HTTP dissector), this will raise TypeError. Guard with if http_data.Unknown_Headers and ... before doing membership checks.

Suggested change
if b"Sec-WebSocket-Extensions" in http_data.Unknown_Headers:
if http_data.Unknown_Headers and b"Sec-WebSocket-Extensions" in http_data.Unknown_Headers:

Copilot uses AI. Check for mistakes.
session["extensions"] = {}
for extension in http_data.Unknown_Headers[b"Sec-WebSocket-Extensions"].decode().strip().split(";"):
key_value_pair = extension.split("=", 1) + [None]
session["extensions"][key_value_pair[0].strip()] = key_value_pair[1]

if "permessage-deflate" in session["extensions"]:
def create_decompressor(window_bits):
decoder = zlib.decompressobj(wbits=-window_bits)
def decomp(data):
nonlocal decoder
return decoder.decompress(data, 0)

def reset():
nonlocal decoder
nonlocal window_bits
decoder = zlib.decompressobj(wbits=-window_bits)

return (decomp, reset)

# Default values
client_wb = 12
server_wb = 15

# Check for new values in extensions header
if "client_max_window_bits" in session["extensions"]:
client_wb = int(session["extensions"]["client_max_window_bits"])

if "server_max_window_bits" in session["extensions"]:
server_wb = int(session["extensions"]["server_max_window_bits"])


session["server-decoder"] = create_decompressor(client_wb)
session["client-decoder"] = create_decompressor(server_wb)


session["handshake-state"] = HANDSHAKE_STATE_OPEN

return HTTP()/http_data


# Handshake is done:
if "original" not in metadata:
return

if "permessage-deflate" in session["extensions"]:
is_server = True if metadata["original"][TCP].sport == session["server-port"] else False
ws = WebSocket(bytes(data), extensions=session["extensions"], decoder = session["server-decoder"] if is_server else session["client-decoder"])
Comment on lines +255 to +260
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the handshake, the code returns early when "original" not in metadata. In TCPSession, subsequent parsing of leftover bytes (padding) calls tcp_reassemble with a cleared metadata dict, so this guard can prevent dissecting multiple WebSocket frames that arrive in the same TCP segment. Consider storing the needed direction info in session (or metadata) during the first call and allowing parsing even when metadata["original"] is missing.

Suggested change
if "original" not in metadata:
return
if "permessage-deflate" in session["extensions"]:
is_server = True if metadata["original"][TCP].sport == session["server-port"] else False
ws = WebSocket(bytes(data), extensions=session["extensions"], decoder = session["server-decoder"] if is_server else session["client-decoder"])
if "permessage-deflate" in session["extensions"]:
# Determine direction (server vs client). When metadata["original"] is
# not available (e.g., leftover bytes in the same TCP segment),
# fall back to the last known direction stored in the session.
if "original" in metadata:
is_server = metadata["original"][TCP].sport == session["server-port"]
session["last-direction-is-server"] = is_server
else:
is_server = session.get("last-direction-is-server", False)
ws = WebSocket(
bytes(data),
extensions=session["extensions"],
decoder=session["server-decoder"] if is_server else session["client-decoder"],
)

Copilot uses AI. Check for mistakes.
return ws
else:
ws = WebSocket(bytes(data), extensions=session["extensions"])
return ws
16 changes: 8 additions & 8 deletions scapy/layers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,10 @@ def do_dissect(self, s):
"""From the HTTP packet string, populate the scapy object"""
first_line, body = _dissect_headers(self, s)
try:
Method, Path, HTTPVersion = re.split(br"\s+", first_line, maxsplit=2)
self.setfieldval('Method', Method)
self.setfieldval('Path', Path)
self.setfieldval('Http_Version', HTTPVersion)
method_path_version = re.split(br"\s+", first_line, maxsplit=2) + [None]
self.setfieldval('Method', method_path_version[0])
self.setfieldval('Path', method_path_version[1])
self.setfieldval('Http_Version', method_path_version[2])
Comment on lines +532 to +535
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do_dissect() now pads the split result with only one None, but still indexes [2]. If first_line contains only one token (e.g. truncated/invalid request line), this will raise IndexError whereas the previous unpacking raised ValueError and was safely ignored. Consider preserving the old behavior (unpack in a try/except) or explicitly guarding the list length before indexing, and avoid storing None into HTTP string fields (prefer leaving the field unset or using b"").

Copilot uses AI. Check for mistakes.
except ValueError:
pass
if body:
Expand Down Expand Up @@ -573,10 +573,10 @@ def do_dissect(self, s):
''' From the HTTP packet string, populate the scapy object '''
first_line, body = _dissect_headers(self, s)
try:
HTTPVersion, Status, Reason = re.split(br"\s+", first_line, maxsplit=2)
self.setfieldval('Http_Version', HTTPVersion)
self.setfieldval('Status_Code', Status)
self.setfieldval('Reason_Phrase', Reason)
version_status_reason = re.split(br"\s+", first_line, maxsplit=2) + [None]
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For responses without a reason phrase, Reason_Phrase is set to None. Downstream code (e.g. summaries/pretty-printing) expects bytes-like values for HTTP header fields; using None can lead to odd representations ("None") and makes round-tripping harder. Consider setting an explicit empty bytes value (or leaving the field unset) when the reason phrase is omitted.

Suggested change
version_status_reason = re.split(br"\s+", first_line, maxsplit=2) + [None]
version_status_reason = re.split(br"\s+", first_line, maxsplit=2) + [b""]

Copilot uses AI. Check for mistakes.
self.setfieldval('Http_Version', version_status_reason[0])
self.setfieldval('Status_Code', version_status_reason[1])
self.setfieldval('Reason_Phrase', version_status_reason[2])
Comment on lines +576 to +579
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change adds support for omitting the reason phrase, but there is no regression test covering parsing of a status line like HTTP/1.1 101\r\n (no reason phrase). Adding a small unit/regression test would prevent future breakage of this behavior.

Copilot uses AI. Check for mistakes.
except ValueError:
pass
if body:
Expand Down
96 changes: 96 additions & 0 deletions test/contrib/websocket.uts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# WebSocket layer unit tests
# Copyright (C) 2024 Lucas Drufva <lucas.drufva@gmail.com>
#
# Type the following command to launch start the tests:
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment reads "launch start the tests" (duplicated verb). Consider rephrasing to "start the tests" for clarity.

Suggested change
# Type the following command to launch start the tests:
# Type the following command to start the tests:

Copilot uses AI. Check for mistakes.
# $ test/run_tests -P "load_contrib('websocket')" -t test/contrib/websocket.uts

+ Syntax check
= Import the WebSocket layer
from scapy.contrib.websocket import *

+ WebSocket protocol test
= Packet instantiation
pkt = WebSocket(wsPayload=b"Hello, world!", opcode="text", mask=True, maskingKey=0x11223344)

assert pkt.wsPayload == b"Hello, world!"
assert pkt.mask == True
assert pkt.maskingKey == 0x11223344
assert bytes(pkt) == b'\x01\x8d\x11\x22\x33\x44\x59\x47\x5f\x28\x7e\x0e\x13\x33\x7e\x50\x5f\x20\x30'

= Packet dissection
raw = b'\x01\x0dHello, world!'
pkt = WebSocket(raw)

assert pkt.fin == 0
assert pkt.rsv == 0
assert pkt.opcode == 0x1
assert pkt.mask == False
assert pkt.payloadLen == 13
assert pkt.wsPayload == b'Hello, world!'

= Dissect masked packet
raw = b'\x01\x8d\x11\x22\x33\x44\x59\x47\x5f\x28\x7e\x0e\x13\x33\x7e\x50\x5f\x20\x30'
pkt = WebSocket(raw)

assert pkt.fin == 0
assert pkt.rsv == 0
assert pkt.opcode == 0x1
assert pkt.mask == True
assert pkt.payloadLen == 13
assert pkt.wsPayload == b'Hello, world!'

= Session with compression

bind_layers(TCP, WebSocket, dport=5000)
bind_layers(TCP, WebSocket, sport=5000)

from scapy.sessions import TCPSession

filename = scapy_path("/test/pcaps/websocket_compressed_session.pcap")
pkts = sniff(offline=filename, session=TCPSession)

assert len(pkts) == 13

assert pkts[7][WebSocket].wsPayload == b'Hello'
assert pkts[8][WebSocket].wsPayload == b'"Hello"'
assert pkts[10][WebSocket].wsPayload == b'Hello2'
assert pkts[11][WebSocket].wsPayload == b'"Hello2"'

= Create packet with long payload
pkt = WebSocket(wsPayload=b"a"*126, opcode="text")

assert bytes(pkt) == b'\x01\x7e\x00\x7e\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'

= Dissect packet with long payload
raw = b'\x01\x7e\x00\x7e\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'
pkt = WebSocket(raw)

assert pkt.payloadLen == 126
assert pkt.wsPayload == b'a'*126

= Create packet with very long payload
pkt = WebSocket(wsPayload=b"a"*65536, opcode="text")

assert bytes(pkt) == b'\x01\x7f\x00\x00\x00\x00\x00\x01\x00\x00' + b'a'*65536

= Dissect packet with very long payload
raw = b'\x01\x7f\x00\x00\x00\x00\x00\x01\x00\x00' + b'a'*65536
pkt = WebSocket(raw)

assert pkt.payloadLen == 65536
assert pkt.wsPayload == b'a'*65536

= Session with invalid parts in upgrade sequence

bind_layers(TCP, WebSocket, dport=5000)
bind_layers(TCP, WebSocket, sport=5000)

from scapy.sessions import TCPSession

filename = scapy_path("/test/pcaps/websocket_invalid_handshakes.pcap")
pkts = sniff(offline=filename, session=TCPSession)


assert len(pkts) == 19

assert pkts[15][WebSocket].wsPayload == b'Hello, world!'
Binary file added test/pcaps/websocket_compressed_session.pcap
Binary file not shown.
Binary file added test/pcaps/websocket_invalid_handshakes.pcap
Binary file not shown.