[PATCH 13/19] selftests: tcp_authopt: Add scapy-based packet signing code

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Tools like tcpdump and wireshark can parse the TCP Authentication Option
but there is not yet support to verify correct signatures.

This patch implements TCP-AO signature verification using scapy and the
python cryptography package.

The python code is verified itself with a subset of IETF test vectors
from this page:
https://datatracker.ietf.org/doc/html/draft-touch-tcpm-ao-test-vectors-02

The code in this commit is not specific to linux

Signed-off-by: Leonard Crestez <cdleonard@xxxxxxxxx>
---
 .../tcp_authopt_test/scapy_tcp_authopt.py     | 211 ++++++++++
 .../tcp_authopt_test/scapy_utils.py           | 176 +++++++++
 .../tcp_authopt_test/test_vectors.py          | 359 ++++++++++++++++++
 .../tcp_authopt/tcp_authopt_test/validator.py | 127 +++++++
 4 files changed, 873 insertions(+)
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_tcp_authopt.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_utils.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py

diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_tcp_authopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_tcp_authopt.py
new file mode 100644
index 000000000000..c32f9d931d2b
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_tcp_authopt.py
@@ -0,0 +1,211 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Packet-processing utilities implementing RFC5925 and RFC2926"""
+
+import logging
+from scapy.layers.inet import TCP
+from scapy.packet import Packet
+from .scapy_utils import TCPOPT_AUTHOPT, IPvXAddress, get_packet_ipvx_src, get_packet_ipvx_dst, get_tcp_pseudoheader, get_tcp_doff
+import struct
+import hmac
+
+logger = logging.getLogger(__name__)
+
+
+def _cmac_aes_digest(key: bytes, msg: bytes) -> bytes:
+    from cryptography.hazmat.primitives import cmac
+    from cryptography.hazmat.primitives.ciphers import algorithms
+    from cryptography.hazmat.backends import default_backend
+
+    backend = default_backend()
+    c = cmac.CMAC(algorithms.AES(key), backend=backend)
+    c.update(bytes(msg))
+    return c.finalize()
+
+
+class TcpAuthOptAlg:
+    @classmethod
+    def kdf(cls, master_key: bytes, context: bytes) -> bytes:
+        raise NotImplementedError()
+
+    @classmethod
+    def mac(cls, traffic_key: bytes, message: bytes) -> bytes:
+        raise NotImplementedError()
+
+    maclen = -1
+
+
+class TcpAuthOptAlg_HMAC_SHA1(TcpAuthOptAlg):
+    @classmethod
+    def kdf(cls, master_key: bytes, context: bytes) -> bytes:
+        input = b"\x01" + b"TCP-AO" + context + b"\x00\xa0"
+        return hmac.digest(master_key, input, "SHA1")
+
+    @classmethod
+    def mac(cls, traffic_key: bytes, message: bytes) -> bytes:
+        return hmac.digest(traffic_key, message, "SHA1")[:12]
+
+    maclen = 12
+
+
+class TcpAuthOptAlg_CMAC_AES(TcpAuthOptAlg):
+    @classmethod
+    def kdf(self, master_key: bytes, context: bytes) -> bytes:
+        if len(master_key) == 16:
+            key = master_key
+        else:
+            key = _cmac_aes_digest(b"\x00" * 16, master_key)
+        return _cmac_aes_digest(key, b"\x01" + b"TCP-AO" + context + b"\x00\x80")
+
+    @classmethod
+    def mac(self, traffic_key: bytes, message: bytes) -> bytes:
+        return _cmac_aes_digest(traffic_key, message)[:12]
+
+    maclen = 12
+
+
+def get_alg(name: str) -> TcpAuthOptAlg:
+    if name.upper() == "HMAC-SHA-1-96":
+        return TcpAuthOptAlg_HMAC_SHA1()
+    elif name.upper() == "AES-128-CMAC-96":
+        return TcpAuthOptAlg_CMAC_AES()
+    else:
+        raise ValueError(f"Bad TCP AuthOpt algorithms {name}")
+
+
+def build_context(
+    saddr: IPvXAddress, daddr: IPvXAddress, sport, dport, src_isn, dst_isn
+) -> bytes:
+    """Build context bytes as specified by RFC5925 section 5.2"""
+    return (
+        saddr.packed
+        + daddr.packed
+        + struct.pack(
+            "!HHII",
+            sport,
+            dport,
+            src_isn,
+            dst_isn,
+        )
+    )
+
+
+def build_context_from_packet(p: Packet, src_isn: int, dst_isn: int) -> bytes:
+    """Build context based on a scapy Packet and src/dst initial-sequence numbers"""
+    return build_context(
+        get_packet_ipvx_src(p),
+        get_packet_ipvx_dst(p),
+        p[TCP].sport,
+        p[TCP].dport,
+        src_isn,
+        dst_isn,
+    )
+
+
+def build_message_from_packet(p: Packet, include_options=True, sne=0) -> bytearray:
+    """Build message bytes as described by RFC5925 section 5.1"""
+    result = bytearray()
+    result += struct.pack("!I", sne)
+    th = p[TCP]
+
+    # ip pseudo-header:
+    result += get_tcp_pseudoheader(th)
+
+    # tcp header with checksum set to zero
+    th_bytes = bytes(p[TCP])
+    result += th_bytes[:16]
+    result += b"\x00\x00"
+    result += th_bytes[18:20]
+
+    # Even if include_options=False the TCP-AO option itself is still included
+    # with the MAC set to all-zeros. This means we need to parse TCP options.
+    pos = 20
+    tcphdr_optend = get_tcp_doff(th) * 4
+    # logger.info("th_bytes: %s", th_bytes.hex(' '))
+    assert len(th_bytes) >= tcphdr_optend
+    while pos < tcphdr_optend:
+        optnum = th_bytes[pos]
+        pos += 1
+        if optnum == 0 or optnum == 1:
+            if include_options:
+                result += bytes([optnum])
+            continue
+
+        optlen = th_bytes[pos]
+        pos += 1
+        if pos + optlen - 2 > tcphdr_optend:
+            logger.info(
+                "bad tcp option %d optlen %d beyond end-of-header", optnum, optlen
+            )
+            break
+        if optlen < 2:
+            logger.info("bad tcp option %d optlen %d less than two", optnum, optlen)
+            break
+        if optnum == TCPOPT_AUTHOPT:
+            if optlen < 4:
+                logger.info("bad tcp option %d optlen %d", optnum, optlen)
+                break
+            result += bytes([optnum, optlen])
+            result += th_bytes[pos : pos + 2]
+            result += (optlen - 4) * b"\x00"
+        elif include_options:
+            result += bytes([optnum, optlen])
+            result += th_bytes[pos : pos + optlen - 2]
+        pos += optlen - 2
+    result += bytes(p[TCP].payload)
+    return result
+
+
+def check_tcp_authopt_signature(
+    p: Packet, alg: TcpAuthOptAlg, master_key, sisn, disn, include_options=True, sne=0
+):
+    from .scapy_utils import scapy_tcp_get_authopt_val
+
+    ao = scapy_tcp_get_authopt_val(p[TCP])
+    if ao is None:
+        return None
+
+    context_bytes = build_context_from_packet(p, sisn, disn)
+    traffic_key = alg.kdf(master_key, context_bytes)
+    message_bytes = build_message_from_packet(
+        p, include_options=include_options, sne=sne
+    )
+    mac = alg.mac(traffic_key, message_bytes)
+    return mac == ao.mac
+
+
+def add_tcp_authopt_signature(
+    p: Packet,
+    alg: TcpAuthOptAlg,
+    master_key,
+    sisn,
+    disn,
+    keyid=0,
+    rnextkeyid=0,
+    include_options=True,
+    sne=0,
+):
+    """Sign a packet"""
+    th = p[TCP]
+    keyids = struct.pack("BB", keyid, rnextkeyid)
+    th.options = th.options + [(TCPOPT_AUTHOPT, keyids + alg.maclen * b"\x00")]
+
+    context_bytes = build_context_from_packet(p, sisn, disn)
+    traffic_key = alg.kdf(master_key, context_bytes)
+    message_bytes = build_message_from_packet(
+        p, include_options=include_options, sne=sne
+    )
+    mac = alg.mac(traffic_key, message_bytes)
+    th.options[-1] = (TCPOPT_AUTHOPT, keyids + mac)
+
+
+def break_tcp_authopt_signature(packet: Packet):
+    """Invalidate TCP-AO signature inside a packet
+
+    The packet must already be signed and it gets modified in-place.
+    """
+    opt = packet[TCP].options[-1]
+    if opt[0] != TCPOPT_AUTHOPT:
+        raise ValueError("TCP option list must end with TCP_AUTHOPT")
+    opt_mac = bytearray(opt[1])
+    opt_mac[-1] ^= 0xFF
+    packet[TCP].options[-1] = (opt[0], bytes(opt_mac))
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_utils.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_utils.py
new file mode 100644
index 000000000000..5000b8fe9ada
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/scapy_utils.py
@@ -0,0 +1,176 @@
+import typing
+import struct
+import socket
+import threading
+from dataclasses import dataclass
+from ipaddress import IPv4Address, IPv6Address
+
+from scapy.packet import Packet
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6
+from scapy.config import conf as scapy_conf
+from scapy.sendrecv import AsyncSniffer
+
+from .utils import netns_context
+
+# TCPOPT numbers are apparently not available in scapy
+TCPOPT_MD5SIG = 19
+TCPOPT_AUTHOPT = 29
+
+# Easy generic handling of IPv4/IPv6Address
+IPvXAddress = typing.Union[IPv4Address, IPv6Address]
+
+
+def get_packet_ipvx_src(p: Packet) -> IPvXAddress:
+    if IP in p:
+        return IPv4Address(p[IP].src)
+    elif IPv6 in p:
+        return IPv6Address(p[IPv6].src)
+    else:
+        raise Exception("Neither IP nor IPv6 found on packet")
+
+
+def get_packet_ipvx_dst(p: Packet) -> IPvXAddress:
+    if IP in p:
+        return IPv4Address(p[IP].dst)
+    elif IPv6 in p:
+        return IPv6Address(p[IPv6].dst)
+    else:
+        raise Exception("Neither IP nor IPv6 found on packet")
+
+
+def get_tcp_doff(th: TCP):
+    """Get the TCP data offset, even if packet is not yet built"""
+    doff = th.dataofs
+    if doff is None:
+        opt_len = len(th.get_field("options").i2m(th, th.options))
+        doff = 5 + ((opt_len + 3) // 4)
+    return doff
+
+
+def get_tcp_v4_pseudoheader(tcp_packet: TCP) -> bytes:
+    iph = tcp_packet.underlayer
+    return struct.pack(
+        "!4s4sHH",
+        IPv4Address(iph.src).packed,
+        IPv4Address(iph.dst).packed,
+        socket.IPPROTO_TCP,
+        get_tcp_doff(tcp_packet) * 4 + len(tcp_packet.payload),
+    )
+
+
+def get_tcp_v6_pseudoheader(tcp_packet: TCP) -> bytes:
+    ipv6 = tcp_packet.underlayer
+    return struct.pack(
+        "!16s16sII",
+        IPv6Address(ipv6.src).packed,
+        IPv6Address(ipv6.dst).packed,
+        get_tcp_doff(tcp_packet) * 4 + len(tcp_packet.payload),
+        socket.IPPROTO_TCP,
+    )
+
+
+def get_tcp_pseudoheader(tcp_packet: TCP):
+    if isinstance(tcp_packet.underlayer, IP):
+        return get_tcp_v4_pseudoheader(tcp_packet)
+    if isinstance(tcp_packet.underlayer, IPv6):
+        return get_tcp_v6_pseudoheader(tcp_packet)
+    raise ValueError("TCP underlayer is neither IP nor IPv6")
+
+
+def tcp_seq_wrap(seq):
+    return seq & 0xFFFFFFFF
+
+
+@dataclass
+class tcphdr_authopt:
+    """Representation of a TCP auth option as it appears in a TCP packet"""
+
+    keyid: int
+    rnextkeyid: int
+    mac: bytes
+
+    @classmethod
+    def unpack(cls, buf) -> "tcphdr_authopt":
+        return cls(buf[0], buf[1], buf[2:])
+
+    def __repr__(self):
+        return f"tcphdr_authopt({self.keyid}, {self.rnextkeyid}, bytes.fromhex({self.mac.hex(' ')!r})"
+
+
+def scapy_tcp_get_authopt_val(tcp) -> typing.Optional[tcphdr_authopt]:
+    for optnum, optval in tcp.options:
+        if optnum == TCPOPT_AUTHOPT:
+            return tcphdr_authopt.unpack(optval)
+    return None
+
+
+def scapy_tcp_get_md5_sig(tcp) -> typing.Optional[bytes]:
+    """Return the MD5 signature (as bytes) or None"""
+    for optnum, optval in tcp.options:
+        if optnum == TCPOPT_MD5SIG:
+            return optval
+    return None
+
+
+def calc_tcp_md5_hash(p, key: bytes) -> bytes:
+    """Calculate TCP-MD5 hash from packet and return a 16-byte string"""
+    import hashlib
+
+    h = hashlib.md5()
+    tp = p[TCP]
+    th_bytes = bytes(p[TCP])
+    h.update(get_tcp_pseudoheader(tp))
+    h.update(th_bytes[:16])
+    h.update(b"\x00\x00")
+    h.update(th_bytes[18:20])
+    h.update(bytes(tp.payload))
+    h.update(key)
+
+    return h.digest()
+
+
+def create_l2socket(ns: str = "", **kw):
+    """Create a scapy L2socket inside a namespace"""
+
+    with netns_context(ns):
+        return scapy_conf.L2socket(**kw)
+
+
+def create_capture_socket(ns: str = "", **kw):
+    """Create a scapy L2listen socket inside a namespace"""
+    from scapy.config import conf as scapy_conf
+
+    with netns_context(ns):
+        return scapy_conf.L2listen(**kw)
+
+
+def scapy_sniffer_start_block(sniffer: AsyncSniffer, timeout=1):
+    """Like AsyncSniffer.start except block until sniffing starts
+
+    This ensures no lost packets and no delays
+    """
+    if sniffer.kwargs.get("started_callback"):
+        raise ValueError("sniffer must not already have a started_callback")
+
+    e = threading.Event()
+    sniffer.kwargs["started_callback"] = e.set
+    sniffer.start()
+    e.wait(timeout=timeout)
+    if not e.is_set():
+        raise TimeoutError("Timed out waiting for sniffer to start")
+
+
+def scapy_sniffer_stop(sniffer: AsyncSniffer):
+    """Like AsyncSniffer.stop except no error is raising if not running"""
+    if sniffer is not None and sniffer.running:
+        sniffer.stop()
+
+
+class AsyncSnifferContext(AsyncSniffer):
+    def __enter__(self):
+        scapy_sniffer_start_block(self)
+        return self
+
+    def __exit__(self, *a):
+        scapy_sniffer_stop(self)
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py
new file mode 100644
index 000000000000..e918439ef9f4
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vectors.py
@@ -0,0 +1,359 @@
+# SPDX-License-Identifier: GPL-2.0
+import logging
+from ipaddress import IPv4Address, IPv6Address
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6
+from .scapy_tcp_authopt import get_alg, build_context_from_packet, build_message_from_packet
+from .scapy_utils import scapy_tcp_get_authopt_val
+import socket
+
+logger = logging.getLogger(__name__)
+
+
+class TestIETFVectors:
+    """Test python implementation of TCP-AO algorithms
+
+    Data is a subset of IETF test vectors:
+    https://datatracker.ietf.org/doc/html/draft-touch-tcpm-ao-test-vectors-02
+    """
+
+    master_key = b"testvector"
+    client_keyid = 61
+    server_keyid = 84
+    client_ipv4 = IPv4Address("10.11.12.13")
+    client_ipv6 = IPv6Address("FD00::1")
+    server_ipv4 = IPv4Address("172.27.28.29")
+    server_ipv6 = IPv6Address("FD00::2")
+
+    client_isn_41x = 0xFBFBAB5A
+    server_isn_41x = 0x11C14261
+    client_isn_42x = 0xCB0EFBEE
+    server_isn_42x = 0xACD5B5E1
+    client_isn_61x = 0x176A833F
+    server_isn_61x = 0x3F51994B
+    client_isn_62x = 0x020C1E69
+    server_isn_62x = 0xEBA3734D
+
+    def check(
+        self,
+        packet_hex: str,
+        traffic_key_hex: str,
+        mac_hex: str,
+        src_isn,
+        dst_isn,
+        include_options=True,
+        alg_name="HMAC-SHA-1-96",
+        sne=0,
+    ):
+        packet_bytes = bytes.fromhex(packet_hex)
+
+        # sanity check for ip version
+        ipv = packet_bytes[0] >> 4
+        if ipv == 4:
+            p = IP(bytes.fromhex(packet_hex))
+            assert p[IP].proto == socket.IPPROTO_TCP
+        elif ipv == 6:
+            p = IPv6(bytes.fromhex(packet_hex))
+            assert p[IPv6].nh == socket.IPPROTO_TCP
+        else:
+            raise ValueError(f"bad ipv={ipv}")
+
+        # sanity check for seq/ack in SYN/ACK packets
+        if p[TCP].flags.S and p[TCP].flags.A is False:
+            assert p[TCP].seq == src_isn
+            assert p[TCP].ack == 0
+        if p[TCP].flags.S and p[TCP].flags.A:
+            assert p[TCP].seq == src_isn
+            assert p[TCP].ack == dst_isn + 1
+
+        # check traffic key
+        alg = get_alg(alg_name)
+        context_bytes = build_context_from_packet(p, src_isn, dst_isn)
+        traffic_key = alg.kdf(self.master_key, context_bytes)
+        assert traffic_key.hex(" ") == traffic_key_hex
+
+        # check mac
+        message_bytes = build_message_from_packet(
+            p, include_options=include_options, sne=sne
+        )
+        mac = alg.mac(traffic_key, message_bytes)
+        assert mac.hex(" ") == mac_hex
+
+        # check option bytes in header
+        opt = scapy_tcp_get_authopt_val(p[TCP])
+        assert opt is not None
+        assert opt.keyid in [self.client_keyid, self.server_keyid]
+        assert opt.rnextkeyid in [self.client_keyid, self.server_keyid]
+        assert opt.mac.hex(" ") == mac_hex
+
+    def test_4_1_1(self):
+        self.check(
+            """
+            45 e0 00 4c dd 0f 40 00 ff 06 bf 6b 0a 0b 0c 0d
+            ac 1b 1c 1d e9 d7 00 b3 fb fb ab 5a 00 00 00 00
+            e0 02 ff ff ca c4 00 00 02 04 05 b4 01 03 03 08
+            04 02 08 0a 00 15 5a b7 00 00 00 00 1d 10 3d 54
+            2e e4 37 c6 f8 ed e6 d7 c4 d6 02 e7
+            """,
+            "6d 63 ef 1b 02 fe 15 09 d4 b1 40 27 07 fd 7b 04 16 ab b7 4f",
+            "2e e4 37 c6 f8 ed e6 d7 c4 d6 02 e7",
+            self.client_isn_41x,
+            0,
+        )
+
+    def test_4_1_2(self):
+        self.check(
+            """
+            45 e0 00 4c 65 06 40 00 ff 06 37 75 ac 1b 1c 1d
+            0a 0b 0c 0d 00 b3 e9 d7 11 c1 42 61 fb fb ab 5b
+            e0 12 ff ff 37 76 00 00 02 04 05 b4 01 03 03 08
+            04 02 08 0a 84 a5 0b eb 00 15 5a b7 1d 10 54 3d
+            ee ab 0f e2 4c 30 10 81 51 16 b3 be
+            """,
+            "d9 e2 17 e4 83 4a 80 ca 2f 3f d8 de 2e 41 b8 e6 79 7f ea 96",
+            "ee ab 0f e2 4c 30 10 81 51 16 b3 be",
+            self.server_isn_41x,
+            self.client_isn_41x,
+        )
+
+    def test_4_1_3(self):
+        self.check(
+            """
+            45 e0 00 87 36 a1 40 00 ff 06 65 9f 0a 0b 0c 0d
+            ac 1b 1c 1d e9 d7 00 b3 fb fb ab 5b 11 c1 42 62
+            c0 18 01 04 a1 62 00 00 01 01 08 0a 00 15 5a c1
+            84 a5 0b eb 1d 10 3d 54 70 64 cf 99 8c c6 c3 15
+            c2 c2 e2 bf ff ff ff ff ff ff ff ff ff ff ff ff
+            ff ff ff ff 00 43 01 04 da bf 00 b4 0a 0b 0c 0d
+            26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02
+            00 02 02 42 00 02 06 41 04 00 00 da bf 02 08 40
+            06 00 64 00 01 01 00
+            """,
+            "d2 e5 9c 65 ff c7 b1 a3 93 47 65 64 63 b7 0e dc 24 a1 3d 71",
+            "70 64 cf 99 8c c6 c3 15 c2 c2 e2 bf",
+            self.client_isn_41x,
+            self.server_isn_41x,
+        )
+
+    def test_4_1_4(self):
+        self.check(
+            """
+            45 e0 00 87 1f a9 40 00 ff 06 7c 97 ac 1b 1c 1d
+            0a 0b 0c 0d 00 b3 e9 d7 11 c1 42 62 fb fb ab 9e
+            c0 18 01 00 40 0c 00 00 01 01 08 0a 84 a5 0b f5
+            00 15 5a c1 1d 10 54 3d a6 3f 0e cb bb 2e 63 5c
+            95 4d ea c7 ff ff ff ff ff ff ff ff ff ff ff ff
+            ff ff ff ff 00 43 01 04 da c0 00 b4 ac 1b 1c 1d
+            26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02
+            00 02 02 42 00 02 06 41 04 00 00 da c0 02 08 40
+            06 00 64 00 01 01 00
+            """,
+            "d9 e2 17 e4 83 4a 80 ca 2f 3f d8 de 2e 41 b8 e6 79 7f ea 96",
+            "a6 3f 0e cb bb 2e 63 5c 95 4d ea c7",
+            self.server_isn_41x,
+            self.client_isn_41x,
+        )
+
+    def test_4_2_1(self):
+        self.check(
+            """
+            45 e0 00 4c 53 99 40 00 ff 06 48 e2 0a 0b 0c 0d
+            ac 1b 1c 1d ff 12 00 b3 cb 0e fb ee 00 00 00 00
+            e0 02 ff ff 54 1f 00 00 02 04 05 b4 01 03 03 08
+            04 02 08 0a 00 02 4c ce 00 00 00 00 1d 10 3d 54
+            80 af 3c fe b8 53 68 93 7b 8f 9e c2
+            """,
+            "30 ea a1 56 0c f0 be 57 da b5 c0 45 22 9f b1 0a 42 3c d7 ea",
+            "80 af 3c fe b8 53 68 93 7b 8f 9e c2",
+            self.client_isn_42x,
+            0,
+            include_options=False,
+        )
+
+    def test_4_2_2(self):
+        self.check(
+            """
+            45 e0 00 4c 32 84 40 00 ff 06 69 f7 ac 1b 1c 1d
+            0a 0b 0c 0d 00 b3 ff 12 ac d5 b5 e1 cb 0e fb ef
+            e0 12 ff ff 38 8e 00 00 02 04 05 b4 01 03 03 08
+            04 02 08 0a 57 67 72 f3 00 02 4c ce 1d 10 54 3d
+            09 30 6f 9a ce a6 3a 8c 68 cb 9a 70
+            """,
+            "b5 b2 89 6b b3 66 4e 81 76 b0 ed c6 e7 99 52 41 01 a8 30 7f",
+            "09 30 6f 9a ce a6 3a 8c 68 cb 9a 70",
+            self.server_isn_42x,
+            self.client_isn_42x,
+            include_options=False,
+        )
+
+    def test_4_2_3(self):
+        self.check(
+            """
+            45 e0 00 87 a8 f5 40 00 ff 06 f3 4a 0a 0b 0c 0d
+            ac 1b 1c 1d ff 12 00 b3 cb 0e fb ef ac d5 b5 e2
+            c0 18 01 04 6c 45 00 00 01 01 08 0a 00 02 4c ce
+            57 67 72 f3 1d 10 3d 54 71 06 08 cc 69 6c 03 a2
+            71 c9 3a a5 ff ff ff ff ff ff ff ff ff ff ff ff
+            ff ff ff ff 00 43 01 04 da bf 00 b4 0a 0b 0c 0d
+            26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02
+            00 02 02 42 00 02 06 41 04 00 00 da bf 02 08 40
+            06 00 64 00 01 01 00
+            """,
+            "f3 db 17 93 d7 91 0e cd 80 6c 34 f1 55 ea 1f 00 34 59 53 e3",
+            "71 06 08 cc 69 6c 03 a2 71 c9 3a a5",
+            self.client_isn_42x,
+            self.server_isn_42x,
+            include_options=False,
+        )
+
+    def test_4_2_4(self):
+        self.check(
+            """
+            45 e0 00 87 54 37 40 00 ff 06 48 09 ac 1b 1c 1d
+            0a 0b 0c 0d 00 b3 ff 12 ac d5 b5 e2 cb 0e fc 32
+            c0 18 01 00 46 b6 00 00 01 01 08 0a 57 67 72 f3
+            00 02 4c ce 1d 10 54 3d 97 76 6e 48 ac 26 2d e9
+            ae 61 b4 f9 ff ff ff ff ff ff ff ff ff ff ff ff
+            ff ff ff ff 00 43 01 04 da c0 00 b4 ac 1b 1c 1d
+            26 02 06 01 04 00 01 00 01 02 02 80 00 02 02 02
+            00 02 02 42 00 02 06 41 04 00 00 da c0 02 08 40
+            06 00 64 00 01 01 00
+            """,
+            "b5 b2 89 6b b3 66 4e 81 76 b0 ed c6 e7 99 52 41 01 a8 30 7f",
+            "97 76 6e 48 ac 26 2d e9 ae 61 b4 f9",
+            self.server_isn_42x,
+            self.client_isn_42x,
+            include_options=False,
+        )
+
+    def test_5_1_1(self):
+        self.check(
+            """
+            45 e0 00 4c 7b 9f 40 00 ff 06 20 dc 0a 0b 0c 0d
+            ac 1b 1c 1d c4 fa 00 b3 78 7a 1d df 00 00 00 00
+            e0 02 ff ff 5a 0f 00 00 02 04 05 b4 01 03 03 08
+            04 02 08 0a 00 01 7e d0 00 00 00 00 1d 10 3d 54
+            e4 77 e9 9c 80 40 76 54 98 e5 50 91
+            """,
+            "f5 b8 b3 d5 f3 4f db b6 eb 8d 4a b9 66 0e 60 e3",
+            "e4 77 e9 9c 80 40 76 54 98 e5 50 91",
+            0x787A1DDF,
+            0,
+            include_options=True,
+            alg_name="AES-128-CMAC-96",
+        )
+
+    def test_6_1_1(self):
+        self.check(
+            """
+            6e 08 91 dc 00 38 06 40 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 01 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 02 f7 e4 00 b3 17 6a 83 3f
+            00 00 00 00 e0 02 ff ff 47 21 00 00 02 04 05 a0
+            01 03 03 08 04 02 08 0a 00 41 d0 87 00 00 00 00
+            1d 10 3d 54 90 33 ec 3d 73 34 b6 4c 5e dd 03 9f
+            """,
+            "62 5e c0 9d 57 58 36 ed c9 b6 42 84 18 bb f0 69 89 a3 61 bb",
+            "90 33 ec 3d 73 34 b6 4c 5e dd 03 9f",
+            self.client_isn_61x,
+            0,
+            include_options=True,
+        )
+
+    def test_6_1_2(self):
+        self.check(
+            """
+            6e 01 00 9e 00 38 06 40 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 01 00 b3 f7 e4 3f 51 99 4b
+            17 6a 83 40 e0 12 ff ff bf ec 00 00 02 04 05 a0
+            01 03 03 08 04 02 08 0a bd 33 12 9b 00 41 d0 87
+            1d 10 54 3d f1 cb a3 46 c3 52 61 63 f7 1f 1f 55
+            """,
+            "e4 a3 7a da 2a 0a fc a8 71 14 34 91 3f e1 38 c7 71 eb cb 4a",
+            "f1 cb a3 46 c3 52 61 63 f7 1f 1f 55",
+            self.server_isn_61x,
+            self.client_isn_61x,
+            include_options=True,
+        )
+
+    def test_6_2_2(self):
+        self.check(
+            """
+            6e 0a 7e 1f 00 38 06 40 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 01 00 b3 c6 cd eb a3 73 4d
+            02 0c 1e 6a e0 12 ff ff 77 4d 00 00 02 04 05 a0
+            01 03 03 08 04 02 08 0a 5e c9 9b 70 00 9d b9 5b
+            1d 10 54 3d 3c 54 6b ad 97 43 f1 2d f8 b8 01 0d
+            """,
+            "40 51 08 94 7f 99 65 75 e7 bd bc 26 d4 02 16 a2 c7 fa 91 bd",
+            "3c 54 6b ad 97 43 f1 2d f8 b8 01 0d",
+            self.server_isn_62x,
+            self.client_isn_62x,
+            include_options=False,
+        )
+
+    def test_6_2_4(self):
+        self.check(
+            """
+            6e 0a 7e 1f 00 73 06 40 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 01 00 b3 c6 cd eb a3 73 4e
+            02 0c 1e ad c0 18 01 00 71 6a 00 00 01 01 08 0a
+            5e c9 9b 7a 00 9d b9 65 1d 10 54 3d 55 9a 81 94
+            45 b4 fd e9 8d 9e 13 17 ff ff ff ff ff ff ff ff
+            ff ff ff ff ff ff ff ff 00 43 01 04 fd e8 00 b4
+            01 01 01 7a 26 02 06 01 04 00 01 00 01 02 02 80
+            00 02 02 02 00 02 02 42 00 02 06 41 04 00 00 fd
+            e8 02 08 40 06 00 64 00 01 01 00
+            """,
+            "40 51 08 94 7f 99 65 75 e7 bd bc 26 d4 02 16 a2 c7 fa 91 bd",
+            "55 9a 81 94 45 b4 fd e9 8d 9e 13 17",
+            self.server_isn_62x,
+            self.client_isn_62x,
+            include_options=False,
+        )
+
+    server_isn_71x = 0xA6744ECB
+    client_isn_71x = 0x193CCCEC
+
+    def test_7_1_2(self):
+        self.check(
+            """
+            6e 06 15 20 00 38 06 40 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 01 00 b3 f8 5a a6 74 4e cb
+            19 3c cc ed e0 12 ff ff ea bb 00 00 02 04 05 a0
+            01 03 03 08 04 02 08 0a 71 da ab c8 13 e4 ab 99
+            1d 10 54 3d dc 28 43 a8 4e 78 a6 bc fd c5 ed 80
+            """,
+            "cf 1b 1e 22 5e 06 a6 36 16 76 4a 06 7b 46 f4 b1",
+            "dc 28 43 a8 4e 78 a6 bc fd c5 ed 80",
+            self.server_isn_71x,
+            self.client_isn_71x,
+            alg_name="AES-128-CMAC-96",
+            include_options=True,
+        )
+
+    def test_7_1_4(self):
+        self.check(
+            """
+            6e 06 15 20 00 73 06 40 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 02 fd 00 00 00 00 00 00 00
+            00 00 00 00 00 00 00 01 00 b3 f8 5a a6 74 4e cc
+            19 3c cd 30 c0 18 01 00 52 f4 00 00 01 01 08 0a
+            71 da ab d3 13 e4 ab a3 1d 10 54 3d c1 06 9b 7d
+            fd 3d 69 3a 6d f3 f2 89 ff ff ff ff ff ff ff ff
+            ff ff ff ff ff ff ff ff 00 43 01 04 fd e8 00 b4
+            01 01 01 7a 26 02 06 01 04 00 01 00 01 02 02 80
+            00 02 02 02 00 02 02 42 00 02 06 41 04 00 00 fd
+            e8 02 08 40 06 00 64 00 01 01 00
+            """,
+            "cf 1b 1e 22 5e 06 a6 36 16 76 4a 06 7b 46 f4 b1",
+            "c1 06 9b 7d fd 3d 69 3a 6d f3 f2 89",
+            self.server_isn_71x,
+            self.client_isn_71x,
+            alg_name="AES-128-CMAC-96",
+            include_options=True,
+        )
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py
new file mode 100644
index 000000000000..9becd39dc31e
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/validator.py
@@ -0,0 +1,127 @@
+# SPDX-License-Identifier: GPL-2.0
+import logging
+import typing
+from dataclasses import dataclass
+
+from scapy.layers.inet import TCP
+from scapy.packet import Packet
+
+from . import scapy_tcp_authopt
+from .scapy_conntrack import TCPConnectionTracker, get_packet_tcp_connection_key
+from .scapy_utils import scapy_tcp_get_authopt_val
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TcpAuthValidatorKey:
+    """Representation of a TCP Authentication Option key for the validator
+
+    The matching rules are independent.
+    """
+
+    key: bytes
+    alg_name: str
+    include_options: bool = True
+    keyid: typing.Optional[int] = None
+    sport: typing.Optional[int] = None
+    dport: typing.Optional[int] = None
+
+    def match_packet(self, p: Packet) -> bool:
+        """Determine if this key matches a specific packet"""
+        if not TCP in p:
+            return False
+        authopt = scapy_tcp_get_authopt_val(p[TCP])
+        if authopt is None:
+            return False
+        if self.keyid is not None and authopt.keyid != self.keyid:
+            return False
+        if self.sport is not None and p[TCP].sport != self.sport:
+            return False
+        if self.dport is not None and p[TCP].dport != self.dport:
+            return False
+        return True
+
+    def get_alg_imp(self):
+        return scapy_tcp_authopt.get_alg(self.alg_name)
+
+
+class TcpAuthValidator:
+    """Validate TCP Authentication Option signatures inside a capture
+
+    This can track multiple connections, determine their initial sequence numbers
+    and verify their signatues independently.
+
+    Keys are provided as a collection of `.TcpAuthValidatorKey`
+    """
+
+    keys: typing.List[TcpAuthValidatorKey]
+    tracker: TCPConnectionTracker
+    any_incomplete: bool = False
+    any_unsigned: bool = False
+    any_fail: bool = False
+
+    def __init__(self, keys=None):
+        self.keys = keys or []
+        self.tracker = TCPConnectionTracker()
+        self.conn_dict = {}
+
+    def get_key_for_packet(self, p):
+        for k in self.keys:
+            if k.match_packet(p):
+                return k
+        return None
+
+    def handle_packet(self, p: Packet):
+        if not TCP in p:
+            return
+        self.tracker.handle_packet(p)
+        authopt = scapy_tcp_get_authopt_val(p[TCP])
+        if not authopt:
+            self.any_unsigned = True
+            logger.debug("skip packet without tcp authopt: %r", p)
+            return
+        key = self.get_key_for_packet(p)
+        if not key:
+            self.any_unsigned = True
+            logger.debug("skip packet not matching any known keys: %r", p)
+            return
+        tcp_track_key = get_packet_tcp_connection_key(p)
+        conn = self.tracker.get(tcp_track_key)
+
+        if not conn.found_syn:
+            logger.warning("missing SYN for %s", p)
+            self.any_incomplete = True
+            return
+        if not conn.found_synack and not p[TCP].flags.S:
+            logger.warning("missing SYNACK for %s", p)
+            self.any_incomplete = True
+            return
+
+        alg = key.get_alg_imp()
+        context_bytes = scapy_tcp_authopt.build_context_from_packet(
+            p, conn.sisn or 0, conn.disn or 0
+        )
+        traffic_key = alg.kdf(key.key, context_bytes)
+        message_bytes = scapy_tcp_authopt.build_message_from_packet(
+            p, include_options=key.include_options
+        )
+        computed_mac = alg.mac(traffic_key, message_bytes)
+        captured_mac = authopt.mac
+        if computed_mac == captured_mac:
+            logger.debug("ok - mac %s", computed_mac.hex())
+        else:
+            self.any_fail = True
+            logger.error(
+                "not ok - captured %s computed %s",
+                captured_mac.hex(),
+                computed_mac.hex(),
+            )
+
+    def raise_errors(self, allow_unsigned=False, allow_incomplete=False):
+        if self.any_fail:
+            raise Exception("Found failed signatures")
+        if self.any_incomplete and not allow_incomplete:
+            raise Exception("Incomplete capture missing SYN/ACK")
+        if self.any_unsigned and not allow_unsigned:
+            raise Exception("Found unsigned packets")
-- 
2.25.1




[Index of Archives]     [Kernel]     [Gnu Classpath]     [Gnu Crypto]     [DM Crypt]     [Netfilter]     [Bugtraq]

  Powered by Linux