if cdata.has('HAVE_SYS_UN_H')
src_tincd += 'fd_device.c'
+ cdata.set('HAVE_FD_DEVICE', 1)
endif
confdata = configuration_data()
#endif
#ifdef ENABLE_VDE
" vde"
+#endif
+#ifdef HAVE_WATCHDOG
+ " watchdog"
#endif
"\n\n"
"Copyright (C) 1998-2021 Ivo Timmermans, Guus Sliepen and others.\n"
def init(ctx: Test) -> T.Tuple[Tinc, Tinc]:
"""Create test node."""
- foo, bar = ctx.node(), ctx.node()
-
- stdin = f"""
- init {foo}
- set Port 0
- set Address localhost
- set DeviceType dummy
- set AutoConnect no
- """
- foo.cmd(stdin=stdin)
-
+ bar = ctx.node()
+ foo = ctx.node(init="set AutoConnect no")
return foo, bar
def init(ctx: Test) -> Tinc:
"""Initialize new test nodes."""
- node = ctx.node()
+ node = ctx.node(init=f"set Sandbox {SANDBOX_LEVEL}")
node.add_script(Script.TINC_UP)
- stdin = f"""
- init {node}
- set Address localhost
- set Port 0
- set DeviceType dummy
- set Sandbox {SANDBOX_LEVEL}
- """
- node.cmd(stdin=stdin)
return node
node.cmd("stop")
log.info("checking tincd exit code")
- check.equals(0, tincd.wait())
+ check.success(tincd.wait())
with Test("foreground mode") as context:
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test binding to interfaces and addresses."""
+
+import json
+import socket
+import subprocess as subp
+import sys
+import typing as T
+
+from testlib import check, util
+from testlib.const import EXIT_SKIP
+from testlib.log import log
+from testlib.test import Test
+
+util.require_command("ss", "-nlup")
+util.require_command("ip", "--json", "addr")
+
+
+def connect_tcp(address: str, port: int) -> None:
+ """Check that a TCP connection to (address, port) works."""
+
+ family = socket.AF_INET if "." in address else socket.AF_INET6
+
+ with socket.socket(family, socket.SOCK_STREAM) as sock:
+ sock.connect((address, port))
+
+
+def get_interfaces() -> T.List[T.Tuple[str, T.List[str]]]:
+ """Get a list of network interfaces with assigned addresses."""
+
+ output = subp.run(
+ ["ip", "--json", "addr"], check=True, encoding="utf-8", stdout=subp.PIPE
+ ).stdout
+
+ result: T.List[T.Tuple[str, T.List[str]]] = []
+
+ for line in json.loads(output):
+ if not "UP" in line["flags"]:
+ continue
+ local: T.List[str] = []
+ for addr in line["addr_info"]:
+ if addr["family"] in ("inet", "inet6"):
+ local.append(addr["local"])
+ if local:
+ result.append((line["ifname"], local))
+
+ return result
+
+
+INTERFACES = get_interfaces()
+
+
+def get_udp_listen(pid: int) -> T.List[str]:
+ """Get a list of the currently listening UDP sockets."""
+
+ listen = subp.run(["ss", "-nlup"], check=True, stdout=subp.PIPE, encoding="utf-8")
+ addresses: T.List[str] = []
+
+ for line in listen.stdout.splitlines():
+ if f"pid={pid}," in line:
+ _, _, _, addr, _ = line.split(maxsplit=4)
+ addresses.append(addr)
+
+ return addresses
+
+
+def test_bind_interface(ctx: Test) -> None:
+ """Test BindToInterface."""
+
+ devname, addresses = INTERFACES[0]
+ log.info("using interface %s, addresses (%s)", devname, addresses)
+
+ init = f"""
+ set BindToInterface {devname}
+ set LogLevel 5
+ """
+ foo = ctx.node(init=init)
+ foo.start()
+
+ log.info("check that tincd opened UDP sockets")
+ listen = get_udp_listen(foo.pid)
+ check.is_in(f"%{devname}:{foo.port}", *listen)
+
+ log.info("check TCP sockets")
+ for addr in addresses:
+ connect_tcp(addr, foo.port)
+
+
+def test_bind_address(ctx: Test, kind: str) -> None:
+ """Test BindToAddress or ListenAddress."""
+
+ _, addresses = INTERFACES[0]
+
+ log.info("create and start tincd node")
+ foo = ctx.node(init="set LogLevel 10")
+ for addr in addresses:
+ foo.cmd("add", kind, addr)
+ foo.start()
+
+ log.info("check for correct log message")
+ for addr in addresses:
+ check.in_file(foo.sub("log"), f"Listening on {addr}")
+
+ log.info("test TCP connections")
+ for addr in addresses:
+ connect_tcp(addr, foo.port)
+
+ log.info("check that tincd opened UDP sockets")
+ listen = get_udp_listen(foo.pid)
+ for addr in addresses:
+ check.is_in(addr, *listen)
+ check.is_in(f":{foo.port}", *listen)
+ check.equals(len(addresses), len(listen))
+
+
+if not INTERFACES:
+ log.info("interface list is empty, skipping test")
+ sys.exit(EXIT_SKIP)
+
+with Test("test ListenAddress") as context:
+ test_bind_address(context, "ListenAddress")
+
+with Test("test BindToAddress") as context:
+ test_bind_address(context, "BindToAddress")
+
+with Test("test BindToInterface") as context:
+ test_bind_interface(context)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test binding to ports on localhost."""
+
+import socket
+import sys
+import typing as T
+
+from testlib import check, util
+from testlib.const import EXIT_SKIP
+from testlib.log import log
+from testlib.proc import Script
+from testlib.test import Test
+
+# Call to close opened port
+Closer = T.Callable[[], None]
+
+
+def bind_random_port() -> T.Tuple[T.Optional[int], Closer]:
+ """Bind to random port and return it, keeping the bind."""
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(("127.0.0.1", 0))
+ sock.listen()
+ _, port = sock.getsockname()
+ return port, sock.close
+ except OSError:
+ return None, sys.exit
+
+
+def test_bind_port(ctx: Test, ok_ports: T.List[int], bad_ports: T.List[int]) -> None:
+ """Test binding to ports on localhost."""
+
+ foo = ctx.node(init="set LogLevel 1")
+ foo.add_script(Script.TINC_UP)
+ foo.add_script(Script.TINC_DOWN)
+ log_path = foo.sub("log")
+
+ if ok_ports:
+ log.info("check that tincd successfully binds to %s", ok_ports)
+
+ for port in ok_ports:
+ foo.cmd("add", "BindToAddress", f"127.0.0.1 {port}")
+
+ proc = foo.tincd("-D")
+ foo[Script.TINC_UP].wait()
+ foo.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+ check.success(proc.wait())
+
+ foo_log = util.read_text(log_path)
+
+ for port in ok_ports:
+ check.is_in(f"Listening on 127.0.0.1 port {port}", foo_log)
+
+ if bad_ports:
+ log.info("check that tincd fails to bind to %s", bad_ports)
+
+ for port in bad_ports:
+ foo.cmd("add", "BindToAddress", f"127.0.0.1 {port}")
+
+ util.remove_file(log_path)
+ proc = foo.tincd("-D")
+
+ # Flush logs to the log file
+ if ok_ports:
+ foo[Script.TINC_UP].wait()
+ foo.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+ check.success(proc.wait())
+ else:
+ check.failure(proc.wait())
+
+ foo_log = util.read_text(log_path)
+
+ for port in bad_ports:
+ check.is_in(f"Can't bind to 127.0.0.1 port {port}", foo_log)
+
+ if not ok_ports:
+ check.is_in("Unable to create any listening socket", foo_log)
+
+
+port0, close0 = bind_random_port()
+port1, close1 = bind_random_port()
+
+if not port0 or not port1:
+ log.info("could not bind ports, skipping test")
+ sys.exit(EXIT_SKIP)
+
+with Test("test binding with both ports unavailable") as context:
+ test_bind_port(context, [], [port0, port1])
+
+with Test("test binding to one free and one unavailable port") as context:
+ close0()
+ test_bind_port(context, [port0], [port1])
+
+with Test("test binding to two free ports") as context:
+ close1()
+ test_bind_port(context, [port0, port1], [])
)
-def init(ctx: Test) -> Tinc:
- """Initialize a node."""
-
- node = ctx.node()
- stdin = f"""
- init {node}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- node.cmd(stdin=stdin)
- return node
-
-
def try_dot(src: str) -> None:
"""Try passing graph source through the dot binary, if it's present."""
try:
def run_tests(ctx: Test) -> None:
"""Run all tests."""
- foo = init(ctx)
- bar = ctx.node()
+ foo, bar = ctx.node(init=True), ctx.node()
log.info("set %s subnets", foo)
for sub in SUBNETS_FOO:
def init(ctx: Test) -> Tinc:
"""Initialize a node."""
-
- node = ctx.node()
- stdin = f"""
- init {node}
- set Port 0
- set Address localhost
- set DeviceType dummy
- set AutoConnect no
- """
- node.cmd(stdin=stdin)
- return node
+ return ctx.node(init="set AutoConnect no")
def test_import(foo: Tinc) -> None:
FAKE_INVITE = "localhost:65535/pVOZMJGm3MqTvTu0UnhMGb2cfuqygiu79MdnERnGYdga5v8C"
-def init(ctx: Test) -> Tinc:
- """Initialize a node."""
-
- node = ctx.node()
- stdin = f"""
- init {node}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- node.cmd(stdin=stdin)
- return node
-
-
def test_invite(foo: Tinc) -> None:
"""Test successful 'invite'."""
with Test("run invite success tests") as context:
- test_invite(init(context))
+ test_invite(context.node(init=True))
with Test("run invite error tests") as context:
- test_invite_errors(init(context))
+ test_invite_errors(context.node(init=True))
with Test("run join tests") as context:
- test_join_errors(init(context))
+ test_join_errors(context.node(init=True))
from testlib.test import Test
-def init(ctx: Test) -> Tinc:
- """Initialize a node."""
-
- node = ctx.node()
- stdin = f"""
- init {node}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- node.cmd(stdin=stdin)
- return node
-
-
TEST_DATA = b"foo bar baz"
with Test("run tests") as context:
- run_tests(init(context))
+ run_tests(context.node(init=True))
#!/usr/bin/env python3
"""Test miscellaneous commands."""
-
-import time
+import os
import typing as T
-from testlib import check, cmd, util
+from testlib import check, cmd
from testlib.log import log
from testlib.proc import Tinc, Script
from testlib.test import Test
SUBNETS_BAR = ("10.20.30.40", "fe80::")
-def init(ctx: Test) -> Tinc:
- """Initialize a node."""
-
- node = ctx.node()
- stdin = f"""
- init {node}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- node.cmd(stdin=stdin)
- return node
-
-
def configure_nodes(ctx: Test) -> T.Tuple[Tinc, Tinc]:
"""Create and configure nodes."""
log.info("initialize nodes")
- foo, bar = init(ctx), init(ctx)
+ foo, bar = ctx.node(init=True), ctx.node(init=True)
log.info("configure and start nodes")
foo.cmd("add", "Subnet", "1.2.3.4")
check.is_in("Too many arguments", err)
log.info("test pid without arguments")
- pidfile = util.read_text(foo.pid_file)
- pid, _ = pidfile.split(maxsplit=1)
-
out, _ = foo.cmd("pid")
- check.equals(pid, out.strip())
+ check.equals(foo.pid, int(out.strip()))
def test_debug(foo: Tinc) -> None:
log.info("test correct call")
log_client = foo.tinc("log")
+ foo.cmd("set", "LogLevel", "10")
foo.cmd("reload")
+
+ foo.add_script(Script.TINC_DOWN)
foo.cmd("stop")
- time.sleep(1)
+ foo[Script.TINC_DOWN].wait()
out, _ = log_client.communicate()
check.true(out)
test_pid(foo)
test_debug(foo)
test_log(foo)
- test_restart(foo)
+
+ # Too unstable on Windows because of how it works with services (impossible to
+ # start the service if it has been marked for deletion, but not yet deleted).
+ # Since lots of things can prevent service removal (like opened task manager or
+ # services.msc) the `restart` command is inherently unreliable.
+ if os.name != "nt":
+ test_restart(foo)
with Test("run tests") as context:
def init(ctx: Test) -> Tinc:
"""Initialize a node."""
-
- node = ctx.node()
- stdin = f"""
- init {node}
- set Port 0
- set Address localhost
- set DeviceType dummy
- set AutoConnect no
- """
- node.cmd(stdin=stdin)
- return node
+ return ctx.node(init="set AutoConnect no")
def test_network(foo: Tinc) -> None:
def init(ctx: Test) -> Tinc:
"""Initialize new test nodes."""
- tinc = ctx.node()
- stdin = f"""
- init {tinc}
- set Port 0
- set Address localhost
- set DeviceType dummy
- set Sandbox {SANDBOX_LEVEL}
- """
- tinc.cmd(stdin=stdin)
+ tinc = ctx.node(init=f"set Sandbox {SANDBOX_LEVEL}")
tinc.add_script(Script.TINC_UP)
return tinc
log.info("stopping tinc through '%s'", pidfile)
foo.cmd("--pidfile", pidfile, "stop")
- check.equals(0, tincd.wait())
+ check.success(tincd.wait())
# Leave behind as debugging aid if there's an exception
shutil.rmtree(shortcut)
recv = receiver.stdout.read()
log.info('received %d bytes: "%s"', len(recv), recv)
- check.equals(0, receiver.wait())
+ check.success(receiver.wait())
check.equals(CONTENT, recv.rstrip())
"""Test that unsupported compression level fails to start."""
tincd = node.tincd()
_, stderr = tincd.communicate()
- check.equals(1, tincd.returncode)
+ check.failure(tincd.returncode)
check.is_in("Bogus compression level", stderr)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test device configuration variables."""
+
+import os
+import platform
+import typing as T
+
+from testlib import check
+from testlib.feature import Feature
+from testlib.log import log
+from testlib.proc import Script
+from testlib.test import Test
+
+system = platform.system()
+
+
+def unknown_device_types(
+ features: T.Container[Feature],
+) -> T.Generator[str, T.Any, None]:
+ """Get devices unsupported by current OS."""
+
+ yield "foobar"
+
+ if Feature.UML not in features:
+ yield "uml"
+
+ if Feature.TUNEMU not in features:
+ yield "tunemu"
+
+ if system != "Darwin":
+ if not system.endswith("BSD"):
+ yield "tunnohead"
+ yield "tunifhead"
+
+ yield "utun"
+
+ if system == "Windows":
+ yield "tun"
+ yield "tap"
+
+
+def test_unknown_types(ctx: Test) -> None:
+ """Test unknown device types."""
+
+ foo = ctx.node(init=True)
+
+ for dev_type in unknown_device_types(foo.features):
+ log.info("testing unknown device type %s", dev_type)
+ _, err = foo.cmd("start", "-o", f"DeviceType={dev_type}", code=1)
+ check.is_in(f"Unknown device type {dev_type}", err)
+
+
+def test_device_standby(ctx: Test) -> None:
+ """Test DeviceStandby."""
+
+ foo, bar, baz = ctx.node(init=True), ctx.node(), ctx.node()
+
+ log.info("configure %s", foo)
+ foo.cmd("set", "DeviceStandby", "yes")
+ foo.add_script(Script.TINC_UP)
+ foo.add_script(Script.TINC_DOWN)
+
+ log.info("starting tincd must not call tinc-up")
+ foo.cmd("start")
+ assert not foo[Script.TINC_UP].wait(timeout=1)
+
+ log.info("invite %s", bar)
+ url, _ = foo.cmd("invite", bar.name)
+ bar.cmd("join", url.strip())
+ bar.cmd("set", "DeviceType", "dummy")
+ bar.cmd("set", "Port", "0")
+
+ log.info("invite %s", baz)
+ url, _ = foo.cmd("invite", baz.name)
+ baz.cmd("join", url.strip())
+ baz.cmd("set", "DeviceType", "dummy")
+ baz.cmd("set", "Port", "0")
+
+ log.info("starting first client must call tinc-up")
+ bar.start()
+ foo[Script.TINC_UP].wait()
+
+ log.info("starting second client must not call tinc-up")
+ baz.start()
+ assert not foo[Script.TINC_UP].wait(timeout=1)
+
+ log.info("stopping next-to-last client must not call tinc-down")
+ bar.add_script(Script.TINC_DOWN)
+ bar.cmd("stop")
+ bar[Script.TINC_DOWN].wait()
+ assert not foo[Script.TINC_DOWN].wait(timeout=1)
+
+ log.info("stopping last client must call tinc-down")
+ baz.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+
+ log.info("stopping tincd must not call tinc-down")
+ foo.cmd("stop")
+ assert not foo[Script.TINC_DOWN].wait(timeout=1)
+
+
+# Device types are not checked on Windows.
+# /dev/net/tun is not available in Docker containers.
+if system != "Windows" and (system != "Linux" or os.path.exists("/dev/net/tun")):
+ with Test("unknown device types") as context:
+ test_unknown_types(context)
+
+if system != "Windows":
+ with Test("test DeviceStandby = yes") as context:
+ test_device_standby(context)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test FD device support."""
+
+import array
+import socket
+import tempfile
+import threading
+import time
+
+from testlib import check
+from testlib.log import log
+from testlib.test import Test
+from testlib.proc import Script
+
+JUNK_FRAME = b"\xFF" * 80
+
+
+def start_fd_server(unix: socket.socket, payload: bytes, file_desc: int) -> None:
+ """Start UNIX socket server and then the FD to the first connected client."""
+
+ def send_fd() -> None:
+ conn, _ = unix.accept()
+ with conn:
+ log.info("accepted connection %s", conn)
+ ancillary = array.array("i", [file_desc])
+ conn.sendmsg([payload], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, ancillary)])
+
+ threading.Thread(target=send_fd).start()
+
+
+def test_device_fd(ctx: Test) -> None:
+ """Test some FD device error conditions."""
+
+ foo = ctx.node(init="set DeviceType fd")
+
+ log.info("test with empty Device")
+ _, err = foo.cmd("start", code=1)
+ check.is_in("Could not read device", err)
+
+ log.info("test with too long UNIX socket path")
+ device = "x" * 110
+ _, err = foo.cmd("start", "-o", f"Device={device}", code=1)
+ check.is_in("Unix socket path too long", err)
+
+ foo.cmd("set", "Device", "/dev/null")
+
+ log.info("check that Mode=switch fails")
+ _, err = foo.cmd("start", "-o", "Mode=switch", code=1)
+ check.is_in("Switch mode not supported", err)
+
+ log.info("test with incorrect Device")
+ _, err = foo.cmd("start", code=1)
+ check.is_in("Receiving fd from Unix socket", err)
+ check.is_in("Could not connect to Unix socket", err)
+
+ log.info("test with invalid FD")
+ _, err = foo.cmd("start", "-o", "Device=-1", code=1)
+ check.is_in("Could not open", err)
+
+ log.info("create a UNIX socket to transfer FD")
+ unix = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ unix_path = tempfile.mktemp()
+ unix.bind(unix_path)
+ unix.listen(1)
+
+ foo.cmd("set", "Device", unix_path)
+ myself, him = socket.socketpair(socket.AF_UNIX)
+
+ log.info("start with empty data")
+ start_fd_server(unix, b"", him.fileno())
+ _, err = foo.cmd("start", "-o", f"Device={unix_path}", code=1)
+ check.is_in("Could not read from unix socket", err)
+
+ foo_log = foo.sub("log")
+ foo.add_script(Script.TINC_UP)
+
+ log.info("start with correct amount of data")
+ start_fd_server(unix, b" ", him.fileno())
+
+ log.info("wait for tincd to connect")
+ _, err = foo.cmd("start", "-o", f"Device={unix_path}", "--logfile", foo_log, "-d10")
+ foo[Script.TINC_UP].wait()
+ check.is_in("adapter set up", err)
+
+ log.info("send junk data and make sure tincd receives it")
+ for _ in range(10):
+ myself.send(JUNK_FRAME)
+ time.sleep(0.1)
+
+ foo.add_script(Script.TINC_DOWN)
+ foo.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+
+ check.in_file(foo_log, "Unknown IP version while reading packet from fd/")
+
+
+with Test("test FD device") as context:
+ test_device_fd(context)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test multicast device."""
+
+import os
+import socket
+import struct
+import time
+
+from testlib import check
+from testlib.log import log
+from testlib.proc import Tinc, Script
+from testlib.test import Test
+
+MCAST_ADDR = "224.15.98.12"
+PORT = 38245
+
+
+def multicast_works() -> bool:
+ """Check if multicast is supported and works."""
+
+ msg = b"foobar"
+
+ try:
+ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
+ server.bind((MCAST_ADDR, PORT))
+
+ req = struct.pack("=4sl", socket.inet_aton(MCAST_ADDR), socket.INADDR_ANY)
+ server.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, req)
+
+ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
+ client.sendto(msg, (MCAST_ADDR, PORT))
+
+ return msg == server.recv(16)
+ except OSError:
+ return False
+
+
+def test_no_mcast_support(foo: Tinc) -> None:
+ """Check that startup fails on systems without multicast support."""
+
+ code = foo.tincd("-D").wait()
+ check.failure(code)
+ check.in_file(foo.sub("log"), f"Can't bind to {MCAST_ADDR}")
+
+
+def test_rx_tx(foo: Tinc) -> None:
+ """Test sending real data to a multicast device."""
+
+ foo.start()
+ packet = os.urandom(137)
+
+ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
+ for _ in range(5):
+ sent = sock.sendto(packet, (MCAST_ADDR, PORT))
+ log.info("sent broken packet (%d)", sent)
+ time.sleep(0.1)
+
+ foo.add_script(Script.TINC_DOWN)
+ foo.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+
+ check.in_file(foo.sub("log"), "Read packet of 137 bytes from multicast socket")
+
+
+def test_device_multicast(ctx: Test) -> None:
+ """Test multicast device."""
+
+ foo = ctx.node(init=True)
+ foo.cmd("set", "DeviceType", "multicast")
+
+ log.info("check that multicast does not work without Device")
+ _, err = foo.cmd("start", "-D", code=1)
+ check.is_in("Device variable required for multicast socket", err)
+
+ log.info("check that Device requires a port")
+ foo.cmd("set", "Device", "localhost")
+ _, err = foo.cmd("start", "-D", code=1)
+ check.is_in("Port number required", err)
+
+ log.info("check that multicast receives data")
+ foo.cmd("set", "Device", f"{MCAST_ADDR} {PORT}")
+ foo.cmd("set", "LogLevel", "10")
+
+ if multicast_works():
+ test_rx_tx(foo)
+ else:
+ test_no_mcast_support(foo)
+
+
+with Test("test DeviceType = multicast") as context:
+ test_device_multicast(context)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test raw socket device support."""
+
+import sys
+import subprocess as subp
+
+from testlib import check, util
+from testlib.log import log
+from testlib.const import EXIT_SKIP
+from testlib.proc import Script
+from testlib.test import Test
+from testlib.external import veth_add, move_dev, ping
+
+util.require_root()
+util.require_command("ip", "link")
+
+FAKE_DEV = "cqhqdr7knaLzYeMSdy"
+
+IP_NETNS = "10.198.96.1"
+IP_HOST = "10.198.96.2"
+
+
+def test_device_raw_socket(ctx: Test) -> None:
+ """Test raw socket device."""
+
+ foo = ctx.node(init="set DeviceType raw_socket")
+ foo_log = foo.sub("log")
+
+ log.info("test with a bad Interface")
+ _, err = foo.cmd("start", "-o", f"Interface={FAKE_DEV}", code=1)
+ if "Raw socket device not supported" in err:
+ sys.exit(EXIT_SKIP)
+ check.is_in(f"Can't find interface {FAKE_DEV}", err)
+
+ log.info("create a veth pair")
+ dev0, dev1 = util.random_string(10), util.random_string(10)
+ veth_add(dev0, dev1)
+
+ log.info("configure the veth pair")
+ move_dev(dev1, dev1, f"{IP_NETNS}/30")
+ subp.run(["ip", "addr", "add", f"{IP_HOST}/30", "dev", dev0], check=True)
+ subp.run(["ip", "link", "set", dev0, "up"], check=True)
+
+ log.info("set Interface and Device")
+ foo.cmd("set", "Interface", dev0)
+ foo.cmd("set", "Device", f"dev_{dev0}")
+ foo.add_script(Script.TINC_UP)
+
+ log.info("start tincd")
+ _, err = foo.cmd("start", "--logfile", foo_log, "-d10")
+ check.is_in(f"dev_{dev0} is a raw_socket", err)
+
+ log.info("send some data to tincd interface")
+ foo[Script.TINC_UP].wait()
+ assert ping(IP_NETNS)
+
+ log.info("stop tincd")
+ foo.add_script(Script.TINC_DOWN)
+ foo.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+
+ log.info("check that tincd received some data")
+ check.in_file(foo_log, "Writing packet of")
+ check.in_file(foo_log, "Read packet of")
+
+
+with Test("test raw socket device") as context:
+ test_device_raw_socket(context)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test TAP device support."""
+
+import typing as T
+
+from testlib import check, util, cmd
+from testlib.log import log
+from testlib.proc import Script, Tinc
+from testlib.test import Test
+from testlib.external import netns_add, netns_exec, ping
+
+util.require_root()
+util.require_command("ip", "netns", "list")
+util.require_path("/dev/net/tun")
+
+IP_FOO = "10.0.0.1"
+IP_BAR = "10.0.0.2"
+IP_DUMMY = "10.0.0.3"
+
+ARP_WORKS = {
+ "router": False,
+ "hub": True,
+ "switch": True,
+}
+
+
+def make_up(node: str, address: str) -> str:
+ """Create a network configuration script."""
+ return f"""
+ import subprocess as subp
+ subp.run(['ip', 'link', 'set', 'dev', '{node}', 'netns', '{node}'], check=True)
+ subp.run(['ip', 'netns', 'exec', '{node}', 'ip', 'addr', 'add', 'dev', '{node}', '{address}/24'], check=True)
+ subp.run(['ip', 'netns', 'exec', '{node}', 'ip', 'link', 'set', '{node}', 'up'], check=True)
+ """
+
+
+def init(ctx: Test, mode: str) -> T.Tuple[Tinc, Tinc]:
+ """Configure nodes."""
+
+ stdin = f"""
+ set DeviceType tap
+ add Subnet {IP_FOO}
+ set Mode {mode}
+ """
+ foo = ctx.node(init=stdin)
+ foo.cmd("set", "Interface", foo.name)
+ netns_add(foo.name)
+
+ stdin = f"""
+ set DeviceType tap
+ add Subnet {IP_BAR}
+ set Mode {mode}
+ """
+ bar = ctx.node(init=stdin)
+ bar.cmd("set", "Interface", bar.name)
+ netns_add(bar.name)
+
+ return foo, bar
+
+
+def run_tests(ctx: Test, mode: str) -> None:
+ """Test BindToAddress or ListenAddress."""
+
+ foo, bar = init(ctx, mode)
+
+ log.info("add tinc-up scripts")
+ foo.add_script(Script.TINC_UP, make_up(foo.name, IP_FOO))
+ bar.add_script(Script.TINC_UP, make_up(bar.name, IP_BAR))
+
+ log.info("start nodes and wait for them to connect")
+ cmd.connect(foo, bar)
+
+ log.info("test ICMP")
+ assert ping(IP_FOO, bar.name)
+
+ log.info("create a dummy device for sending ARP requests")
+ netns_exec(bar.name, "ip", "link", "add", "dummy0", "type", "dummy", check=True)
+ netns_exec(bar.name, "ip", "addr", "add", IP_DUMMY, "dev", "dummy0", check=True)
+ netns_exec(bar.name, "ip", "link", "set", "dummy0", "up", check=True)
+
+ log.info("test ARP with Mode %s", mode)
+ proc = netns_exec(foo.name, "arping", "-c1", IP_DUMMY)
+ check.equals(ARP_WORKS[dev_mode], proc.returncode == 0)
+
+
+for dev_mode in "switch", "hub", "router":
+ with Test(f"test TAP device ({dev_mode})") as context:
+ run_tests(context, dev_mode)
cmd = [exe, "--help"]
log.info('testing command "%s"', cmd)
res = run(cmd, stdout=PIPE, stderr=PIPE, encoding="utf-8", timeout=10, check=False)
- check.equals(0, res.returncode)
+ check.success(res.returncode)
check.is_in("Usage:", res.stdout, res.stderr)
"""Test peer information import and export."""
-import typing as T
-
from testlib import check, cmd
from testlib.log import log
-from testlib.proc import Tinc, Script
+from testlib.proc import Script
from testlib.test import Test
-def init(ctx: Test) -> T.Tuple[Tinc, Tinc, Tinc]:
- """Initialize new test nodes."""
- foo, bar, baz = ctx.node(), ctx.node(), ctx.node()
-
- log.info("configure %s", foo.name)
- stdin = f"""
- init {foo}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- foo.cmd(stdin=stdin)
-
- log.info("configure %s", bar.name)
- stdin = f"""
- init {bar}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- bar.cmd(stdin=stdin)
-
- log.info("configure %s", baz.name)
- stdin = f"""
- init {baz}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- baz.cmd(stdin=stdin)
-
- return foo, bar, baz
-
-
def run_tests(ctx: Test) -> None:
"""Run all tests."""
- foo, bar, baz = init(ctx)
+ foo, bar, baz = ctx.node(init=True), ctx.node(init=True), ctx.node(init=True)
tinc_up = f"""
bar, baz = Tinc('{bar}'), Tinc('{baz}')
"""Test tinc peer invitations."""
+import time
+import subprocess as subp
+
from testlib import check, util
+from testlib.proc import Tinc
from testlib.log import log
from testlib.test import Test
def run_port0_test(ctx: Test) -> None:
"""Checks that tinc invite fails if called with Port 0 and tincd stopped."""
- foo = ctx.node()
- stdin = f"""
- init {foo}
- set Port 0
- set Address localhost
- set DeviceType dummy
- """
- foo.cmd(stdin=stdin)
+ foo = ctx.node(init=True)
_, err = foo.cmd("invite", "bar", code=1)
check.is_in("Please start tincd", err)
-def run_invite_test(ctx: Test, start_before_invite: bool) -> None:
- """Run tests. If start_before_invite is True,
- tincd is started *before* creating invitation, and vice versa.
- """
- foo, bar = ctx.node(), ctx.node()
+def init(ctx: Test) -> Tinc:
+ """Initialize a node."""
+ foo = ctx.node()
stdin = f"""
init {foo}
set Port 12345
set Broadcast no
"""
foo.cmd(stdin=stdin)
+ return foo
+
+
+def run_expiration_test(ctx: Test) -> None:
+ """Make sure that invites can't be used after expiration date."""
+
+ foo, bar = init(ctx), ctx.node()
+ foo.cmd("set", "InvitationExpire", "1")
+ foo.start()
+
+ url, _ = foo.cmd("invite", bar.name)
+ url = url.strip()
+ time.sleep(2)
+
+ try:
+ bar.cmd("join", url, code=1, timeout=1)
+ except subp.TimeoutExpired:
+ pass
+
+ foo.cmd("stop")
+ foo_log = util.read_text(foo.sub("log"))
+ check.is_in("tried to use expired invitation", foo_log)
+
+
+def run_invite_test(ctx: Test, start_before_invite: bool) -> None:
+ """Run tests. If start_before_invite is True,
+ tincd is started *before* creating invitation, and vice versa.
+ """
+ foo = init(ctx)
+ bar = ctx.node()
if start_before_invite:
foo.cmd("set", "Port", "0")
with Test("online mode") as context:
run_invite_test(context, start_before_invite=True)
+
+with Test("invite expiration") as context:
+ run_expiration_test(context)
def init(ctx: Test) -> T.Tuple[Tinc, Tinc]:
"""Initialize new test nodes."""
- foo, bar = ctx.node(), ctx.node()
-
- stdin = f"""
- init {foo}
- set Port 0
- set DeviceType dummy
- set Address localhost
- """
- foo.cmd(stdin=stdin)
+ foo, bar = ctx.node(init=True), ctx.node()
foo.start()
-
return foo, bar
tests = [
'address_cache.py',
'basic.py',
+ 'bind_port.py',
'cmd_dump.py',
'cmd_fsck.py',
'cmd_import.py',
'cmd_net.py',
'cmd_sign_verify.py',
'commandline.py',
+ 'device.py',
+ 'device_multicast.py',
'executables.py',
'import_export.py',
'invite.py',
'invite_tinc_up.py',
+ 'net.py',
'proxy.py',
'sandbox.py',
'scripts.py',
if os_name == 'linux'
tests += [
- 'ns_ping.py',
+ 'bind_address.py',
'compression.py',
+ 'device_raw_socket.py',
+ 'device_tap.py',
+ 'ns_ping.py',
]
+ if not opt_systemd.disabled()
+ tests += 'systemd.py'
+ endif
+endif
+
+if cdata.has('HAVE_FD_DEVICE')
+ tests += 'device_fd.py'
endif
exe_splice = executable(
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test various network-related configuration variables."""
+
+from testlib import check, cmd
+from testlib.test import Test
+
+
+def test_tunnel_server(ctx: Test, enabled: bool) -> None:
+ """Test TunnelServer."""
+
+ foo, mid, bar = (
+ ctx.node(init=True),
+ ctx.node(init=f"set TunnelServer {'yes' if enabled else 'no'}"),
+ ctx.node(init=True),
+ )
+
+ mid.start()
+
+ for peer in foo, bar:
+ cmd.exchange(peer, mid)
+ peer.cmd("add", "ConnectTo", mid.name)
+ peer.add_script(mid.script_up)
+ peer.start()
+
+ foo[mid.script_up].wait()
+ bar[mid.script_up].wait()
+
+ edge_peers = 2 if enabled else 3
+
+ check.nodes(foo, edge_peers)
+ check.nodes(mid, 3)
+ check.nodes(bar, edge_peers)
+
+
+with Test("test TunnelServer = yes") as context:
+ test_tunnel_server(context, True)
+
+with Test("test TunnelServer = no") as context:
+ test_tunnel_server(context, False)
"""Create two network namespaces and run ping between them."""
-import subprocess as subp
import typing as T
from testlib import external as ext, util, template, cmd
from testlib.log import log
from testlib.proc import Tinc, Script
from testlib.test import Test
+from testlib.external import ping
util.require_root()
util.require_command("ip", "netns", "list")
return foo, bar
-def ping(namespace: str, ip_addr: str) -> int:
- """Send pings between two network namespaces."""
- log.info("pinging node from netns %s at %s", namespace, ip_addr)
- proc = subp.run(
- ["ip", "netns", "exec", namespace, "ping", "-W1", "-c1", ip_addr], check=False
- )
-
- log.info("ping finished with code %d", proc.returncode)
- return proc.returncode
-
-
with Test("ns-ping") as context:
foo_node, bar_node = init(context)
bar_node.cmd("start")
bar_node[Script.TINC_UP].wait()
log.info("ping must not work when there is no connection")
- assert ping(foo_node.name, IP_BAR)
+ assert not ping(IP_BAR, foo_node.name)
log.info("add script foo/host-up")
bar_node.add_script(foo_node.script_up)
bar_node[foo_node.script_up].wait()
log.info("ping must work after connection is up")
- assert not ping(foo_node.name, IP_BAR)
+ assert ping(IP_BAR, foo_node.name)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Test systemd integration."""
+
+import os
+import socket
+import tempfile
+import time
+
+from testlib import check, path
+from testlib.log import log
+from testlib.feature import Feature
+from testlib.const import MAXSOCKETS
+from testlib.proc import Tinc, Script
+from testlib.test import Test
+
+
+def tincd_start_socket(foo: Tinc, pass_pid: bool) -> int:
+ """Start tincd as systemd socket activation does it."""
+
+ pid = os.fork()
+ if not pid:
+ env = {**os.environ, "LISTEN_FDS": str(MAXSOCKETS + 1)}
+ if pass_pid:
+ env["LISTEN_PID"] = str(os.getpid())
+ args = [
+ path.TINCD_PATH,
+ "-c",
+ foo.work_dir,
+ "--pidfile",
+ foo.pid_file,
+ "--logfile",
+ foo.sub("log"),
+ ]
+ assert not os.execve(path.TINCD_PATH, args, env)
+
+ assert pid > 0
+
+ _, status = os.waitpid(pid, 0)
+ assert os.WIFEXITED(status)
+ return os.WEXITSTATUS(status)
+
+
+def test_listen_fds(foo: Tinc) -> None:
+ """Test systemd socket activation."""
+
+ foo_log = foo.sub("log")
+
+ log.info("foreground tincd fails with too high LISTEN_FDS")
+ status = tincd_start_socket(foo, pass_pid=True)
+ check.failure(status)
+ check.in_file(foo_log, "Too many listening sockets")
+
+ foo.add_script(Script.TINC_UP)
+ foo.add_script(Script.TINC_DOWN)
+ os.remove(foo_log)
+
+ log.info("LISTEN_FDS is ignored without LISTEN_PID")
+ status = tincd_start_socket(foo, pass_pid=False)
+ foo[Script.TINC_UP].wait()
+ foo.cmd("stop")
+ foo[Script.TINC_DOWN].wait()
+ check.success(status)
+ check.not_in_file(foo_log, "Too many listening sockets")
+
+
+def recv_until(sock: socket.socket, want: bytes) -> None:
+ """Receive from a datagram socket until a specific value is found."""
+
+ while True:
+ msg = sock.recv(65000)
+ log.info("received %s", msg)
+ if msg == want:
+ break
+
+
+def test_watchdog(foo: Tinc) -> None:
+ """Test systemd watchdog."""
+
+ address = tempfile.mktemp()
+ foo_log = foo.sub("log")
+
+ log.info("watchdog is disabled if no env vars are passed")
+ foo.cmd("start", "--logfile", foo_log)
+ foo.cmd("stop")
+ check.in_file(foo_log, "Watchdog is disabled")
+
+ log.info("watchdog is enabled by systemd env vars")
+ foo.add_script(Script.TINC_UP)
+ foo.add_script(Script.TINC_DOWN)
+
+ with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
+ sock.bind(address)
+
+ watchdog = 0.5
+ env = {"NOTIFY_SOCKET": address, "WATCHDOG_USEC": str(int(watchdog * 1e6))}
+ proc = foo.tincd("-D", env=env)
+ recv_until(sock, b"READY=1")
+
+ for _ in range(6):
+ before = time.monotonic()
+ recv_until(sock, b"WATCHDOG=1")
+ spent = time.monotonic() - before
+ assert spent < watchdog
+
+ foo.cmd("stop")
+ recv_until(sock, b"STOPPING=1")
+
+ check.success(proc.wait())
+
+
+with Test("socket activation") as context:
+ test_listen_fds(context.node(init=True))
+
+with Test("watchdog") as context:
+ node = context.node(init=True)
+ if Feature.WATCHDOG in node.features:
+ test_watchdog(node)
raise ValueError(f'expected "{value}" to be falsy')
+def success(value: int) -> None:
+ """Check that value represents a successful exit code."""
+ if not isinstance(value, int) or value != 0:
+ raise ValueError(f'expected "{value}" to be 0', value)
+
+
+def failure(value: int) -> None:
+ """Check that value represents an unsuccessful exit code."""
+ if not isinstance(value, int) or value == 0:
+ raise ValueError(f'expected "{value}" to NOT be 0', value)
+
+
def true(value: T.Any) -> None:
"""Check that value is truthy."""
if not value:
raise ValueError(f'expected all "{haystacks}" NOT to include "{needle}"')
+def _read_content(path: T.Union[str, os.PathLike], search: T.AnyStr) -> T.AnyStr:
+ """Read text or binary content, depending on the type of search argument."""
+ if isinstance(search, str):
+ mode, enc = "r", "utf-8"
+ else:
+ mode, enc = "rb", None
+ with open(path, mode=mode, encoding=enc) as f:
+ return f.read()
+
+
+def in_file(path: T.Union[str, os.PathLike], text: T.AnyStr) -> None:
+ """Check that file contains a string."""
+ is_in(text, _read_content(path, text))
+
+
+def not_in_file(path: T.Union[str, os.PathLike], text: T.AnyStr) -> None:
+ """Check that file does not contain a string."""
+ not_in(text, _read_content(path, text))
+
+
def nodes(node, want_nodes: int) -> None:
"""Check that node can reach exactly N nodes (including itself)."""
log.debug("want %d reachable nodes from tinc %s", want_nodes, node)
stdout, _ = node.cmd("dump", "reachable", "nodes")
- equals(want_nodes, len(stdout.splitlines()))
+ lines(stdout, want_nodes)
def files_eq(path0: str, path1: str) -> None:
]
+def connect(node0: Tinc, node1: Tinc) -> ExchangeIO:
+ """Exchange configuration between nodes and start
+ them in such an order that `Port 0` works on both sides.
+ """
+ node0.add_script(node1.script_up)
+ node0.start()
+ result = exchange(node0, node1)
+ node1.add_script(node0.script_up)
+ node1.cmd("add", "ConnectTo", node0.name)
+ node1.start()
+ node0[node1.script_up].wait()
+ node1[node0.script_up].wait()
+ return result
+
+
def exchange(node0: Tinc, node1: Tinc, export_all: bool = False) -> ExchangeIO:
"""Run `export(-all) | exchange | import` between the passed nodes.
`export-all` is used if export_all is set to True.
# Do access checks on files. Disabled when not available or not applicable.
RUN_ACCESS_CHECKS = os.name != "nt" and os.geteuid() != 0
+
+# Copy of the same define from net.h
+MAXSOCKETS = 8
import typing as T
from .log import log
+from .util import random_string
_netns_created: T.Set[str] = set()
+_iface_created: T.Set[str] = set()
-def _netns_cleanup() -> None:
+def _cleanup() -> None:
for namespace in _netns_created.copy():
netns_delete(namespace)
+ # Ignore errors since device may have been moved to a different netns
+ for iface in _iface_created.copy():
+ subp.run(["ip", "link", "delete", iface], check=False)
-atexit.register(_netns_cleanup)
+
+atexit.register(_cleanup)
def _netns_action(action: str, namespace: str) -> bool:
if success:
_netns_created.add(namespace)
return success
+
+
+def netns_exec(netns: str, *args: str, check: bool = False) -> subp.CompletedProcess:
+ """Execute command in the network namespace."""
+ return subp.run(["ip", "netns", "exec", netns, *args], check=check)
+
+
+def ping(address: str, netns: T.Optional[str] = None) -> bool:
+ """Ping the address from inside the network namespace."""
+ args = ["ping", "-l1", "-W1", "-i0.1", "-c10", address]
+ if netns:
+ proc = netns_exec(netns, *args)
+ else:
+ proc = subp.run(args, check=False)
+ return proc.returncode == 0
+
+
+def move_dev(netns: str, device: str, ip_addr: str) -> None:
+ """Move device to the network namespace."""
+ if netns not in _netns_created:
+ netns_add(netns)
+ subp.run(["ip", "link", "set", device, "netns", netns], check=True)
+ netns_exec(netns, "ip", "addr", "add", ip_addr, "dev", device, check=True)
+ netns_exec(netns, "ip", "link", "set", device, "up", check=True)
+
+
+def veth_add(name0: str, name1: str) -> None:
+ """Create a veth link pair."""
+ subp.run(
+ ["ip", "link", "add", name0, "type", "veth", "peer", "name", name1], check=True
+ )
+ _iface_created.add(name0)
+
+
+def link_add(link_type: str) -> str:
+ """Create a virtual link."""
+ name = random_string(10)
+ if link_type in ("tun", "tap"):
+ subp.run(["ip", "tuntap", "add", "mode", link_type, "dev", name], check=True)
+ else:
+ subp.run(["ip", "link", "add", name, "type", link_type], check=True)
+ _iface_created.add(name)
+ return name
MINIUPNPC = "miniupnpc"
OPENSSL = "openssl"
READLINE = "readline"
- TUNEMU = "tunemu"
SANDBOX = "sandbox"
+ TUNEMU = "tunemu"
UML = "uml"
VDE = "vde"
+ WATCHDOG = "watchdog"
class Tinc:
name: str
address: str
_work_dir: str
- _pid_file: str
+ _pid: T.Optional[int]
_port: T.Optional[int]
_scripts: T.Dict[str, TincScript]
_procs: T.List[subp.Popen]
self.address = addr if addr else _rand_localhost()
self._work_dir = _make_wd(self.name)
os.makedirs(self._work_dir, exist_ok=True)
- self._pid_file = os.path.join(_TEMPDIR, f"tinc_{self.name}")
self._port = None
self._scripts = {}
self._procs = []
@property
def pid_file(self) -> str:
"""Get the path to the pid file."""
- return self._pid_file
+ return os.path.join(_TEMPDIR, f"tinc_{self.name}")
def read_port(self) -> int:
"""Read port used by tincd from its pidfile and update the _port field."""
content = f.read()
log.debug("found data %s", content)
- _, _, _, token, port = content.split()
+ pid, _, _, token, port = content.split()
check.equals("port", token)
self._port = int(port)
+ self._pid = int(pid)
return self._port
@property
assert self._port is not None
return self._port
+ @property
+ def pid(self) -> int:
+ """pid of the main tincd process."""
+ assert self._pid is not None
+ return self._pid
+
def __str__(self) -> str:
return self.name
self.add_script(Script.TINC_UP)
tinc_up = self[Script.TINC_UP]
- self.cmd(*args, "start")
+ self.cmd(*args, "start", "--logfile", self.sub("log"))
tinc_up.wait()
if new_script:
return self._port
def cmd(
- self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
+ self,
+ *args: str,
+ code: T.Optional[int] = 0,
+ stdin: T.Optional[T.AnyStr] = None,
+ timeout: T.Optional[int] = None,
) -> T.Tuple[str, str]:
"""Run command through tinc, writes `stdin` to it (if the argument is not None),
check its return code (if the argument is not None), and return (stdout, stderr).
proc = self.tinc(*args, binary=isinstance(stdin, bytes))
log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
- out, err = proc.communicate(stdin, timeout=60)
+ out, err = proc.communicate(stdin, timeout=60 if timeout is None else timeout)
res = proc.returncode
self._procs.remove(proc)
log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
self._nodes = []
self.name = name
- def node(self, addr: str = "") -> Tinc:
+ def node(self, addr: str = "", init: T.Union[str, bool] = "") -> Tinc:
"""Create a Tinc instance and remember it for termination on exit."""
node = Tinc(addr=addr)
self._nodes.append(node)
+ if init:
+ if isinstance(init, bool):
+ init = ""
+ stdin = f"""
+ init {node}
+ set Port 0
+ set Address localhost
+ set DeviceType dummy
+ {init}
+ """
+ node.cmd(stdin=stdin)
return node
def __str__(self) -> str:
"""Check that command args runs with exit code 0.
Exit with code 77 otherwise.
"""
- if subp.run(args, check=False).returncode:
- log.info('this test requires command "%s" to work', " ".join(args))
- sys.exit(EXIT_SKIP)
+ try:
+ if subp.run(args, check=False).returncode == 0:
+ return
+ except FileNotFoundError:
+ pass
+ log.info('this test requires command "%s" to work', " ".join(args))
+ sys.exit(EXIT_SKIP)
def require_path(path: str) -> None: