1 """Classes for working with compiled instances of tinc and tincd binaries."""
6 import subprocess as subp
8 from platform import system
10 from . import check, path
12 from .script import TincScript, Script, ScriptType
13 from .template import make_script, make_cmd_wrap
14 from .util import random_string, random_port
16 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
17 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
def _make_wd(name: str) -> str:
    """Create the working directory for node `name` and return its path.

    The directory is <path.TEST_WD>/data/<name>. An already existing
    directory is reused instead of being treated as an error.
    """
    work_dir = os.path.join(path.TEST_WD, "data", name)
    os.makedirs(work_dir, exist_ok=True)
    # Callers store the result, so the path has to be handed back.
    return work_dir
26 def _random_octet() -> int:
27 return random.randint(1, 254)
def _rand_localhost() -> str:
    """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
    it without additional configuration. For all others, return 127.0.0.1.
    """
    if _FULL_LOCALHOST_SUBNET:
        return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
    # Fallback promised by the docstring for every other operating system.
    return "127.0.0.1"
40 """Optional features supported by both tinc and tincd."""
44 COMP_ZLIB = "comp_zlib"
46 JUMBOGRAMS = "jumbograms"
47 LEGACY_PROTOCOL = "legacy_protocol"
48 LIBGCRYPT = "libgcrypt"
49 MINIUPNPC = "miniupnpc"
58 """Thin wrapper around Popen that simplifies running tinc/tincd
59 binaries by passing required arguments, checking exit codes, etc.
65 _port: T.Optional[int]
66 _scripts: T.Dict[str, TincScript]
67 _procs: T.List[subp.Popen]
def __init__(self, name: str = "", addr: str = "") -> None:
    """Set up a node together with its working directory.

    An empty `name` is replaced by the result of random_string(10) and an
    empty `addr` by the result of _rand_localhost(). The working directory
    is obtained from _make_wd() using the final node name.
    """
    self.name = name if name else random_string(10)
    self.address = addr if addr else _rand_localhost()
    self._work_dir = _make_wd(self.name)
    # Start with no known port, no registered scripts and no tracked
    # processes, so methods reading these attributes find them defined.
    self._port = None
    self._scripts = {}
    self._procs = []
def randomize_port(self) -> int:
    """Use a random port for this node and return the chosen port."""
    self._port = random_port()
    return self._port
def read_port(self) -> int:
    """Read port used by tincd from its pidfile and update the _port field.

    The pidfile content is split on whitespace into exactly five fields;
    the fourth must be the word "port" and the fifth is parsed as the port
    number. Returns the port that was read.
    """
    pidfile = self.sub("pid")
    log.debug("reading pidfile at %s", pidfile)

    with open(pidfile, "r", encoding="utf-8") as f:
        content = f.read()
    log.debug("found data %s", content)

    # Unpacking raises ValueError unless there are exactly five fields.
    _, _, _, token, port = content.split()
    check.equals("port", token)

    self._port = int(port)
    return self._port
def port(self) -> int:
    """Port that tincd is listening on.

    Fails with AssertionError while no port has been set on this instance.
    """
    assert self._port is not None
    return self._port
103 def __str__(self) -> str:
def __getitem__(self, script: ScriptType) -> TincScript:
    """Return the registered script for a `Script` member or a string key.

    Raises KeyError when nothing is registered under the resulting key.
    """
    # Script members are registered under their .name, so translate first.
    if isinstance(script, Script):
        script = script.name
    return self._scripts[script]
114 def __exit__(self, exc_type, exc_val, exc_tb):
def features(self) -> T.List[Feature]:
    """List of features supported by tinc and tincd.

    Runs both binaries with --version and converts every token found on a
    line starting with "Features: " into a Feature value. Tokens from the
    tinc output come first, followed by those from tincd; duplicates are
    kept.
    """
    tinc, _ = self.cmd("--version")
    tincd, _ = self.tincd("--version").communicate(timeout=5)
    prefix, features = "Features: ", []

    for out in tinc, tincd:
        for line in out.splitlines():
            if not line.startswith(prefix):
                continue
            tokens = line[len(prefix) :].split()
            features.extend(Feature(token) for token in tokens)

    log.info('supported features: "%s"', features)
    return features
137 def _common_args(self) -> T.List[str]:
def sub(self, *paths: str) -> str:
    """Join `paths` onto this node's working directory and return the result."""
    base = self._work_dir
    return os.path.join(base, *paths)
