"""
Packet Builder - constructs Scapy packets from flow configuration.

Each generated packet embeds:
  - Magic bytes b'TGEN' (4 bytes)
  - Sequence number (4 bytes, big-endian)
  - Sender timestamp in nanoseconds (8 bytes, big-endian)
  - Padding to reach requested frame_size
"""

import struct
import time
from typing import Optional, Tuple

from scapy.all import (
    Ether, IP, UDP, TCP, ICMP, Dot1Q, Raw, conf,
)

MAGIC = b'TGEN'
HEADER_LEN = 4 + 4 + 8  # magic + seq + timestamp_ns

# Precompiled TGEN header layout: 4-byte magic, unsigned 32-bit sequence
# number, unsigned 64-bit timestamp. '!' selects network byte order with no
# alignment padding, so the packed size equals HEADER_LEN.
_HEADER_STRUCT = struct.Struct('!4sIQ')

# The sequence field is 32 bits wide; larger counters wrap instead of raising.
_SEQ_MASK = 0xFFFFFFFF

# Largest value that fits in the 6-bit DSCP field.
_DSCP_MAX = 63


def _pack_header(seq: int) -> bytes:
    """Return the 16-byte TGEN header for ``seq`` stamped with the current time.

    ``seq`` is coerced with ``int()`` and reduced modulo 2**32 so that a
    long-running sender whose counter exceeds the 32-bit field wraps around
    rather than failing with ``struct.error``.
    """
    return _HEADER_STRUCT.pack(MAGIC, int(seq) & _SEQ_MASK, time.time_ns())


def _build_payload(seq: int, frame_size: int, header_overhead: int) -> Raw:
    """Build the Raw payload layer: TGEN header followed by zero padding.

    Args:
        seq: Sequence number to embed (wrapped to 32 bits).
        frame_size: Requested frame size in bytes, counted including the
            4-byte FCS.
        header_overhead: Combined length in bytes of the protocol headers
            (Ether / VLAN / IP / L4) that precede this payload.

    Returns:
        A ``Raw`` layer whose load is the TGEN header plus padding. The
        header is always emitted in full, so when ``frame_size`` is too small
        to hold it the resulting frame is larger than requested.
    """
    # frame_size includes Ethernet header (14) + FCS (4) in standard
    # accounting, but Scapy doesn't add FCS, so we target frame_size - 4
    # total bytes on wire. header_overhead accounts for the Ether + IP + L4
    # headers already present.
    pad_len = max(0, frame_size - 4 - header_overhead - HEADER_LEN)
    return Raw(load=_pack_header(seq) + (b'\x00' * pad_len))


def stamp_payload(payload_bytes: bytes, seq: int) -> bytes:
    """Re-stamp an existing payload with a new sequence number and timestamp.

    The first ``HEADER_LEN`` bytes of ``payload_bytes`` are replaced by a
    freshly generated header; everything after them is preserved. If
    ``payload_bytes`` is shorter than ``HEADER_LEN`` the result is just the
    new header.

    Args:
        payload_bytes: Previously built TGEN payload.
        seq: New sequence number (wrapped to 32 bits).

    Returns:
        The re-stamped payload.
    """
    return _pack_header(seq) + payload_bytes[HEADER_LEN:]


def parse_payload(payload_bytes: bytes) -> Optional[Tuple[int, int]]:
    """Extract ``(seq, timestamp_ns)`` from a TGEN payload.

    Args:
        payload_bytes: Raw payload bytes as received.

    Returns:
        A ``(seq, timestamp_ns)`` tuple, or ``None`` if the payload is shorter
        than ``HEADER_LEN`` or does not start with ``MAGIC``.
    """
    if len(payload_bytes) < HEADER_LEN:
        return None
    magic, seq, timestamp_ns = _HEADER_STRUCT.unpack_from(payload_bytes)
    if magic != MAGIC:
        return None
    return seq, timestamp_ns


def build_packet(flow_config: dict, seq: int = 0):
    """Build a Scapy packet from a flow configuration dict.

    Required keys:
        dst_ip

    Optional keys (default in parentheses):
        protocol ('udp'; one of 'udp', 'tcp', 'icmp', case-insensitive),
        src_mac ('auto', meaning the Ether source is left unset),
        dst_mac, src_ip, vlan_id (no VLAN tag when absent),
        src_port (12000), dst_port (5001 for UDP, 80 for TCP),
        dscp (0), frame_size (512).

    Args:
        flow_config: Flow description using the keys listed above.
        seq: Sequence number embedded in the payload (wrapped to 32 bits).

    Returns:
        The stacked packet: Ether [/ Dot1Q] / IP / (UDP | TCP | ICMP) / Raw.

    Raises:
        KeyError: If ``dst_ip`` is missing.
        ValueError: If ``dscp`` is outside 0-63 or ``protocol`` is not
            supported.
    """
    protocol = flow_config.get('protocol', 'udp').lower()
    # Coerce like every other numeric field so string values coming from
    # parsed configuration work in the padding arithmetic.
    frame_size = int(flow_config.get('frame_size', 512))

    # --- Layer 2 ---
    src_mac = flow_config.get('src_mac', 'auto')
    dst_mac = flow_config.get('dst_mac')
    ether_kwargs = {}
    if src_mac and src_mac != 'auto':
        ether_kwargs['src'] = src_mac
    if dst_mac:
        ether_kwargs['dst'] = dst_mac
    pkt = Ether(**ether_kwargs)
    header_overhead = 14  # Ethernet

    # --- VLAN ---
    vlan_id = flow_config.get('vlan_id')
    if vlan_id is not None:
        pkt = pkt / Dot1Q(vlan=int(vlan_id))
        header_overhead += 4  # 802.1Q tag

    # --- Layer 3 ---
    ip_kwargs = {'dst': flow_config['dst_ip']}
    src_ip = flow_config.get('src_ip')
    if src_ip:
        ip_kwargs['src'] = src_ip
    dscp = flow_config.get('dscp', 0)
    if dscp:
        dscp_value = int(dscp)
        if not 0 <= dscp_value <= _DSCP_MAX:
            raise ValueError(f'DSCP out of range (0-{_DSCP_MAX}): {dscp}')
        # DSCP occupies the upper six bits of the ToS byte.
        ip_kwargs['tos'] = dscp_value << 2
    pkt = pkt / IP(**ip_kwargs)
    header_overhead += 20  # IP (no options)

    # --- Layer 4 ---
    if protocol == 'udp':
        src_port = flow_config.get('src_port', 12000)
        dst_port = flow_config.get('dst_port', 5001)
        pkt = pkt / UDP(sport=int(src_port), dport=int(dst_port))
        header_overhead += 8
    elif protocol == 'tcp':
        src_port = flow_config.get('src_port', 12000)
        dst_port = flow_config.get('dst_port', 80)
        # Packets are emitted as bare SYN segments.
        pkt = pkt / TCP(sport=int(src_port), dport=int(dst_port), flags='S')
        header_overhead += 20
    elif protocol == 'icmp':
        pkt = pkt / ICMP()
        header_overhead += 8
    else:
        raise ValueError(f'Unsupported protocol: {protocol}')

    # --- Payload ---
    pkt = pkt / _build_payload(seq, frame_size, header_overhead)
    return pkt