[PATCH 1/2] tests: factor out multicast connectivity check

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



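A test may want to check multicast connectivity independently of
unicast, or check multicast without exercising unicast first.
Factor the multicast connectivity check out of run_connectivity_test()
into its own function, test_mcast_connectivity(). To support this, add
get_addr() helpers to Hostapd and WpaSupplicant and move the
DATA_TEST_CONFIG setup into config_data_test().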

Signed-off-by: Thomas Pedersen <thomas@xxxxxxxxxxxx>
---
 tests/hwsim/hostapd.py       |   3 +
 tests/hwsim/hwsim_utils.py   | 160 ++++++++++++++++-------------------
 tests/hwsim/wpasupplicant.py |  10 +++
 3 files changed, 85 insertions(+), 88 deletions(-)

diff --git a/tests/hwsim/hostapd.py b/tests/hwsim/hostapd.py
index 67e8a7fb8..fac287e99 100644
--- a/tests/hwsim/hostapd.py
+++ b/tests/hwsim/hostapd.py
@@ -179,6 +179,9 @@ class Hostapd:
             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
         return self.bssid
 
+    def get_addr(self, group=False):
+        return self.own_addr()
+
     def request(self, cmd):
         logger.debug(self.dbg + ": CTRL: " + cmd)
         return self.ctrl.request(cmd)
diff --git a/tests/hwsim/hwsim_utils.py b/tests/hwsim/hwsim_utils.py
index dccdefc71..64b554425 100644
--- a/tests/hwsim/hwsim_utils.py
+++ b/tests/hwsim/hwsim_utils.py
@@ -11,32 +11,7 @@ logger = logging.getLogger()
 
 from wpasupplicant import WpaSupplicant
 
-def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
-                          ifname1=None, ifname2=None, config=True, timeout=5,
-                          multicast_to_unicast=False, broadcast=True,
-                          send_len=None):
-    addr1 = dev1.own_addr()
-    if not dev1group and isinstance(dev1, WpaSupplicant):
-        addr = dev1.get_driver_status_field('addr')
-        if addr:
-            addr1 = addr
-
-    addr2 = dev2.own_addr()
-    if not dev2group and isinstance(dev2, WpaSupplicant):
-        addr = dev2.get_driver_status_field('addr')
-        if addr:
-            addr2 = addr
-
-    dev1.dump_monitor()
-    dev2.dump_monitor()
-
-    if dev1.hostname is None and dev2.hostname is None:
-        broadcast_retry_c = 1
-    else:
-        broadcast_retry_c = 10
-
-    try:
-        if config:
+def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
             cmd = "DATA_TEST_CONFIG 1"
             if ifname1:
                 cmd = cmd + " ifname=" + ifname1
@@ -57,6 +32,69 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
             if "OK" not in res:
                 raise Exception("Failed to enable data test functionality")
 
+def test_mcast_connectivity(dev1, dev2, tos=None, dev1group=False, dev2group=False,
+                            ifname1=None, ifname2=None, config=True, timeout=5,
+                            send_len=None, multicast_to_unicast=False, broadcast_retry_c=1):
+    addr1 = dev1.get_addr(dev1group)
+    addr2 = dev2.get_addr(dev2group)
+
+    if config:
+        config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
+
+    cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
+    if send_len is not None:
+        cmd += " len=" + str(send_len)
+    for i in range(broadcast_retry_c):
+        try:
+            if dev1group:
+                dev1.group_request(cmd)
+            else:
+                dev1.request(cmd)
+            if dev2group:
+                ev = dev2.wait_group_event(["DATA-TEST-RX"],
+                                           timeout=timeout)
+            else:
+                ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
+            if ev is None:
+                raise Exception("dev1->dev2 broadcast data delivery failed")
+            if multicast_to_unicast:
+                if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
+                if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
+            else:
+                if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data result")
+            if send_len is not None:
+                if " len=" + str(send_len) not in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data length")
+            else:
+                if " len=" in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data length")
+            break
+        except Exception as e:
+            if i == broadcast_retry_c - 1:
+                raise
+
+def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
+                          ifname1=None, ifname2=None, config=True, timeout=5,
+                          multicast_to_unicast=False, broadcast=True,
+                          send_len=None):
+    addr1 = dev1.get_addr(dev1group)
+    addr2 = dev2.get_addr(dev2group)
+
+    dev1.dump_monitor()
+    dev2.dump_monitor()
+
+    if dev1.hostname is None and dev2.hostname is None:
+        broadcast_retry_c = 1
+    else:
+        broadcast_retry_c = 10
+
+    try:
+        if config:
+            config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
+
         cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
         if send_len is not None:
             cmd += " len=" + str(send_len)
@@ -80,34 +118,9 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                 raise Exception("Unexpected dev1->dev2 unicast data length")
 
         if broadcast:
-            cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
-            if send_len is not None:
-                cmd += " len=" + str(send_len)
-            for i in range(broadcast_retry_c):
-                try:
-                    if dev1group:
-                        dev1.group_request(cmd)
-                    else:
-                        dev1.request(cmd)
-                    if dev2group:
-                        ev = dev2.wait_group_event(["DATA-TEST-RX"],
-                                                   timeout=timeout)
-                    else:
-                        ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
-                    if ev is None:
-                        raise Exception("dev1->dev2 broadcast data delivery failed")
-                    if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
-                        raise Exception("Unexpected dev1->dev2 broadcast data result")
-                    if send_len is not None:
-                        if " len=" + str(send_len) not in ev:
-                            raise Exception("Unexpected dev1->dev2 broadcast data length")
-                    else:
-                        if " len=" in ev:
-                            raise Exception("Unexpected dev1->dev2 broadcast data length")
-                    break
-                except Exception as e:
-                    if i == broadcast_retry_c - 1:
-                        raise
+            test_mcast_connectivity(dev1, dev2, tos, dev1group, dev2group,
+                                    ifname1, ifname2, False, timeout, send_len,
+                                    broadcast_retry_c=broadcast_retry_c)
 
         cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
         if send_len is not None:
@@ -132,40 +145,11 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                 raise Exception("Unexpected dev2->dev1 unicast data length")
 
         if broadcast:
-            cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
-            if send_len is not None:
-                cmd += " len=" + str(send_len)
-            for i in range(broadcast_retry_c):
-                try:
-                    if dev2group:
-                        dev2.group_request(cmd)
-                    else:
-                        dev2.request(cmd)
-                    if dev1group:
-                        ev = dev1.wait_group_event(["DATA-TEST-RX"],
-                                                   timeout=timeout)
-                    else:
-                        ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
-                    if ev is None:
-                        raise Exception("dev2->dev1 broadcast data delivery failed")
-                    if multicast_to_unicast:
-                        if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing")
-                        if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)")
-                    else:
-                        if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data result")
-                    if send_len is not None:
-                        if " len=" + str(send_len) not in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data length")
-                    else:
-                        if " len=" in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data length")
-                    break
-                except Exception as e:
-                    if i == broadcast_retry_c - 1:
-                        raise
+            # Reverse direction: swap group flags/ifnames with the devices.
+            test_mcast_connectivity(dev2, dev1, tos, dev2group, dev1group,
+                                    ifname2, ifname1, False, timeout, send_len,
+                                    multicast_to_unicast, broadcast_retry_c)
+
     finally:
         if config:
             if dev1group:
diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py
index 47bbb3a1a..93189609a 100644
--- a/tests/hwsim/wpasupplicant.py
+++ b/tests/hwsim/wpasupplicant.py
@@ -598,6 +598,16 @@ class WpaSupplicant:
             res = self.p2p_dev_addr()
         return res
 
+    def get_addr(self, group=False):
+        """Return own_addr(), or driver status 'addr' if set and not group."""
+        dev_addr = self.own_addr()
+        if not group:
+            addr = self.get_driver_status_field('addr')
+            if addr:
+                dev_addr = addr
+
+        return dev_addr
+
     def p2p_listen(self):
         return self.global_request("P2P_LISTEN")
 
-- 
2.20.1


_______________________________________________
Hostap mailing list
Hostap@xxxxxxxxxxxxxxxxxxx
http://lists.infradead.org/mailman/listinfo/hostap



[Index of Archives]     [Linux Wireless]     [Linux Kernel]     [ATH6KL]     [Linux Bluetooth]     [Linux Netdev]     [Kernel Newbies]     [IDE]     [Security]     [Git]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux ATA RAID]     [Samba]     [Device Mapper]

  Powered by Linux