LKML Archive on lore.kernel.org
help / color / mirror / Atom feed
From: Leonard Crestez <cdleonard@gmail.com>
To: Dmitry Safonov <0x7f454c46@gmail.com>,
David Ahern <dsahern@kernel.org>, Shuah Khan <shuah@kernel.org>
Cc: Eric Dumazet <edumazet@google.com>,
"David S. Miller" <davem@davemloft.net>,
Herbert Xu <herbert@gondor.apana.org.au>,
Kuniyuki Iwashima <kuniyu@amazon.co.jp>,
Hideaki YOSHIFUJI <yoshfuji@linux-ipv6.org>,
Jakub Kicinski <kuba@kernel.org>,
Yuchung Cheng <ycheng@google.com>,
Francesco Ruggeri <fruggeri@arista.com>,
Mat Martineau <mathew.j.martineau@linux.intel.com>,
Christoph Paasch <cpaasch@apple.com>,
Ivan Delalande <colona@arista.com>,
Priyaranjan Jha <priyarjha@google.com>,
Menglong Dong <dong.menglong@zte.com.cn>,
netdev@vger.kernel.org, linux-crypto@vger.kernel.org,
linux-kselftest@vger.kernel.org, linux-kernel@vger.kernel.org
Subject: [RFCv3 04/15] selftests: tcp_authopt: Initial sockopt manipulation
Date: Wed, 25 Aug 2021 00:34:37 +0300 [thread overview]
Message-ID: <e5d1d887325815300c92953575b2b5272e5bc0e5.1629840814.git.cdleonard@gmail.com> (raw)
In-Reply-To: <cover.1629840814.git.cdleonard@gmail.com>
Signed-off-by: Leonard Crestez <cdleonard@gmail.com>
---
.../tcp_authopt/tcp_authopt_test/conftest.py | 21 ++
.../tcp_authopt_test/linux_tcp_authopt.py | 188 ++++++++++++++++++
.../tcp_authopt/tcp_authopt_test/sockaddr.py | 101 ++++++++++
.../tcp_authopt_test/test_sockopt.py | 74 +++++++
4 files changed, 384 insertions(+)
create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
new file mode 100644
index 000000000000..c17c8ea2a943
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
@@ -0,0 +1,21 @@
+# SPDX-License-Identifier: GPL-2.0
+from tcp_authopt_test.linux_tcp_authopt import has_tcp_authopt
+import pytest
+import logging
+from contextlib import ExitStack
+
+logger = logging.getLogger(__name__)
+
+skipif_missing_tcp_authopt = pytest.mark.skipif(
+ not has_tcp_authopt(), reason="Need CONFIG_TCP_AUTHOPT"
+)
+
+
+@pytest.fixture
+def exit_stack():
+ """Return a contextlib.ExitStack as a pytest fixture
+
+ This reduces indentation making code more readable
+ """
+ with ExitStack() as exit_stack:
+ yield exit_stack
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
new file mode 100644
index 000000000000..41374f9851aa
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
@@ -0,0 +1,188 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Python wrapper around linux TCP_AUTHOPT ABI"""
+
+from dataclasses import dataclass
+from ipaddress import IPv4Address, IPv6Address, ip_address
+import socket
+import errno
+import logging
+from .sockaddr import sockaddr_in, sockaddr_in6, sockaddr_storage, sockaddr_unpack
+import typing
+import struct
+
+logger = logging.getLogger(__name__)
+
+
+def BIT(x):
+ return 1 << x
+
+
+TCP_AUTHOPT = 38
+TCP_AUTHOPT_KEY = 39
+
+TCP_AUTHOPT_MAXKEYLEN = 80
+
+TCP_AUTHOPT_FLAG_REJECT_UNEXPECTED = BIT(2)
+
+TCP_AUTHOPT_KEY_DEL = BIT(0)
+TCP_AUTHOPT_KEY_EXCLUDE_OPTS = BIT(1)
+TCP_AUTHOPT_KEY_BIND_ADDR = BIT(2)
+
+TCP_AUTHOPT_ALG_HMAC_SHA_1_96 = 1
+TCP_AUTHOPT_ALG_AES_128_CMAC_96 = 2
+
+
+@dataclass
+class tcp_authopt:
+ """Like linux struct tcp_authopt"""
+
+ flags: int = 0
+ sizeof = 4
+
+ def pack(self) -> bytes:
+ return struct.pack(
+ "I",
+ self.flags,
+ )
+
+ def __bytes__(self):
+ return self.pack()
+
+ @classmethod
+ def unpack(cls, b: bytes):
+ tup = struct.unpack("I", b)
+ return cls(*tup)
+
+
+def set_tcp_authopt(sock, opt: tcp_authopt):
+ return sock.setsockopt(socket.IPPROTO_TCP, TCP_AUTHOPT, bytes(opt))
+
+
+def get_tcp_authopt(sock: socket.socket) -> tcp_authopt:
+ b = sock.getsockopt(socket.IPPROTO_TCP, TCP_AUTHOPT, tcp_authopt.sizeof)
+ return tcp_authopt.unpack(b)
+
+
+class tcp_authopt_key:
+ """Like linux struct tcp_authopt_key"""
+
+ def __init__(
+ self,
+ flags: int = 0,
+ send_id: int = 0,
+ recv_id: int = 0,
+ alg=TCP_AUTHOPT_ALG_HMAC_SHA_1_96,
+ key: bytes = b"",
+ addr: bytes = b"",
+ include_options=None,
+ ):
+ self.flags = flags
+ self.send_id = send_id
+ self.recv_id = recv_id
+ self.alg = alg
+ self.key = key
+ self.addr = addr
+ if include_options is not None:
+ self.include_options = include_options
+
+ def pack(self):
+ if len(self.key) > TCP_AUTHOPT_MAXKEYLEN:
+ raise ValueError(f"Max key length is {TCP_AUTHOPT_MAXKEYLEN}")
+ data = struct.pack(
+ "IBBBB80s",
+ self.flags,
+ self.send_id,
+ self.recv_id,
+ self.alg,
+ len(self.key),
+ self.key,
+ )
+ data += bytes(self.addrbuf.ljust(sockaddr_storage.sizeof, b"\x00"))
+ return data
+
+ def __bytes__(self):
+ return self.pack()
+
+ @property
+ def key(self) -> bytes:
+ return self._key
+
+ @key.setter
+ def key(self, val: typing.Union[bytes, str]) -> bytes:
+ if isinstance(val, str):
+ val = val.encode("utf-8")
+ if len(val) > TCP_AUTHOPT_MAXKEYLEN:
+ raise ValueError(f"Max key length is {TCP_AUTHOPT_MAXKEYLEN}")
+ self._key = val
+ return val
+
+ @property
+ def addr(self):
+ if not self.addrbuf:
+ return None
+ else:
+ return sockaddr_unpack(bytes(self.addrbuf))
+
+ @addr.setter
+ def addr(self, val):
+ if isinstance(val, bytes):
+ if len(val) > sockaddr_storage.sizeof:
+ raise ValueError(f"Must be up to {sockaddr_storage.sizeof}")
+ self.addrbuf = val
+ elif val is None:
+ self.addrbuf = b""
+ elif isinstance(val, str):
+ self.addr = ip_address(val)
+ elif isinstance(val, IPv4Address):
+ self.addr = sockaddr_in(addr=val)
+ elif isinstance(val, IPv6Address):
+ self.addr = sockaddr_in6(addr=val)
+ elif (
+ isinstance(val, sockaddr_in)
+ or isinstance(val, sockaddr_in6)
+ or isinstance(val, sockaddr_storage)
+ ):
+ self.addr = bytes(val)
+ else:
+ raise TypeError(f"Can't handle addr {val}")
+ return self.addr
+
+ @property
+ def include_options(self) -> bool:
+ return (self.flags & TCP_AUTHOPT_KEY_EXCLUDE_OPTS) == 0
+
+ @include_options.setter
+ def include_options(self, value) -> bool:
+ if value:
+ self.flags &= ~TCP_AUTHOPT_KEY_EXCLUDE_OPTS
+ else:
+ self.flags |= TCP_AUTHOPT_KEY_EXCLUDE_OPTS
+
+ @property
+ def delete_flag(self) -> bool:
+ return bool(self.flags & TCP_AUTHOPT_KEY_DEL)
+
+ @delete_flag.setter
+ def delete_flag(self, value) -> bool:
+ if value:
+ self.flags |= TCP_AUTHOPT_KEY_DEL
+ else:
+ self.flags &= ~TCP_AUTHOPT_KEY_DEL
+
+
+def set_tcp_authopt_key(sock, key: tcp_authopt_key):
+ return sock.setsockopt(socket.IPPROTO_TCP, TCP_AUTHOPT_KEY, bytes(key))
+
+
+def has_tcp_authopt() -> bool:
+ """Check is TCP_AUTHOPT is implemented by the OS"""
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ try:
+ optbuf = bytes(4)
+ sock.setsockopt(socket.IPPROTO_TCP, TCP_AUTHOPT, optbuf)
+ return True
+ except OSError as e:
+ if e.errno == errno.ENOPROTOOPT:
+ return False
+ else:
+ raise
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
new file mode 100644
index 000000000000..f61d0f190a0c
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
@@ -0,0 +1,101 @@
+# SPDX-License-Identifier: GPL-2.0
+"""pack/unpack wrappers for sockaddr"""
+import socket
+import struct
+from dataclasses import dataclass
+from ipaddress import IPv4Address, IPv6Address
+
+
+@dataclass
+class sockaddr_in:
+ port: int
+ addr: IPv4Address
+ sizeof = 8
+
+ def __init__(self, port=0, addr=None):
+ self.port = port
+ if addr is None:
+ addr = IPv4Address(0)
+ self.addr = IPv4Address(addr)
+
+ def pack(self):
+ return struct.pack("HH4s", socket.AF_INET, self.port, self.addr.packed)
+
+ @classmethod
+ def unpack(cls, buffer):
+ family, port, addr_packed = struct.unpack("HH4s", buffer[:8])
+ if family != socket.AF_INET:
+ raise ValueError(f"Must be AF_INET not {family}")
+ return cls(port, addr_packed)
+
+ def __bytes__(self):
+ return self.pack()
+
+
+@dataclass
+class sockaddr_in6:
+ """Like sockaddr_in6 but for python. Always contains scope_id"""
+
+ port: int
+ addr: IPv6Address
+ flowinfo: int
+ scope_id: int
+ sizeof = 28
+
+ def __init__(self, port=0, addr=None, flowinfo=0, scope_id=0):
+ self.port = port
+ if addr is None:
+ addr = IPv6Address(0)
+ self.addr = IPv6Address(addr)
+ self.flowinfo = flowinfo
+ self.scope_id = scope_id
+
+ def pack(self):
+ return struct.pack(
+ "HHI16sI",
+ socket.AF_INET6,
+ self.port,
+ self.flowinfo,
+ self.addr.packed,
+ self.scope_id,
+ )
+
+ @classmethod
+ def unpack(cls, buffer):
+ family, port, flowinfo, addr_packed, scope_id = struct.unpack(
+ "HHI16sI", buffer[:28]
+ )
+ if family != socket.AF_INET6:
+ raise ValueError(f"Must be AF_INET6 not {family}")
+ return cls(port, addr_packed, flowinfo=flowinfo, scope_id=scope_id)
+
+ def __bytes__(self):
+ return self.pack()
+
+
+@dataclass
+class sockaddr_storage:
+ family: int
+ data: bytes
+ sizeof = 128
+
+ def pack(self):
+ return struct.pack("H126s", self.family, self.data)
+
+ def __bytes__(self):
+ return self.pack()
+
+ @classmethod
+ def unpack(cls, buffer):
+ return cls(*struct.unpack("H126s", buffer))
+
+
+def sockaddr_unpack(buffer: bytes):
+ """Unpack based on family"""
+ family = struct.unpack("H", buffer[:2])[0]
+ if family == socket.AF_INET:
+ return sockaddr_in.unpack(buffer)
+ elif family == socket.AF_INET6:
+ return sockaddr_in6.unpack(buffer)
+ else:
+ return sockaddr_storage.unpack(buffer)
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
new file mode 100644
index 000000000000..06a05bf8aeec
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
@@ -0,0 +1,74 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Test TCP_AUTHOPT sockopt API"""
+import errno
+import socket
+import struct
+from ipaddress import IPv4Address, IPv6Address
+
+import pytest
+
+from .linux_tcp_authopt import (
+ set_tcp_authopt,
+ set_tcp_authopt_key,
+ tcp_authopt,
+ tcp_authopt_key,
+)
+from .sockaddr import sockaddr_unpack
+from .conftest import skipif_missing_tcp_authopt
+
+pytestmark = skipif_missing_tcp_authopt
+
+
+def test_authopt_key_pack_noaddr():
+ b = bytes(tcp_authopt_key(key=b"a\x00b"))
+ assert b[7] == 3
+ assert b[8:13] == b"a\x00b\x00\x00"
+
+
+def test_authopt_key_pack_addr():
+ b = bytes(tcp_authopt_key(key=b"a\x00b", addr="10.0.0.1"))
+ assert struct.unpack("H", b[88:90])[0] == socket.AF_INET
+ assert sockaddr_unpack(b[88:]).addr == IPv4Address("10.0.0.1")
+
+
+def test_authopt_key_pack_addr6():
+ b = bytes(tcp_authopt_key(key=b"abc", addr="fd00::1"))
+ assert struct.unpack("H", b[88:90])[0] == socket.AF_INET6
+ assert sockaddr_unpack(b[88:]).addr == IPv6Address("fd00::1")
+
+
+def test_tcp_authopt_key_del_without_active(exit_stack):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ exit_stack.push(sock)
+
+ # nothing happens:
+ key = tcp_authopt_key()
+ assert key.delete_flag is False
+ key.delete_flag = True
+ assert key.delete_flag is True
+ with pytest.raises(OSError) as e:
+ set_tcp_authopt_key(sock, key)
+ assert e.value.errno in [errno.EINVAL, errno.ENOENT]
+
+
+def test_tcp_authopt_key_setdel(exit_stack):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ exit_stack.push(sock)
+ set_tcp_authopt(sock, tcp_authopt())
+
+ # delete returns ENOENT
+ key = tcp_authopt_key()
+ key.delete_flag = True
+ with pytest.raises(OSError) as e:
+ set_tcp_authopt_key(sock, key)
+ assert e.value.errno == errno.ENOENT
+
+ key = tcp_authopt_key(send_id=1, recv_id=2)
+ set_tcp_authopt_key(sock, key)
+ # First delete works fine:
+ key.delete_flag = True
+ set_tcp_authopt_key(sock, key)
+ # Duplicate delete returns ENOENT
+ with pytest.raises(OSError) as e:
+ set_tcp_authopt_key(sock, key)
+ assert e.value.errno == errno.ENOENT
--
2.25.1
next prev parent reply other threads:[~2021-08-24 21:36 UTC|newest]
Thread overview: 31+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-08-24 21:34 [RFCv3 00/15] tcp: Initial support for RFC5925 auth option Leonard Crestez
2021-08-24 21:34 ` [RFCv3 01/15] tcp: authopt: Initial support and key management Leonard Crestez
2021-08-31 19:04 ` Dmitry Safonov
2021-09-03 14:26 ` Leonard Crestez
2021-08-24 21:34 ` [RFCv3 02/15] docs: Add user documentation for tcp_authopt Leonard Crestez
2021-08-24 21:34 ` [RFCv3 03/15] selftests: Initial tcp_authopt test module Leonard Crestez
2021-08-24 21:34 ` Leonard Crestez [this message]
2021-08-24 21:34 ` [RFCv3 05/15] tcp: authopt: Add crypto initialization Leonard Crestez
2021-08-24 23:02 ` Eric Dumazet
2021-08-24 23:34 ` Eric Dumazet
2021-08-25 8:08 ` Herbert Xu
2021-08-25 14:55 ` Eric Dumazet
2021-08-25 16:04 ` Ard Biesheuvel
2021-08-25 16:31 ` Leonard Crestez
2021-08-25 16:35 ` Leonard Crestez
2021-08-25 17:55 ` Eric Dumazet
2021-08-25 18:56 ` Leonard Crestez
2021-08-24 21:34 ` [RFCv3 06/15] tcp: authopt: Compute packet signatures Leonard Crestez
2021-08-24 21:34 ` [RFCv3 07/15] tcp: authopt: Hook into tcp core Leonard Crestez
2021-08-24 22:59 ` Eric Dumazet
2021-08-25 16:32 ` Leonard Crestez
2021-08-24 21:34 ` [RFCv3 08/15] tcp: authopt: Add snmp counters Leonard Crestez
2021-08-24 21:34 ` [RFCv3 09/15] selftests: tcp_authopt: Test key address binding Leonard Crestez
2021-08-25 5:18 ` David Ahern
2021-08-25 16:37 ` Leonard Crestez
2021-08-24 21:34 ` [RFCv3 10/15] selftests: tcp_authopt: Capture and verify packets Leonard Crestez
2021-08-24 21:34 ` [RFCv3 11/15] selftests: Initial tcp_authopt support for nettest Leonard Crestez
2021-08-24 21:34 ` [RFCv3 12/15] selftests: Initial tcp_authopt support for fcnal-test Leonard Crestez
2021-08-24 21:34 ` [RFCv3 13/15] selftests: Add -t tcp_authopt option for fcnal-test.sh Leonard Crestez
2021-08-24 21:34 ` [RFCv3 14/15] tcp: authopt: Add key selection controls Leonard Crestez
2021-08-24 21:34 ` [RFCv3 15/15] selftests: tcp_authopt: Add tests for rollover Leonard Crestez
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=e5d1d887325815300c92953575b2b5272e5bc0e5.1629840814.git.cdleonard@gmail.com \
--to=cdleonard@gmail.com \
--cc=0x7f454c46@gmail.com \
--cc=colona@arista.com \
--cc=cpaasch@apple.com \
--cc=davem@davemloft.net \
--cc=dong.menglong@zte.com.cn \
--cc=dsahern@kernel.org \
--cc=edumazet@google.com \
--cc=fruggeri@arista.com \
--cc=herbert@gondor.apana.org.au \
--cc=kuba@kernel.org \
--cc=kuniyu@amazon.co.jp \
--cc=linux-crypto@vger.kernel.org \
--cc=linux-kernel@vger.kernel.org \
--cc=linux-kselftest@vger.kernel.org \
--cc=mathew.j.martineau@linux.intel.com \
--cc=netdev@vger.kernel.org \
--cc=priyarjha@google.com \
--cc=shuah@kernel.org \
--cc=ycheng@google.com \
--cc=yoshfuji@linux-ipv6.org \
--subject='Re: [RFCv3 04/15] selftests: tcp_authopt: Initial sockopt manipulation' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).