1 """Wrappers for running external commands."""
3 import subprocess as subp
9 _netns_created: T.Set[str] = set()
12 def _netns_cleanup() -> None:
13 for namespace in _netns_created.copy():
14 netns_delete(namespace)
17 atexit.register(_netns_cleanup)
20 def _netns_action(action: str, namespace: str) -> bool:
21 log.debug("%s network namespace %s", action, namespace)
23 res = subp.run(["ip", "netns", action, namespace], check=False)
25 log.error("could not %s netns %s", action, namespace)
27 log.debug("OK %s netns %s", action, namespace)
29 return not res.returncode
32 def netns_delete(namespace: str) -> bool:
33 """Remove a previously created network namespace."""
34 success = _netns_action("delete", namespace)
36 _netns_created.remove(namespace)
40 def netns_add(namespace: str) -> bool:
41 """Add a network namespace (which can be removed manually or automatically at exit)."""
42 success = _netns_action("add", namespace)
44 _netns_created.add(namespace)