1 """Classes for working with compiled instances of tinc and tincd binaries."""
7 import subprocess as subp
9 from platform import system
11 from . import check, path
13 from .script import TincScript, Script, ScriptType
14 from .template import make_script, make_cmd_wrap
15 from .util import random_string, random_port
17 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
18 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
20 # Path to the system temporary directory.
21 _TEMPDIR = tempfile.gettempdir()
24 def _make_wd(name: str) -> str:
25 work_dir = os.path.join(path.TEST_WD, "data", name)
26 os.makedirs(work_dir, exist_ok=True)
30 def _random_octet() -> int:
31 return random.randint(1, 254)
34 def _rand_localhost() -> str:
35 """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
36 it without additional configuration. For all others, return 127.0.0.1.
38 if _FULL_LOCALHOST_SUBNET:
39 return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
44 """Optional features supported by both tinc and tincd."""
48 COMP_ZLIB = "comp_zlib"
50 JUMBOGRAMS = "jumbograms"
51 LEGACY_PROTOCOL = "legacy_protocol"
52 LIBGCRYPT = "libgcrypt"
53 MINIUPNPC = "miniupnpc"
63 """Thin wrapper around Popen that simplifies running tinc/tincd
64 binaries by passing required arguments, checking exit codes, etc.
71 _port: T.Optional[int]
72 _scripts: T.Dict[str, TincScript]
73 _procs: T.List[subp.Popen]
75 def __init__(self, name: str = "", addr: str = "") -> None:
76 self.name = name if name else random_string(10)
77 self.address = addr if addr else _rand_localhost()
78 self._work_dir = _make_wd(self.name)
79 os.makedirs(self._work_dir, exist_ok=True)
80 self._pid_file = os.path.join(_TEMPDIR, f"tinc_{self.name}")
85 def randomize_port(self) -> int:
86 """Use a random port for this node."""
87 self._port = random_port()
91 def pid_file(self) -> str:
92 """Get the path to the pid file."""
95 def read_port(self) -> int:
96 """Read port used by tincd from its pidfile and update the _port field."""
97 log.debug("reading pidfile at %s", self.pid_file)
99 with open(self.pid_file, "r", encoding="utf-8") as f:
101 log.debug("found data %s", content)
103 _, _, _, token, port = content.split()
104 check.equals("port", token)
106 self._port = int(port)
110 def port(self) -> int:
111 """Port that tincd is listening on."""
112 assert self._port is not None
115 def __str__(self) -> str:
118 def __getitem__(self, script: ScriptType) -> TincScript:
119 if isinstance(script, Script):
121 return self._scripts[script]
126 def __exit__(self, exc_type, exc_val, exc_tb):
130 def features(self) -> T.List[Feature]:
131 """List of features supported by tinc and tincd."""
132 tinc, _ = self.cmd("--version")
133 tincd, _ = self.tincd("--version").communicate(timeout=5)
134 prefix, features = "Features: ", []
136 for out in tinc, tincd:
137 for line in out.splitlines():
138 if not line.startswith(prefix):
140 tokens = line[len(prefix) :].split()
142 features.append(Feature(token))
145 log.info('supported features: "%s"', features)
149 def _common_args(self) -> T.List[str]:
159 def sub(self, *paths: str) -> str:
160 """Return path to a subdirectory within the working dir for this node."""
161 return os.path.join(self._work_dir, *paths)
165 """Node's working directory."""
166 return self._work_dir
169 def script_up(self) -> str:
170 """Name of the hosts/XXX-up script for this node."""
171 return f"hosts/{self.name}-up"
174 def script_down(self) -> str:
175 """Name of the hosts/XXX-down script for this node."""
176 return f"hosts/{self.name}-down"
178 def cleanup(self) -> None:
179 """Terminate all tinc and tincd processes started from this instance."""
180 log.info("running node cleanup for %s", self)
184 except (AssertionError, ValueError):
185 log.info("unsuccessfully tried to stop node %s", self)
187 for proc in self._procs:
188 if proc.returncode is not None:
189 log.debug("PID %d exited, skipping", proc.pid)
191 log.info("PID %d still running, stopping", proc.pid)
194 except OSError as ex:
195 log.error("could not kill PID %d", proc.pid, exc_info=ex)
197 log.debug("waiting on %d to prevent zombies", proc.pid)
200 except OSError as ex:
201 log.error("waiting on %d failed", proc.pid, exc_info=ex)
205 def start(self, *args: str) -> int:
206 """Start the node, wait for it to call tinc-up, and get the port it's
207 listening on from the pid file. Don't use this method unless you need
208 to know the port tincd is running on. Call .cmd("start"), it's faster.
210 Reading pidfile and setting the port cannot be done from tinc-up because
211 you can't send tinc commands to yourself there — the daemon doesn't
212 respond to them until tinc-up is finished. The port field on this Tinc
213 instance is updated to reflect the correct port. If tinc-up is missing,
214 this command creates a new one, and then disables it.
216 new_script = Script.TINC_UP.name not in self._scripts
218 self.add_script(Script.TINC_UP)
220 tinc_up = self[Script.TINC_UP]
221 self.cmd(*args, "start")
227 self._port = self.read_port()
228 self.cmd("set", "Port", str(self._port))
233 self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
234 ) -> T.Tuple[str, str]:
235 """Run command through tinc, writes `stdin` to it (if the argument is not None),
236 check its return code (if the argument is not None), and return (stdout, stderr).
238 proc = self.tinc(*args, binary=isinstance(stdin, bytes))
239 log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
241 out, err = proc.communicate(stdin, timeout=60)
242 res = proc.returncode
243 self._procs.remove(proc)
244 log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
247 check.equals(code, res)
249 return out if out else "", err if err else ""
251 def tinc(self, *args: str, binary=False) -> subp.Popen:
252 """Start tinc with the specified arguments."""
253 args = tuple(filter(bool, args))
254 cmd = [path.TINC_PATH, *self._common_args, *args]
255 log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
256 # pylint: disable=consider-using-with
263 encoding=None if binary else "utf-8",
265 self._procs.append(proc)
268 def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
269 """Start tincd with the specified arguments."""
270 args = tuple(filter(bool, args))
279 log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
281 env = {**os.environ, **env}
282 # pylint: disable=consider-using-with
292 self._procs.append(proc)
295 def add_script(self, script: ScriptType, source: str = "") -> TincScript:
296 """Create a script with the passed Python source code.
297 The source must either be empty, or start indentation with 4 spaces.
298 If the source is empty, the created script can be used to receive notifications.
300 rel_path = script if isinstance(script, str) else script.value
301 check.not_in(rel_path, self._scripts)
303 full_path = os.path.join(self._work_dir, rel_path)
304 tinc_script = TincScript(self.name, rel_path, full_path)
306 log.debug("creating script %s at %s", script, full_path)
307 with open(full_path, "w", encoding="utf-8") as f:
308 content = make_script(self.name, rel_path, source)
312 log.debug("creating .cmd script wrapper at %s", full_path)
313 win_content = make_cmd_wrap(full_path)
314 with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
317 os.chmod(full_path, 0o755)
319 if isinstance(script, Script):
320 self._scripts[script.name] = tinc_script
321 self._scripts[rel_path] = tinc_script