"""Test multicast device."""
+import enum
import os
+import select
import socket
import struct
import time
PORT = 38245
-def multicast_works() -> bool:
+class MulticastSupport(enum.Enum):
+ NO = 0
+ BLOCKED = 1
+ YES = 2
+
+
+def multicast_works() -> MulticastSupport:
"""Check if multicast is supported and works."""
msg = b"foobar"
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
client.sendto(msg, (MCAST_ADDR, PORT))
- return msg == server.recv(16)
+ try:
+ select.select([server], [], [], 1)
+ if msg == server.recv(16, socket.MSG_DONTWAIT):
+ return MulticastSupport.YES
+ except OSError:
+ pass
+
+ return MulticastSupport.BLOCKED
except OSError:
- return False
+ log.info("no")
+ return MulticastSupport.NO
def test_no_mcast_support(foo: Tinc) -> None:
foo.cmd("set", "Device", f"{MCAST_ADDR} {PORT}")
foo.cmd("set", "LogLevel", "10")
- if multicast_works():
+ multicast_support = multicast_works()
+ if multicast_support == MulticastSupport.YES:
+ log.info("multicast supported")
test_rx_tx(foo)
+ elif multicast_support == MulticastSupport.BLOCKED:
+ log.info("multicast blocked")
+ pass
else:
+ log.info("multicast not supported")
test_no_mcast_support(foo)