Tools like tcpdump and wireshark can parse the TCP Authentication Option but there is not yet support to verify correct signatures. This patch implements TCP-AO signature verification using scapy and the python cryptography package and applies it to captures of linux traffic in multiple scenarios (ipv4, ipv6 etc). The python code is verified itself with a subset of IETF test vectors from this page: https://datatracker.ietf.org/doc/html/draft-touch-tcpm-ao-test-vectors-02 Signed-off-by: Leonard Crestez <cdleonard@xxxxxxxxx> --- .../full_tcp_sniff_session.py | 53 +++ .../tcp_authopt_test/tcp_authopt_alg.py | 276 ++++++++++++++ .../tcp_authopt_test/test_vectors.py | 359 ++++++++++++++++++ .../tcp_authopt_test/test_verify_capture.py | 123 ++++++ .../tcp_authopt/tcp_authopt_test/validator.py | 158 ++++++++ 5 files changed, 969 insertions(+) create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/full_tcp_sniff_session.py create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/tcp_authopt_alg.py create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_verify_capture.py create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/full_tcp_sniff_session.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/full_tcp_sniff_session.py new file mode 100644 index 000000000000..d709e83c8700 --- /dev/null +++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/full_tcp_sniff_session.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: GPL-2.0 +import scapy.sessions +from scapy.layers.inet import TCP + +from .utils import SimpleWaitEvent + + +class FullTCPSniffSession(scapy.sessions.DefaultSession): + """Implementation of a scapy sniff session that can wait for a full TCP capture + + Allows another thread to wait for a complete FIN handshake without polling or sleep. + """ + + found_syn = False + found_synack = False + found_fin = False + found_client_fin = False + found_server_fin = False + + def __init__(self, server_port=None, **kw): + super().__init__(**kw) + self.server_port = server_port + self._close_event = SimpleWaitEvent() + + def on_packet_received(self, p): + super().on_packet_received(p) + if not p or not TCP in p: + return + th = p[TCP] + # logger.debug("sport=%d dport=%d flags=%s", th.sport, th.dport, th.flags) + if th.flags.S and not th.flags.A: + if th.dport == self.server_port or self.server_port is None: + self.found_syn = True + if th.flags.S and th.flags.A: + if th.sport == self.server_port or self.server_port is None: + self.found_synack = True + if th.flags.F: + if self.server_port is None: + self.found_fin = True + self._close_event.set() + elif self.server_port == th.dport: + self.found_client_fin = True + self.found_fin = True + if self.found_server_fin and self.found_client_fin: + self._close_event.set() + elif self.server_port == th.sport: + self.found_server_fin = True + self.found_fin = True + if self.found_server_fin and self.found_client_fin: + self._close_event.set() + + def wait_close(self, timeout=10): + self._close_event.wait(timeout=timeout) diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/tcp_authopt_alg.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/tcp_authopt_alg.py new file mode 100644 index 000000000000..093cb4716184 --- /dev/null +++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/tcp_authopt_alg.py @@ -0,0 +1,276 @@ +# SPDX-License-Identifier: GPL-2.0 +"""Packet-processing utilities implementing RFC5925 and RFC2926""" + +import logging +from dataclasses import dataclass +from ipaddress import IPv4Address, IPv6Address +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6 +from scapy.packet import Packet +import socket +import struct +import typing +import hmac + +logger = logging.getLogger(__name__) + + +def kdf_sha1(master_key: bytes, context: bytes) -> bytes: + """RFC5926 section 3.1.1.1""" + input = b"\x01" + b"TCP-AO" + context + b"\x00\xa0" + return hmac.digest(master_key, input, "SHA1") + + +def mac_sha1(traffic_key: bytes, message: bytes) -> bytes: + """RFC5926 section 3.2.1""" + return hmac.digest(traffic_key, message, "SHA1")[:12] + + +def cmac_aes_digest(key: bytes, msg: bytes) -> bytes: + from cryptography.hazmat.primitives import cmac + from cryptography.hazmat.primitives.ciphers import algorithms + from cryptography.hazmat.backends import default_backend + + backend = default_backend() + c = cmac.CMAC(algorithms.AES(key), backend=backend) + c.update(bytes(msg)) + return c.finalize() + + +def kdf_cmac_aes(master_key: bytes, context: bytes) -> bytes: + if len(master_key) == 16: + key = master_key + else: + key = cmac_aes_digest(b"\x00" * 16, master_key) + return cmac_aes_digest(key, b"\x01" + b"TCP-AO" + context + b"\x00\x80") + + +def mac_cmac_aes(traffic_key: bytes, message: bytes) -> bytes: + return cmac_aes_digest(traffic_key, message)[:12] + + +class TcpAuthOptAlg: + def kdf(self, master_key: bytes, context: bytes) -> bytes: + raise NotImplementedError() + + def mac(self, traffic_key: bytes, message: bytes) -> bytes: + raise NotImplementedError() + + +class TcpAuthOptAlg_HMAC_SHA1(TcpAuthOptAlg): + def kdf(self, master_key: bytes, context: bytes) -> bytes: + return kdf_sha1(master_key, context) + + def mac(self, traffic_key: bytes, message: bytes) -> bytes: + return mac_sha1(traffic_key, message) + + +class TcpAuthOptAlg_CMAC_AES(TcpAuthOptAlg): + def kdf(self, master_key: bytes, context: bytes) -> bytes: + return kdf_cmac_aes(master_key, context) + + def mac(self, traffic_key: bytes, message: bytes) -> bytes: + return mac_cmac_aes(traffic_key, message) + + +def get_alg(name) -> TcpAuthOptAlg: + if name.upper() == "HMAC-SHA-1-96": + return TcpAuthOptAlg_HMAC_SHA1() + elif name.upper() == "AES-128-CMAC-96": + return TcpAuthOptAlg_CMAC_AES() + else: + raise ValueError(f"Bad TCP AuthOpt algorithms {name}") + + +IPvXAddress = typing.Union[IPv4Address, IPv6Address] + + +def get_scapy_ipvx_src(p: Packet) -> IPvXAddress: + if IP in p: + return IPv4Address(p[IP].src) + elif IPv6 in p: + return IPv6Address(p[IPv6].src) + else: + raise Exception("Neither IP nor IPv6 found on packet") + + +def get_scapy_ipvx_dst(p: Packet) -> IPvXAddress: + if IP in p: + return IPv4Address(p[IP].dst) + elif IPv6 in p: + return IPv6Address(p[IPv6].dst) + else: + raise Exception("Neither IP nor IPv6 found on packet") + + +def build_context( + saddr: IPvXAddress, daddr: IPvXAddress, sport, dport, src_isn, dst_isn +) -> bytes: + """Build context bytes as specified by RFC5925 section 5.2""" + return ( + saddr.packed + + daddr.packed + + struct.pack( + "!HHII", + sport, + dport, + src_isn, + dst_isn, + ) + ) + + +def build_context_from_scapy(p: Packet, src_isn: int, dst_isn: int) -> bytes: + """Build context based on a scapy Packet and src/dst initial-sequence numbers""" + return build_context( + get_scapy_ipvx_src(p), + get_scapy_ipvx_dst(p), + p[TCP].sport, + p[TCP].dport, + src_isn, + dst_isn, + ) + + +def build_context_from_scapy_syn(p: Packet) -> bytes: + """Build context for a scapy SYN packet""" + return build_context_from_scapy(p, p[TCP].seq, 0) + + +def build_context_from_scapy_synack(p: Packet) -> bytes: + """Build context for a scapy SYN/ACK packet""" + return build_context_from_scapy(p, p[TCP].seq, p[TCP].ack - 1) + + +def build_message_from_scapy(p: Packet, include_options=True, sne=0) -> bytearray: + """Build message bytes as described by RFC5925 section 5.1""" + result = bytearray() + result += struct.pack("!I", sne) + # ip pseudo-header: + if IP in p: + result += struct.pack( + "!4s4sHH", + IPv4Address(p[IP].src).packed, + IPv4Address(p[IP].dst).packed, + socket.IPPROTO_TCP, + p[TCP].dataofs * 4 + len(p[TCP].payload), + ) + assert p[TCP].dataofs * 4 + len(p[TCP].payload) + p[IP].ihl * 4 == p[IP].len + elif IPv6 in p: + result += struct.pack( + "!16s16sII", + IPv6Address(p[IPv6].src).packed, + IPv6Address(p[IPv6].dst).packed, + p[IPv6].plen, + socket.IPPROTO_TCP, + ) + assert p[TCP].dataofs * 4 + len(p[TCP].payload) == p[IPv6].plen + else: + raise Exception("Neither IP nor IPv6 found on packet") + + # tcp header with checksum set to zero + th_bytes = bytes(p[TCP]) + result += th_bytes[:16] + result += b"\x00\x00" + result += th_bytes[18:20] + + # Even if include_options=False the TCP-AO option itself is still included + # with the MAC set to all-zeros. This means we need to parse TCP options. + pos = 20 + tcphdr_optend = p[TCP].dataofs * 4 + # logger.info("th_bytes: %s", th_bytes.hex(' ')) + assert len(th_bytes) >= tcphdr_optend + while pos < tcphdr_optend: + optnum = th_bytes[pos] + pos += 1 + if optnum == 0 or optnum == 1: + if include_options: + result += bytes([optnum]) + continue + + optlen = th_bytes[pos] + pos += 1 + if pos + optlen - 2 > tcphdr_optend: + logger.info( + "bad tcp option %d optlen %d beyond end-of-header", optnum, optlen + ) + break + if optlen < 2: + logger.info("bad tcp option %d optlen %d less than two", optnum, optlen) + break + if optnum == 29: + if optlen < 4: + logger.info("bad tcp option %d optlen %d", optnum, optlen) + break + result += bytes([optnum, optlen]) + result += th_bytes[pos : pos + 2] + result += (optlen - 4) * b"\x00" + elif include_options: + result += bytes([optnum, optlen]) + result += th_bytes[pos : pos + optlen - 2] + pos += optlen - 2 + result += bytes(p[TCP].payload) + return result + + +@dataclass +class TCPAuthContext: + """Context used to TCP Authentication option as defined in RFC5925 5.2""" + + saddr: IPvXAddress = None + daddr: IPvXAddress = None + sport: int = 0 + dport: int = 0 + sisn: int = 0 + disn: int = 0 + + def pack(self, syn=False, rev=False) -> bytes: + if rev: + return build_context( + self.daddr, + self.saddr, + self.dport, + self.sport, + self.disn if not syn else 0, + self.sisn, + ) + else: + return build_context( + self.saddr, + self.daddr, + self.sport, + self.dport, + self.sisn, + self.disn if not syn else 0, + ) + + def rev(self) -> "TCPAuthContext": + """Reverse""" + return TCPAuthContext( + saddr=self.daddr, + daddr=self.saddr, + sport=self.dport, + dport=self.sport, + sisn=self.disn, + disn=self.sisn, + ) + + def init_from_syn_packet(self, p): + """Init from a SYN packet (and set dist to zero)""" + assert p[TCP].flags.S and not p[TCP].flags.A and p[TCP].ack == 0 + self.saddr = get_scapy_ipvx_src(p) + self.daddr = get_scapy_ipvx_dst(p) + self.sport = p[TCP].sport + self.dport = p[TCP].dport + self.sisn = p[TCP].seq + self.disn = 0 + + def update_from_synack_packet(self, p): + """Update disn and check everything else matches""" + assert p[TCP].flags.S and p[TCP].flags.A + assert self.saddr == get_scapy_ipvx_dst(p) + assert self.daddr == get_scapy_ipvx_src(p) + assert self.sport == p[TCP].dport + assert self.dport == p[TCP].sport + assert self.sisn == p[TCP].ack - 1 + self.disn = p[TCP].seq diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py new file mode 100644 index 000000000000..f622bcf0dcbc --- /dev/null +++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py @@ -0,0 +1,359 @@ +# SPDX-License-Identifier: GPL-2.0 +import logging +from ipaddress import IPv4Address, IPv6Address +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6 +from .tcp_authopt_alg import get_alg, build_context_from_scapy, build_message_from_scapy +from .utils import scapy_tcp_get_authopt_val, tcphdr_authopt +import socket + +logger = logging.getLogger(__name__) + + +class TestIETFVectors: + """Test python implementation of TCP-AO algorithms + + Data is a subset of IETF test vectors: + https://datatracker.ietf.org/doc/html/draft-touch-tcpm-ao-test-vectors-02 + """ + + master_key = b"testvector" + client_keyid = 61 + server_keyid = 84 + client_ipv4 = IPv4Address("10.11.12.13") + client_ipv6 = IPv6Address("FD00::1") + server_ipv4 = IPv4Address("172.27.28.29") + server_ipv6 = IPv6Address("FD00::2") + + client_isn_41x = 0xFBFBAB5A + server_isn_41x = 0x11C14261 + client_isn_42x = 0xCB0EFBEE + server_isn_42x = 0xACD5B5E1 + client_isn_61x = 0x176A833F + server_isn_61x = 0x3F51994B + client_isn_62x = 0x020C1E69 + server_isn_62x = 0xEBA3734D + + def check( + self, + packet_hex: str, + traffic_key_hex: str, + mac_hex: str, + src_isn, + dst_isn, + include_options=True, + alg_name="HMAC-SHA-1-96", + sne=0, + ): + packet_bytes = bytes.fromhex(packet_hex) + + # sanity check for ip version + ipv = packet_bytes[0] >> 4 + if ipv == 4: + p = IP(bytes.fromhex(packet_hex)) + assert p[IP].proto == socket.IPPROTO_TCP + elif ipv == 6: + p = IPv6(bytes.fromhex(packet_hex)) + assert p[IPv6].nh == socket.IPPROTO_TCP + else: + raise ValueError(f"bad ipv={ipv}") + + # sanity check for seq/ack in SYN/ACK packets + if p[TCP].flags.S and p[TCP].flags.A is False: + assert p[TCP].seq == src_isn + assert p[TCP].ack == 0 + if p[TCP].flags.S and p[TCP].flags.A: + assert p[TCP].seq == src_isn + assert p[TCP].ack == dst_isn + 1 + + # check traffic key + alg = get_alg(alg_name) + context_bytes = build_context_from_scapy(p, src_isn, dst_isn) + traffic_key = alg.kdf(self.master_key, context_bytes) + assert traffic_key.hex(" ") == traffic_key_hex + + # check mac + message_bytes = build_message_from_scapy( + p, include_options=include_options, sne=sne + ) + mac = alg.mac(traffic_key, message_bytes) + assert mac.hex(" ") == mac_hex + + # check option bytes in header + opt = scapy_tcp_get_authopt_val(p[TCP]) + assert opt is not None + assert opt.keyid in [self.client_keyid, self.server_keyid] + assert opt.rnextkeyid in [self.client_keyid, self.server_keyid] + assert opt.mac.hex(" ") == mac_hex + + def test_4_1_1(self): + self.check( + """ + 45 e0 00 4c dd 0f 40 00 ff 06 bf 6b 0a 0b 0c 0d + ac 1b 1c 1d e9 d7 00 b3 fb fb ab 5a 00 00 00 00 + e0 02 ff ff ca c4 00 00 02 04 05 b4 01 03 03 08 + 04 02 08 0a 00 15 5a b7 00 00 00 00 1d 10 3d 54 + 2e e4 37 c6 f8 ed e6 d7 c4 d6 02 e7 + """, + "6d 63 ef 1b 02 fe 15 09 d4 b1 40 27 07 fd 7b 04 16 ab b7 4f", + "2e e4 37 c6 f8 ed e6 d7 c4 d6 02 e7", + self.client_isn_41x, + 0, + ) + + def test_4_1_2(self): + self.check( + """ + 45 e0 00 4c 65 06 40 00 ff 06 37 75 ac 1b 1c 1d + 0a 0b 0c 0d 00 b3 e9 d7 11 c1 42 61 fb fb ab 5b + e0 12 ff ff 37 76 00 00 02 04 05 b4 01 03 03 08 + 04 02 08 0a 84 a5 0b eb 00 15 5a b7 1d 10 54 3d + ee ab 0f e2 4c 30 10 81 51 16 b3 be + """, + "d9 e2 17 e4 83 4a 80 ca 2f 3f d8 de 2e 41 b8 e6 79 7f ea 96", + "ee ab 0f e2 4c 30 10 81 51 16 b3 be", + self.server_isn_41x, + self.client_isn_41x, + ) + + def test_4_1_3(self): + self.check( + """ + 45 e0 00 87 36 a1 40 00 ff 06 65 9f 0a 0b 0c 0d + ac 1b 1c 1d e9 d7 00 b3 fb fb ab 5b 11 c1 42 62 + c0 18 01 04 a1 62 00 00 01 01 08 0a 00 15 5a c1 + 84 a5 0b eb 1d 10 3d 54 70 64 cf 99 8c c6 c3 15 + c2 c2 e2 bf ff ff ff ff ff ff ff ff ff ff ff ff + ff ff ff ff 00 43 01 04 da bf 00 b4 0a 0b 0c 0d + 26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02 + 00 02 02 42 00 02 06 41 04 00 00 da bf 02 08 40 + 06 00 64 00 01 01 00 + """, + "d2 e5 9c 65 ff c7 b1 a3 93 47 65 64 63 b7 0e dc 24 a1 3d 71", + "70 64 cf 99 8c c6 c3 15 c2 c2 e2 bf", + self.client_isn_41x, + self.server_isn_41x, + ) + + def test_4_1_4(self): + self.check( + """ + 45 e0 00 87 1f a9 40 00 ff 06 7c 97 ac 1b 1c 1d + 0a 0b 0c 0d 00 b3 e9 d7 11 c1 42 62 fb fb ab 9e + c0 18 01 00 40 0c 00 00 01 01 08 0a 84 a5 0b f5 + 00 15 5a c1 1d 10 54 3d a6 3f 0e cb bb 2e 63 5c + 95 4d ea c7 ff ff ff ff ff ff ff ff ff ff ff ff + ff ff ff ff 00 43 01 04 da c0 00 b4 ac 1b 1c 1d + 26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02 + 00 02 02 42 00 02 06 41 04 00 00 da c0 02 08 40 + 06 00 64 00 01 01 00 + """, + "d9 e2 17 e4 83 4a 80 ca 2f 3f d8 de 2e 41 b8 e6 79 7f ea 96", + "a6 3f 0e cb bb 2e 63 5c 95 4d ea c7", + self.server_isn_41x, + self.client_isn_41x, + ) + + def test_4_2_1(self): + self.check( + """ + 45 e0 00 4c 53 99 40 00 ff 06 48 e2 0a 0b 0c 0d + ac 1b 1c 1d ff 12 00 b3 cb 0e fb ee 00 00 00 00 + e0 02 ff ff 54 1f 00 00 02 04 05 b4 01 03 03 08 + 04 02 08 0a 00 02 4c ce 00 00 00 00 1d 10 3d 54 + 80 af 3c fe b8 53 68 93 7b 8f 9e c2 + """, + "30 ea a1 56 0c f0 be 57 da b5 c0 45 22 9f b1 0a 42 3c d7 ea", + "80 af 3c fe b8 53 68 93 7b 8f 9e c2", + self.client_isn_42x, + 0, + include_options=False, + ) + + def test_4_2_2(self): + self.check( + """ + 45 e0 00 4c 32 84 40 00 ff 06 69 f7 ac 1b 1c 1d + 0a 0b 0c 0d 00 b3 ff 12 ac d5 b5 e1 cb 0e fb ef + e0 12 ff ff 38 8e 00 00 02 04 05 b4 01 03 03 08 + 04 02 08 0a 57 67 72 f3 00 02 4c ce 1d 10 54 3d + 09 30 6f 9a ce a6 3a 8c 68 cb 9a 70 + """, + "b5 b2 89 6b b3 66 4e 81 76 b0 ed c6 e7 99 52 41 01 a8 30 7f", + "09 30 6f 9a ce a6 3a 8c 68 cb 9a 70", + self.server_isn_42x, + self.client_isn_42x, + include_options=False, + ) + + def test_4_2_3(self): + self.check( + """ + 45 e0 00 87 a8 f5 40 00 ff 06 f3 4a 0a 0b 0c 0d + ac 1b 1c 1d ff 12 00 b3 cb 0e fb ef ac d5 b5 e2 + c0 18 01 04 6c 45 00 00 01 01 08 0a 00 02 4c ce + 57 67 72 f3 1d 10 3d 54 71 06 08 cc 69 6c 03 a2 + 71 c9 3a a5 ff ff ff ff ff ff ff ff ff ff ff ff + ff ff ff ff 00 43 01 04 da bf 00 b4 0a 0b 0c 0d + 26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02 + 00 02 02 42 00 02 06 41 04 00 00 da bf 02 08 40 + 06 00 64 00 01 01 00 + """, + "f3 db 17 93 d7 91 0e cd 80 6c 34 f1 55 ea 1f 00 34 59 53 e3", + "71 06 08 cc 69 6c 03 a2 71 c9 3a a5", + self.client_isn_42x, + self.server_isn_42x, + include_options=False, + ) + + def test_4_2_4(self): + self.check( + """ + 45 e0 00 87 54 37 40 00 ff 06 48 09 ac 1b 1c 1d + 0a 0b 0c 0d 00 b3 ff 12 ac d5 b5 e2 cb 0e fc 32 + c0 18 01 00 46 b6 00 00 01 01 08 0a 57 67 72 f3 + 00 02 4c ce 1d 10 54 3d 97 76 6e 48 ac 26 2d e9 + ae 61 b4 f9 ff ff ff ff ff ff ff ff ff ff ff ff + ff ff ff ff 00 43 01 04 da c0 00 b4 ac 1b 1c 1d + 26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02 + 00 02 02 42 00 02 06 41 04 00 00 da c0 02 08 40 + 06 00 64 00 01 01 00 + """, + "b5 b2 89 6b b3 66 4e 81 76 b0 ed c6 e7 99 52 41 01 a8 30 7f", + "97 76 6e 48 ac 26 2d e9 ae 61 b4 f9", + self.server_isn_42x, + self.client_isn_42x, + include_options=False, + ) + + def test_5_1_1(self): + self.check( + """ + 45 e0 00 4c 7b 9f 40 00 ff 06 20 dc 0a 0b 0c 0d + ac 1b 1c 1d c4 fa 00 b3 78 7a 1d df 00 00 00 00 + e0 02 ff ff 5a 0f 00 00 02 04 05 b4 01 03 03 08 + 04 02 08 0a 00 01 7e d0 00 00 00 00 1d 10 3d 54 + e4 77 e9 9c 80 40 76 54 98 e5 50 91 + """, + "f5 b8 b3 d5 f3 4f db b6 eb 8d 4a b9 66 0e 60 e3", + "e4 77 e9 9c 80 40 76 54 98 e5 50 91", + 0x787A1DDF, + 0, + include_options=True, + alg_name="AES-128-CMAC-96", + ) + + def test_6_1_1(self): + self.check( + """ + 6e 08 91 dc 00 38 06 40 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 01 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 02 f7 e4 00 b3 17 6a 83 3f + 00 00 00 00 e0 02 ff ff 47 21 00 00 02 04 05 a0 + 01 03 03 08 04 02 08 0a 00 41 d0 87 00 00 00 00 + 1d 10 3d 54 90 33 ec 3d 73 34 b6 4c 5e dd 03 9f + """, + "62 5e c0 9d 57 58 36 ed c9 b6 42 84 18 bb f0 69 89 a3 61 bb", + "90 33 ec 3d 73 34 b6 4c 5e dd 03 9f", + self.client_isn_61x, + 0, + include_options=True, + ) + + def test_6_1_2(self): + self.check( + """ + 6e 01 00 9e 00 38 06 40 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 01 00 b3 f7 e4 3f 51 99 4b + 17 6a 83 40 e0 12 ff ff bf ec 00 00 02 04 05 a0 + 01 03 03 08 04 02 08 0a bd 33 12 9b 00 41 d0 87 + 1d 10 54 3d f1 cb a3 46 c3 52 61 63 f7 1f 1f 55 + """, + "e4 a3 7a da 2a 0a fc a8 71 14 34 91 3f e1 38 c7 71 eb cb 4a", + "f1 cb a3 46 c3 52 61 63 f7 1f 1f 55", + self.server_isn_61x, + self.client_isn_61x, + include_options=True, + ) + + def test_6_2_2(self): + self.check( + """ + 6e 0a 7e 1f 00 38 06 40 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 01 00 b3 c6 cd eb a3 73 4d + 02 0c 1e 6a e0 12 ff ff 77 4d 00 00 02 04 05 a0 + 01 03 03 08 04 02 08 0a 5e c9 9b 70 00 9d b9 5b + 1d 10 54 3d 3c 54 6b ad 97 43 f1 2d f8 b8 01 0d + """, + "40 51 08 94 7f 99 65 75 e7 bd bc 26 d4 02 16 a2 c7 fa 91 bd", + "3c 54 6b ad 97 43 f1 2d f8 b8 01 0d", + self.server_isn_62x, + self.client_isn_62x, + include_options=False, + ) + + def test_6_2_4(self): + self.check( + """ + 6e 0a 7e 1f 00 73 06 40 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 01 00 b3 c6 cd eb a3 73 4e + 02 0c 1e ad c0 18 01 00 71 6a 00 00 01 01 08 0a + 5e c9 9b 7a 00 9d b9 65 1d 10 54 3d 55 9a 81 94 + 45 b4 fd e9 8d 9e 13 17 ff ff ff ff ff ff ff ff + ff ff ff ff ff ff ff ff 00 43 01 04 fd e8 00 b4 + 01 01 01 7a 26 02 06 01 04 00 01 00 01 02 02 80 + 00 02 02 02 00 02 02 42 00 02 06 41 04 00 00 fd + e8 02 08 40 06 00 64 00 01 01 00 + """, + "40 51 08 94 7f 99 65 75 e7 bd bc 26 d4 02 16 a2 c7 fa 91 bd", + "55 9a 81 94 45 b4 fd e9 8d 9e 13 17", + self.server_isn_62x, + self.client_isn_62x, + include_options=False, + ) + + server_isn_71x = 0xA6744ECB + client_isn_71x = 0x193CCCEC + + def test_7_1_2(self): + self.check( + """ + 6e 06 15 20 00 38 06 40 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 01 00 b3 f8 5a a6 74 4e cb + 19 3c cc ed e0 12 ff ff ea bb 00 00 02 04 05 a0 + 01 03 03 08 04 02 08 0a 71 da ab c8 13 e4 ab 99 + 1d 10 54 3d dc 28 43 a8 4e 78 a6 bc fd c5 ed 80 + """, + "cf 1b 1e 22 5e 06 a6 36 16 76 4a 06 7b 46 f4 b1", + "dc 28 43 a8 4e 78 a6 bc fd c5 ed 80", + self.server_isn_71x, + self.client_isn_71x, + alg_name="AES-128-CMAC-96", + include_options=True, + ) + + def test_7_1_4(self): + self.check( + """ + 6e 06 15 20 00 73 06 40 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00 + 00 00 00 00 00 00 00 01 00 b3 f8 5a a6 74 4e cc + 19 3c cd 30 c0 18 01 00 52 f4 00 00 01 01 08 0a + 71 da ab d3 13 e4 ab a3 1d 10 54 3d c1 06 9b 7d + fd 3d 69 3a 6d f3 f2 89 ff ff ff ff ff ff ff ff + ff ff ff ff ff ff ff ff 00 43 01 04 fd e8 00 b4 + 01 01 01 7a 26 02 06 01 04 00 01 00 01 02 02 80 + 00 02 02 02 00 02 02 42 00 02 06 41 04 00 00 fd + e8 02 08 40 06 00 64 00 01 01 00 + """, + "cf 1b 1e 22 5e 06 a6 36 16 76 4a 06 7b 46 f4 b1", + "c1 06 9b 7d fd 3d 69 3a 6d f3 f2 89", + self.server_isn_71x, + self.client_isn_71x, + alg_name="AES-128-CMAC-96", + include_options=True, + ) diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_verify_capture.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_verify_capture.py new file mode 100644 index 000000000000..1bc0f05197b8 --- /dev/null +++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_verify_capture.py @@ -0,0 +1,123 @@ +# SPDX-License-Identifier: GPL-2.0 +"""Capture packets with TCP-AO and verify signatures""" + +import logging +import os +import socket + +import pytest + +from . import linux_tcp_authopt, tcp_authopt_alg +from .full_tcp_sniff_session import FullTCPSniffSession +from .linux_tcp_authopt import ( + set_tcp_authopt_key, + tcp_authopt_key, +) +from .server import SimpleServerThread +from .utils import ( + DEFAULT_TCP_SERVER_PORT, + AsyncSnifferContext, + check_socket_echo, + create_listen_socket, + nstat_json, +) +from .validator import TcpAuthValidator, TcpAuthValidatorKey +from .conftest import skipif_missing_tcp_authopt + +logger = logging.getLogger(__name__) +pytestmark = skipif_missing_tcp_authopt + + +def can_capture(): + # This is too restrictive: + return os.geteuid() == 0 + + +skipif_cant_capture = pytest.mark.skipif( + not can_capture(), reason="run as root to capture packets" +) + + +def get_alg_id(alg_name) -> int: + if alg_name == "HMAC-SHA-1-96": + return linux_tcp_authopt.TCP_AUTHOPT_ALG_HMAC_SHA_1_96 + elif alg_name == "AES-128-CMAC-96": + return linux_tcp_authopt.TCP_AUTHOPT_ALG_AES_128_CMAC_96 + else: + raise ValueError() + + +@skipif_cant_capture +@pytest.mark.parametrize( + "address_family,alg_name,include_options", + [ + (socket.AF_INET, "HMAC-SHA-1-96", True), + (socket.AF_INET, "AES-128-CMAC-96", True), + (socket.AF_INET, "AES-128-CMAC-96", False), + (socket.AF_INET6, "HMAC-SHA-1-96", True), + (socket.AF_INET6, "HMAC-SHA-1-96", False), + (socket.AF_INET6, "AES-128-CMAC-96", True), + ], +) +def test_verify_capture(exit_stack, address_family, alg_name, include_options): + master_key = b"testvector" + alg_id = get_alg_id(alg_name) + + session = FullTCPSniffSession(server_port=DEFAULT_TCP_SERVER_PORT) + sniffer = exit_stack.enter_context( + AsyncSnifferContext( + filter=f"tcp port {DEFAULT_TCP_SERVER_PORT}", + iface="lo", + session=session, + ) + ) + + listen_socket = create_listen_socket(family=address_family) + listen_socket = exit_stack.enter_context(listen_socket) + exit_stack.enter_context(SimpleServerThread(listen_socket, mode="echo")) + + client_socket = socket.socket(address_family, socket.SOCK_STREAM) + client_socket = exit_stack.push(client_socket) + + set_tcp_authopt_key( + listen_socket, + tcp_authopt_key(alg=alg_id, key=master_key, include_options=include_options), + ) + set_tcp_authopt_key( + client_socket, + tcp_authopt_key(alg=alg_id, key=master_key, include_options=include_options), + ) + + # even if one signature is incorrect keep processing the capture + old_nstat = nstat_json() + valkey = TcpAuthValidatorKey( + key=master_key, alg_name=alg_name, include_options=include_options + ) + validator = TcpAuthValidator(keys=[valkey]) + + try: + client_socket.settimeout(1.0) + client_socket.connect(("localhost", DEFAULT_TCP_SERVER_PORT)) + for _ in range(5): + check_socket_echo(client_socket) + except socket.timeout: + logger.warning("socket timeout", exc_info=True) + pass + client_socket.close() + session.wait_close() + sniffer.stop() + + logger.info("capture: %r", sniffer.results) + for p in sniffer.results: + validator.handle_packet(p) + + assert not validator.any_fail + assert not validator.any_unsigned + # Fails because of duplicate packets: + # assert not validator.any_incomplete + new_nstat = nstat_json() + assert ( + 0 + == new_nstat["kernel"]["TcpExtTCPAuthOptFailure"] + - old_nstat["kernel"]["TcpExtTCPAuthOptFailure"] + ) diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py new file mode 100644 index 000000000000..5f0e83421af5 --- /dev/null +++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py @@ -0,0 +1,158 @@ +# SPDX-License-Identifier: GPL-2.0 +from tcp_authopt_test.utils import scapy_tcp_get_authopt_val +import typing +import logging + +from dataclasses import dataclass +from scapy.packet import Packet +from scapy.layers.inet import TCP +from . import tcp_authopt_alg +from .tcp_authopt_alg import IPvXAddress, TCPAuthContext +from .tcp_authopt_alg import get_scapy_ipvx_src +from .tcp_authopt_alg import get_scapy_ipvx_dst + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class TCPSocketPair: + """TCP connection identifier""" + + saddr: IPvXAddress = None + daddr: IPvXAddress = None + sport: int = 0 + dport: int = 0 + + def rev(self) -> "TCPSocketPair": + return TCPSocketPair(self.daddr, self.saddr, self.dport, self.sport) + + +@dataclass +class TcpAuthValidatorKey: + key: bytes + alg_name: str + include_options: bool = True + keyid: typing.Optional[int] = None + sport: typing.Optional[int] = None + dport: typing.Optional[int] = None + + def match_packet(self, p: Packet): + if not TCP in p: + return False + authopt = scapy_tcp_get_authopt_val(p[TCP]) + if authopt is None: + return False + if self.keyid is not None and authopt.keyid != self.keyid: + return False + if self.sport is not None and p[TCP].sport != self.sport: + return False + if self.dport is not None and p[TCP].dport != self.dport: + return False + return True + + def get_alg_imp(self): + return tcp_authopt_alg.get_alg(self.alg_name) + + +def is_init_syn(p: Packet) -> bool: + return p[TCP].flags.S and not p[TCP].flags.A + + +class TcpAuthValidator: + """Validate TCP auth sessions inside a capture""" + + keys: typing.List[TcpAuthValidatorKey] + conn_dict: typing.Dict[TCPSocketPair, TCPAuthContext] + any_incomplete: bool = False + any_unsigned: bool = False + any_fail: bool = False + + def __init__(self, keys=None): + self.keys = keys or [] + self.conn_dict = {} + + def get_key_for_packet(self, p): + for k in self.keys: + if k.match_packet(p): + return k + return None + + def handle_packet(self, p: Packet): + if TCP not in p: + logger.debug("skip non-TCP packet") + return + key = self.get_key_for_packet(p) + if not key: + self.any_unsigned = True + logger.debug("skip packet not matching any known keys: %r", p) + return + authopt = scapy_tcp_get_authopt_val(p[TCP]) + if not authopt: + self.any_unsigned = True + logger.debug("skip packet without tcp authopt: %r", p) + return + captured_mac = authopt.mac + + saddr = get_scapy_ipvx_src(p) + daddr = get_scapy_ipvx_dst(p) + + conn_key = TCPSocketPair(saddr, daddr, p[TCP].sport, p[TCP].dport) + if p[TCP].flags.S: + conn = self.conn_dict.get(conn_key, None) + if conn is not None: + logger.warning("overwrite %r", conn) + self.any_incomplete = True + conn = TCPAuthContext() + conn.saddr = saddr + conn.daddr = daddr + conn.sport = p[TCP].sport + conn.dport = p[TCP].dport + self.conn_dict[conn_key] = conn + + if p[TCP].flags.A == False: + # SYN + conn.sisn = p[TCP].seq + conn.disn = 0 + logger.info("Initialized for SYN: %r", conn) + else: + # SYN/ACK + conn.sisn = p[TCP].seq + conn.disn = p[TCP].ack - 1 + logger.info("Initialized for SYNACK: %r", conn) + + # Update opposite connection with dst_isn + rconn_key = conn_key.rev() + rconn = self.conn_dict.get(rconn_key, None) + if rconn is None: + logger.warning("missing SYN for SYNACK: %s", rconn_key) + self.any_incomplete = True + else: + assert rconn.sisn == conn.disn + assert rconn.disn == 0 or rconn.disn == conn.sisn + rconn.disn = conn.sisn + rconn.update_from_synack_packet(p) + logger.info("Updated peer for SYNACK: %r", rconn) + else: + conn = self.conn_dict.get(conn_key, None) + if conn is None: + logger.warning("missing TCP syn for %r", conn_key) + self.any_incomplete = True + return + # logger.debug("conn %r found for packet %r", conn, p) + + context_bytes = conn.pack(syn=is_init_syn(p)) + alg = key.get_alg_imp() + traffic_key = alg.kdf(key.key, context_bytes) + message_bytes = tcp_authopt_alg.build_message_from_scapy( + p, include_options=key.include_options + ) + computed_mac = alg.mac(traffic_key, message_bytes) + if computed_mac == captured_mac: + logger.debug("ok - mac %s", computed_mac.hex()) + else: + self.any_fail = True + logger.error( + "not ok - captured %s computed %s", + captured_mac.hex(), + computed_mac.hex(), + ) -- 2.25.1