1 """Classes for working with compiled instances of tinc and tincd binaries."""
6 import subprocess as subp
8 from platform import system
10 from . import check, path
12 from .script import TincScript, Script, ScriptType
13 from .template import make_script, make_cmd_wrap
14 from .util import random_string, random_port
16 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
17 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
20 def _make_wd(name: str) -> str:
21 work_dir = os.path.join(path.TEST_WD, "data", name)
22 os.makedirs(work_dir, exist_ok=True)
26 def _random_octet() -> int:
27 return random.randint(1, 254)
30 def _rand_localhost() -> str:
31 """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
32 it without additional configuration. For all others, return 127.0.0.1.
34 if _FULL_LOCALHOST_SUBNET:
35 return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
40 """Optional features supported by both tinc and tincd."""
44 COMP_ZLIB = "comp_zlib"
46 JUMBOGRAMS = "jumbograms"
47 LEGACY_PROTOCOL = "legacy_protocol"
48 LIBGCRYPT = "libgcrypt"
49 MINIUPNPC = "miniupnpc"
59 """Thin wrapper around Popen that simplifies running tinc/tincd
60 binaries by passing required arguments, checking exit codes, etc.
66 _port: T.Optional[int]
67 _scripts: T.Dict[str, TincScript]
68 _procs: T.List[subp.Popen]
70 def __init__(self, name: str = "", addr: str = "") -> None:
71 self.name = name if name else random_string(10)
72 self.address = addr if addr else _rand_localhost()
73 self._work_dir = _make_wd(self.name)
78 def randomize_port(self) -> int:
79 """Use a random port for this node."""
80 self._port = random_port()
83 def read_port(self) -> int:
84 """Read port used by tincd from its pidfile and update the _port field."""
85 pidfile = self.sub("pid")
86 log.debug("reading pidfile at %s", pidfile)
88 with open(pidfile, "r", encoding="utf-8") as f:
90 log.debug("found data %s", content)
92 _, _, _, token, port = content.split()
93 check.equals("port", token)
95 self._port = int(port)
99 def port(self) -> int:
100 """Port that tincd is listening on."""
101 assert self._port is not None
104 def __str__(self) -> str:
107 def __getitem__(self, script: ScriptType) -> TincScript:
108 if isinstance(script, Script):
110 return self._scripts[script]
115 def __exit__(self, exc_type, exc_val, exc_tb):
119 def features(self) -> T.List[Feature]:
120 """List of features supported by tinc and tincd."""
121 tinc, _ = self.cmd("--version")
122 tincd, _ = self.tincd("--version").communicate(timeout=5)
123 prefix, features = "Features: ", []
125 for out in tinc, tincd:
126 for line in out.splitlines():
127 if not line.startswith(prefix):
129 tokens = line[len(prefix) :].split()
131 features.append(Feature(token))
134 log.info('supported features: "%s"', features)
138 def _common_args(self) -> T.List[str]:
148 def sub(self, *paths: str) -> str:
149 """Return path to a subdirectory within the working dir for this node."""
150 return os.path.join(self._work_dir, *paths)
153 def script_up(self) -> str:
154 """Name of the hosts/XXX-up script for this node."""
155 return f"hosts/{self.name}-up"
158 def script_down(self) -> str:
159 """Name of the hosts/XXX-down script for this node."""
160 return f"hosts/{self.name}-down"
162 def cleanup(self) -> None:
163 """Terminate all tinc and tincd processes started from this instance."""
164 log.info("running node cleanup for %s", self)
168 except (AssertionError, ValueError):
169 log.info("unsuccessfully tried to stop node %s", self)
171 for proc in self._procs:
172 if proc.returncode is not None:
173 log.debug("PID %d exited, skipping", proc.pid)
175 log.info("PID %d still running, stopping", proc.pid)
178 except OSError as ex:
179 log.error("could not kill PID %d", proc.pid, exc_info=ex)
181 log.debug("waiting on %d to prevent zombies", proc.pid)
184 except OSError as ex:
185 log.error("waiting on %d failed", proc.pid, exc_info=ex)
189 def start(self, *args: str) -> int:
190 """Start the node, wait for it to call tinc-up, and get the port it's
191 listening on from the pid file. Don't use this method unless you need
192 to know the port tincd is running on. Call .cmd("start"), it's faster.
194 Reading pidfile and setting the port cannot be done from tinc-up because
195 you can't send tinc commands to yourself there — the daemon doesn't
196 respond to them until tinc-up is finished. The port field on this Tinc
197 instance is updated to reflect the correct port. If tinc-up is missing,
198 this command creates a new one, and then disables it.
200 new_script = Script.TINC_UP.name not in self._scripts
202 self.add_script(Script.TINC_UP)
204 tinc_up = self[Script.TINC_UP]
205 self.cmd(*args, "start")
211 self._port = self.read_port()
212 self.cmd("set", "Port", str(self._port))
217 self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
218 ) -> T.Tuple[str, str]:
219 """Run command through tinc, writes `stdin` to it (if the argument is not None),
220 check its return code (if the argument is not None), and return (stdout, stderr).
222 proc = self.tinc(*args, binary=isinstance(stdin, bytes))
223 log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
225 out, err = proc.communicate(stdin, timeout=60)
226 res = proc.returncode
227 self._procs.remove(proc)
228 log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
231 check.equals(code, res)
233 return out if out else "", err if err else ""
235 def tinc(self, *args: str, binary=False) -> subp.Popen:
236 """Start tinc with the specified arguments."""
237 args = tuple(filter(bool, args))
238 cmd = [path.TINC_PATH, *self._common_args, *args]
239 log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
240 # pylint: disable=consider-using-with
247 encoding=None if binary else "utf-8",
249 self._procs.append(proc)
252 def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
253 """Start tincd with the specified arguments."""
254 args = tuple(filter(bool, args))
263 log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
265 env = {**os.environ, **env}
266 # pylint: disable=consider-using-with
276 self._procs.append(proc)
279 def add_script(self, script: ScriptType, source: str = "") -> TincScript:
280 """Create a script with the passed Python source code.
281 The source must either be empty, or start indentation with 4 spaces.
282 If the source is empty, the created script can be used to receive notifications.
284 rel_path = script if isinstance(script, str) else script.value
285 check.not_in(rel_path, self._scripts)
287 full_path = os.path.join(self._work_dir, rel_path)
288 tinc_script = TincScript(self.name, rel_path, full_path)
290 log.debug("creating script %s at %s", script, full_path)
291 with open(full_path, "w", encoding="utf-8") as f:
292 content = make_script(self.name, rel_path, source)
296 log.debug("creating .cmd script wrapper at %s", full_path)
297 win_content = make_cmd_wrap(full_path)
298 with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
301 os.chmod(full_path, 0o755)
303 if isinstance(script, Script):
304 self._scripts[script.name] = tinc_script
305 self._scripts[rel_path] = tinc_script