1 """Classes for working with compiled instances of tinc and tincd binaries."""
7 import subprocess as subp
9 from platform import system
11 from . import check, path
13 from .script import TincScript, Script, ScriptType
14 from .template import make_script, make_cmd_wrap
15 from .util import random_string, random_port
17 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
18 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
20 # Path to the system temporary directory.
21 _TEMPDIR = tempfile.gettempdir()
24 def _make_wd(name: str) -> str:
25 work_dir = os.path.join(path.TEST_WD, "data", name)
26 os.makedirs(work_dir, exist_ok=True)
30 def _random_octet() -> int:
31 return random.randint(1, 254)
34 def _rand_localhost() -> str:
35 """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
36 it without additional configuration. For all others, return 127.0.0.1.
38 if _FULL_LOCALHOST_SUBNET:
39 return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
44 """Optional features supported by both tinc and tincd."""
48 COMP_ZLIB = "comp_zlib"
50 JUMBOGRAMS = "jumbograms"
51 LEGACY_PROTOCOL = "legacy_protocol"
52 LIBGCRYPT = "libgcrypt"
53 MINIUPNPC = "miniupnpc"
64 """Thin wrapper around Popen that simplifies running tinc/tincd
65 binaries by passing required arguments, checking exit codes, etc.
72 _port: T.Optional[int]
73 _scripts: T.Dict[str, TincScript]
74 _procs: T.List[subp.Popen]
76 def __init__(self, name: str = "", addr: str = "") -> None:
77 self.name = name if name else random_string(10)
78 self.address = addr if addr else _rand_localhost()
79 self._work_dir = _make_wd(self.name)
80 os.makedirs(self._work_dir, exist_ok=True)
85 def randomize_port(self) -> int:
86 """Use a random port for this node."""
87 self._port = random_port()
91 def pid_file(self) -> str:
92 """Get the path to the pid file."""
93 return os.path.join(_TEMPDIR, f"tinc_{self.name}")
95 def read_port(self) -> int:
96 """Read port used by tincd from its pidfile and update the _port field."""
97 log.debug("reading pidfile at %s", self.pid_file)
99 with open(self.pid_file, "r", encoding="utf-8") as f:
101 log.debug("found data %s", content)
103 pid, _, _, token, port = content.split()
104 check.equals("port", token)
106 self._port = int(port)
111 def port(self) -> int:
112 """Port that tincd is listening on."""
113 assert self._port is not None
117 def pid(self) -> int:
118 """pid of the main tincd process."""
119 assert self._pid is not None
122 def __str__(self) -> str:
125 def __getitem__(self, script: ScriptType) -> TincScript:
126 if isinstance(script, Script):
128 return self._scripts[script]
133 def __exit__(self, exc_type, exc_val, exc_tb):
137 def features(self) -> T.List[Feature]:
138 """List of features supported by tinc and tincd."""
139 tinc, _ = self.cmd("--version")
140 tincd, _ = self.tincd("--version").communicate(timeout=5)
141 prefix, features = "Features: ", []
143 for out in tinc, tincd:
144 for line in out.splitlines():
145 if not line.startswith(prefix):
147 tokens = line[len(prefix) :].split()
149 features.append(Feature(token))
152 log.info('supported features: "%s"', features)
156 def _common_args(self) -> T.List[str]:
166 def sub(self, *paths: str) -> str:
167 """Return path to a subdirectory within the working dir for this node."""
168 return os.path.join(self._work_dir, *paths)
172 """Node's working directory."""
173 return self._work_dir
176 def script_up(self) -> str:
177 """Name of the hosts/XXX-up script for this node."""
178 return f"hosts/{self.name}-up"
181 def script_down(self) -> str:
182 """Name of the hosts/XXX-down script for this node."""
183 return f"hosts/{self.name}-down"
185 def cleanup(self) -> None:
186 """Terminate all tinc and tincd processes started from this instance."""
187 log.info("running node cleanup for %s", self)
191 except (AssertionError, ValueError):
192 log.info("unsuccessfully tried to stop node %s", self)
194 for proc in self._procs:
195 if proc.returncode is not None:
196 log.debug("PID %d exited, skipping", proc.pid)
198 log.info("PID %d still running, stopping", proc.pid)
201 except OSError as ex:
202 log.error("could not kill PID %d", proc.pid, exc_info=ex)
204 log.debug("waiting on %d to prevent zombies", proc.pid)
207 except OSError as ex:
208 log.error("waiting on %d failed", proc.pid, exc_info=ex)
212 def start(self, *args: str) -> int:
213 """Start the node, wait for it to call tinc-up, and get the port it's
214 listening on from the pid file. Don't use this method unless you need
215 to know the port tincd is running on. Call .cmd("start"), it's faster.
217 Reading pidfile and setting the port cannot be done from tinc-up because
218 you can't send tinc commands to yourself there — the daemon doesn't
219 respond to them until tinc-up is finished. The port field on this Tinc
220 instance is updated to reflect the correct port. If tinc-up is missing,
221 this command creates a new one, and then disables it.
223 new_script = Script.TINC_UP.name not in self._scripts
225 self.add_script(Script.TINC_UP)
227 tinc_up = self[Script.TINC_UP]
228 self.cmd(*args, "start", "--logfile", self.sub("log"))
234 self._port = self.read_port()
235 self.cmd("set", "Port", str(self._port))
242 code: T.Optional[int] = 0,
243 stdin: T.Optional[T.AnyStr] = None,
244 timeout: T.Optional[int] = None,
245 ) -> T.Tuple[str, str]:
246 """Run command through tinc, writes `stdin` to it (if the argument is not None),
247 check its return code (if the argument is not None), and return (stdout, stderr).
249 proc = self.tinc(*args, binary=isinstance(stdin, bytes))
250 log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
252 out, err = proc.communicate(stdin, timeout=60 if timeout is None else timeout)
253 res = proc.returncode
254 self._procs.remove(proc)
255 log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
258 check.equals(code, res)
260 return out if out else "", err if err else ""
262 def tinc(self, *args: str, binary=False) -> subp.Popen:
263 """Start tinc with the specified arguments."""
264 args = tuple(filter(bool, args))
265 cmd = [path.TINC_PATH, *self._common_args, *args]
266 log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
267 # pylint: disable=consider-using-with
274 encoding=None if binary else "utf-8",
276 self._procs.append(proc)
279 def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
280 """Start tincd with the specified arguments."""
281 args = tuple(filter(bool, args))
290 log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
292 env = {**os.environ, **env}
293 # pylint: disable=consider-using-with
303 self._procs.append(proc)
306 def add_script(self, script: ScriptType, source: str = "") -> TincScript:
307 """Create a script with the passed Python source code.
308 The source must either be empty, or start indentation with 4 spaces.
309 If the source is empty, the created script can be used to receive notifications.
311 rel_path = script if isinstance(script, str) else script.value
312 check.not_in(rel_path, self._scripts)
314 full_path = os.path.join(self._work_dir, rel_path)
315 tinc_script = TincScript(self.name, rel_path, full_path)
317 log.debug("creating script %s at %s", script, full_path)
318 with open(full_path, "w", encoding="utf-8") as f:
319 content = make_script(self.name, rel_path, source)
323 log.debug("creating .cmd script wrapper at %s", full_path)
324 win_content = make_cmd_wrap(full_path)
325 with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
328 os.chmod(full_path, 0o755)
330 if isinstance(script, Script):
331 self._scripts[script.name] = tinc_script
332 self._scripts[rel_path] = tinc_script