def script_up(self) -> str:
    """Relative name of this node's host-up script, i.e. hosts/<name>-up."""
    node = self.name
    return f"hosts/{node}-up"
def script_down(self) -> str:
    """Relative name of this node's host-down script, i.e. hosts/<name>-down."""
    node = self.name
    return f"hosts/{node}-down"
def cleanup(self) -> None:
    """Terminate all tinc and tincd processes started from this instance.

    First asks the node to stop, then kills and reaps every tracked process
    that has not exited yet. Failures are logged instead of raised so that
    one stubborn process does not prevent cleanup of the others.
    """
    log.info("running node cleanup for %s", self)

    # NOTE(review): the guarded call is inferred from the log message
    # below; confirm that "stop" is the command intended here.
    try:
        self.cmd("stop")
    except (AssertionError, ValueError):
        log.info("unsuccessfully tried to stop node %s", self)

    for proc in self._procs:
        if proc.returncode is not None:
            log.debug("PID %d exited, skipping", proc.pid)
            continue

        log.info("PID %d still running, stopping", proc.pid)
        try:
            proc.kill()
        except OSError as ex:
            log.error("could not kill PID %d", proc.pid, exc_info=ex)

        log.debug("waiting on %d to prevent zombies", proc.pid)
        try:
            proc.wait()
        except OSError as ex:
            log.error("waiting on %d failed", proc.pid, exc_info=ex)
188 def start(self, *args: str) -> int:
189 """Start the node, wait for it to call tinc-up, and get the port it's
190 listening on from the pid file. Don't use this method unless you need
191 to know the port tincd is running on. Call .cmd("start"), it's faster.
193 Reading pidfile and setting the port cannot be done from tinc-up because
194 you can't send tinc commands to yourself there — the daemon doesn't
195 respond to them until tinc-up is finished. The port field on this Tinc
196 instance is updated to reflect the correct port. If tinc-up is missing,
197 this command creates a new one, and then disables it.
199 new_script = Script.TINC_UP.name not in self._scripts
201 self.add_script(Script.TINC_UP)
203 tinc_up = self[Script.TINC_UP]
204 self.cmd(*args, "start")
210 self._port = self.read_port()
211 self.cmd("set", "Port", str(self._port))
216 self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[str] = None
217 ) -> T.Tuple[str, str]:
218 """Run command through tinc, writes `stdin` to it (if the argument is not None),
219 check its return code (if the argument is not None), and return (stdout, stderr).
221 proc = self.tinc(*args)
222 log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
224 out, err = proc.communicate(stdin, timeout=60)
225 res = proc.returncode
226 self._procs.remove(proc)
227 log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
230 check.equals(code, res)
232 return out if out else "", err if err else ""
234 def tinc(self, *args: str) -> subp.Popen:
235 """Start tinc with the specified arguments."""
236 args = tuple(filter(bool, args))
237 cmd = [path.TINC_PATH, *self._common_args, *args]
238 log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
239 # pylint: disable=consider-using-with
248 self._procs.append(proc)
251 def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
252 """Start tincd with the specified arguments."""
253 args = tuple(filter(bool, args))
262 log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
264 env = {**os.environ, **env}
265 # pylint: disable=consider-using-with
275 self._procs.append(proc)
def add_script(self, script: ScriptType, source: str = "") -> TincScript:
    """Create a script with the passed Python source code.
    The source must either be empty, or start indentation with 4 spaces.
    If the source is empty, the created script can be used to receive notifications.

    The script is registered under its relative path and, for `Script`
    members, also under the member name. Returns the created TincScript.
    """
    rel_path = script if isinstance(script, str) else script.value
    check.not_in(rel_path, self._scripts)

    full_path = os.path.join(self._work_dir, rel_path)
    tinc_script = TincScript(self.name, rel_path, full_path)

    log.debug("creating script %s at %s", script, full_path)
    # Render before opening the file so that a template failure does not
    # leave an empty script behind.
    content = make_script(self.name, rel_path, source)
    with open(full_path, "w", encoding="utf-8") as f:
        f.write(content)

    # NOTE(review): the platform test is inferred from the ".cmd" wrapper
    # and the chmod call; confirm the exact condition.
    if os.name == "nt":
        log.debug("creating .cmd script wrapper at %s", full_path)
        win_content = make_cmd_wrap(full_path)
        with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
            f.write(win_content)
    else:
        os.chmod(full_path, 0o755)

    if isinstance(script, Script):
        self._scripts[script.name] = tinc_script
    self._scripts[rel_path] = tinc_script

    return tinc_script