1 """Classes for working with compiled instances of tinc and tincd binaries."""
6 import subprocess as subp
8 from platform import system
10 from . import check, path
12 from .script import TincScript, Script, ScriptType
13 from .template import make_script, make_cmd_wrap
14 from .util import random_string, random_port
16 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
17 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
20 def _make_wd(name: str) -> str:
21 work_dir = os.path.join(path.TEST_WD, "data", name)
22 os.makedirs(work_dir, exist_ok=True)
26 def _random_octet() -> int:
27 return random.randint(1, 254)
30 def _rand_localhost() -> str:
31 """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
32 it without additional configuration. For all others, return 127.0.0.1.
34 if _FULL_LOCALHOST_SUBNET:
35 return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
40 """Optional features supported by both tinc and tincd."""
44 COMP_ZLIB = "comp_zlib"
46 JUMBOGRAMS = "jumbograms"
47 LEGACY_PROTOCOL = "legacy_protocol"
48 LIBGCRYPT = "libgcrypt"
49 MINIUPNPC = "miniupnpc"
59 """Thin wrapper around Popen that simplifies running tinc/tincd
60 binaries by passing required arguments, checking exit codes, etc.
66 _port: T.Optional[int]
67 _scripts: T.Dict[str, TincScript]
68 _procs: T.List[subp.Popen]
70 def __init__(self, name: str = "", addr: str = "") -> None:
71 self.name = name if name else random_string(10)
72 self.address = addr if addr else _rand_localhost()
73 self._work_dir = _make_wd(self.name)
78 def randomize_port(self) -> int:
79 """Use a random port for this node."""
80 self._port = random_port()
83 def read_port(self) -> int:
84 """Read port used by tincd from its pidfile and update the _port field."""
85 pidfile = self.sub("pid")
86 log.debug("reading pidfile at %s", pidfile)
88 with open(pidfile, "r", encoding="utf-8") as f:
90 log.debug("found data %s", content)
92 _, _, _, token, port = content.split()
93 check.equals("port", token)
95 self._port = int(port)
99 def port(self) -> int:
100 """Port that tincd is listening on."""
101 assert self._port is not None
104 def __str__(self) -> str:
107 def __getitem__(self, script: ScriptType) -> TincScript:
108 if isinstance(script, Script):
110 return self._scripts[script]
115 def __exit__(self, exc_type, exc_val, exc_tb):
119 def features(self) -> T.List[Feature]:
120 """List of features supported by tinc and tincd."""
121 tinc, _ = self.cmd("--version")
122 tincd, _ = self.tincd("--version").communicate(timeout=5)
123 prefix, features = "Features: ", []
125 for out in tinc, tincd:
126 for line in out.splitlines():
127 if not line.startswith(prefix):
129 tokens = line[len(prefix) :].split()
131 features.append(Feature(token))
134 log.info('supported features: "%s"', features)
138 def _common_args(self) -> T.List[str]:
def sub(self, *paths: str) -> str:
    """Join the given path components onto this node's working directory.

    When called without components, the working directory itself is returned.
    """
    base_dir = self._work_dir
    return os.path.join(base_dir, *paths)
154 """Node's working directory."""
155 return self._work_dir
def script_up(self) -> str:
    """Relative name of this node's host-up script: hosts/<name>-up."""
    node_name = self.name
    return f"hosts/{node_name}-up"
def script_down(self) -> str:
    """Relative name of this node's host-down script: hosts/<name>-down."""
    node_name = self.name
    return f"hosts/{node_name}-down"
167 def cleanup(self) -> None:
168 """Terminate all tinc and tincd processes started from this instance."""
169 log.info("running node cleanup for %s", self)
173 except (AssertionError, ValueError):
174 log.info("unsuccessfully tried to stop node %s", self)
176 for proc in self._procs:
177 if proc.returncode is not None:
178 log.debug("PID %d exited, skipping", proc.pid)
180 log.info("PID %d still running, stopping", proc.pid)
183 except OSError as ex:
184 log.error("could not kill PID %d", proc.pid, exc_info=ex)
186 log.debug("waiting on %d to prevent zombies", proc.pid)
189 except OSError as ex:
190 log.error("waiting on %d failed", proc.pid, exc_info=ex)
194 def start(self, *args: str) -> int:
195 """Start the node, wait for it to call tinc-up, and get the port it's
196 listening on from the pid file. Don't use this method unless you need
197 to know the port tincd is running on. Call .cmd("start"), it's faster.
199 Reading pidfile and setting the port cannot be done from tinc-up because
200 you can't send tinc commands to yourself there — the daemon doesn't
201 respond to them until tinc-up is finished. The port field on this Tinc
202 instance is updated to reflect the correct port. If tinc-up is missing,
203 this command creates a new one, and then disables it.
205 new_script = Script.TINC_UP.name not in self._scripts
207 self.add_script(Script.TINC_UP)
209 tinc_up = self[Script.TINC_UP]
210 self.cmd(*args, "start")
216 self._port = self.read_port()
217 self.cmd("set", "Port", str(self._port))
222 self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
223 ) -> T.Tuple[str, str]:
224 """Run command through tinc, writes `stdin` to it (if the argument is not None),
225 check its return code (if the argument is not None), and return (stdout, stderr).
227 proc = self.tinc(*args, binary=isinstance(stdin, bytes))
228 log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
230 out, err = proc.communicate(stdin, timeout=60)
231 res = proc.returncode
232 self._procs.remove(proc)
233 log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
236 check.equals(code, res)
238 return out if out else "", err if err else ""
240 def tinc(self, *args: str, binary=False) -> subp.Popen:
241 """Start tinc with the specified arguments."""
242 args = tuple(filter(bool, args))
243 cmd = [path.TINC_PATH, *self._common_args, *args]
244 log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
245 # pylint: disable=consider-using-with
252 encoding=None if binary else "utf-8",
254 self._procs.append(proc)
257 def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
258 """Start tincd with the specified arguments."""
259 args = tuple(filter(bool, args))
268 log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
270 env = {**os.environ, **env}
271 # pylint: disable=consider-using-with
281 self._procs.append(proc)
284 def add_script(self, script: ScriptType, source: str = "") -> TincScript:
285 """Create a script with the passed Python source code.
286 The source must either be empty, or start indentation with 4 spaces.
287 If the source is empty, the created script can be used to receive notifications.
289 rel_path = script if isinstance(script, str) else script.value
290 check.not_in(rel_path, self._scripts)
292 full_path = os.path.join(self._work_dir, rel_path)
293 tinc_script = TincScript(self.name, rel_path, full_path)
295 log.debug("creating script %s at %s", script, full_path)
296 with open(full_path, "w", encoding="utf-8") as f:
297 content = make_script(self.name, rel_path, source)
301 log.debug("creating .cmd script wrapper at %s", full_path)
302 win_content = make_cmd_wrap(full_path)
303 with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
306 os.chmod(full_path, 0o755)
308 if isinstance(script, Script):
309 self._scripts[script.name] = tinc_script
310 self._scripts[rel_path] = tinc_script