Signed-off-by: Masashi Honma <masashi.honma@xxxxxxxxx> --- tests/hwsim/test_wpas_mesh.py | 47 ++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/tests/hwsim/test_wpas_mesh.py b/tests/hwsim/test_wpas_mesh.py index 45317d0d3..a44d393d4 100644 --- a/tests/hwsim/test_wpas_mesh.py +++ b/tests/hwsim/test_wpas_mesh.py @@ -11,6 +11,7 @@ import struct import subprocess import time import json +import binascii import hwsim_utils import hostapd @@ -272,7 +273,7 @@ def _test_mesh_open_rssi_threshold(dev, apdev, value, expected): cmd = subprocess.Popen([ "iw", "dev", dev[0].ifname, "get", "mesh_param", "mesh_rssi_threshold" ], stdout=subprocess.PIPE) - mesh_rssi_threshold = int(cmd.stdout.read().split(" ")[0]) + mesh_rssi_threshold = int(cmd.stdout.read().decode().split(" ")[0]) dev[0].mesh_group_remove() check_mesh_group_removed(dev[0]) @@ -666,7 +667,8 @@ def test_wpas_mesh_secure_dropped_frame(dev, apdev): if rx_msg['subtype'] == 13: logger.info("Drop the first Action frame") break - if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))): + if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format( + rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())): raise Exception("MGMT_RX_PROCESS failed") dev[0].request("SET ext_mgmt_frame_handling 0") @@ -1181,6 +1183,7 @@ def _test_mesh_open_vht_160(dev, apdev): reg = cmd.stdout.read() found = False for entry in reg.splitlines(): + entry = entry.decode() if "@ 160)" in entry and "DFS" not in entry: found = True break @@ -1980,14 +1983,15 @@ def test_mesh_missing_mic(dev, apdev): (categ, action) = struct.unpack('BB', payload[0:2]) if categ == 15 and action == 1 and remove_mic: # Mesh Peering Open - pos = frame.find('\x8c\x10') + pos = frame.find(b'\x8c\x10') if not pos: raise Exception("Could not find MIC element") logger.info("Found MIC at %d" % pos) # Remove MIC rx_msg['frame'] = frame[0:pos] remove_mic = False - if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))): + if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format( + rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())): raise Exception("MGMT_RX_PROCESS failed") ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01) if ev: @@ -2048,15 +2052,16 @@ def test_mesh_pmkid_mismatch(dev, apdev): (categ, action) = struct.unpack('BB', payload[0:2]) if categ == 15 and action == 1 and break_pmkid: # Mesh Peering Open - pos = frame.find('\x75\x14') + pos = frame.find(b'\x75\x14') if not pos: raise Exception("Could not find Mesh Peering Management element") logger.info("Found Mesh Peering Management element at %d" % pos) # Break PMKID to hit "Mesh RSN: Invalid PMKID (Chosen PMK did # not match calculated PMKID)" - rx_msg['frame'] = frame[0:pos + 6] + '\x00\x00\x00\x00' + frame[pos + 10:] + rx_msg['frame'] = frame[0:pos + 6] + b'\x00\x00\x00\x00' + frame[pos + 10:] break_pmkid = False - if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))): + if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format( + rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())): raise Exception("MGMT_RX_PROCESS failed") ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01) if ev: @@ -2088,7 +2093,7 @@ def test_mesh_peering_proto(dev, apdev): (categ, action) = struct.unpack('BB', payload[0:2]) if categ == 15 and action == 1 and test == 1: # Mesh Peering Open - pos = frame.find('\x75\x04') + pos = frame.find(b'\x75\x04') if not pos: raise Exception("Could not find Mesh Peering Management element") logger.info("Found Mesh Peering Management element at %d" % pos) @@ -2098,7 +2103,7 @@ def test_mesh_peering_proto(dev, apdev): test += 1 elif categ == 15 and action == 1 and test == 2: # Mesh Peering Open - pos = frame.find('\x72\x0e') + pos = frame.find(b'\x72\x0e') if not pos: raise Exception("Could not find Mesh ID element") logger.info("Found Mesh ID element at %d" % pos) @@ -2108,13 +2113,13 @@ def test_mesh_peering_proto(dev, apdev): test += 1 elif categ == 15 and action == 1 and test == 3: # Mesh Peering Open - pos = frame.find('\x72\x0e') + pos = frame.find(b'\x72\x0e') if not pos: raise Exception("Could not find Mesh ID element") logger.info("Found Mesh ID element at %d" % pos) # Replace Mesh ID to hit "MPM: Mesh ID or Mesh Configuration # element do not match local MBSS" - rx_msg['frame'] = frame[0:pos] + '\x72\x0etest-test-test' + frame[pos + 16:] + rx_msg['frame'] = frame[0:pos] + b'\x72\x0etest-test-test' + frame[pos + 16:] test += 1 elif categ == 15 and action == 1 and test == 4: # Mesh Peering Open @@ -2130,16 +2135,17 @@ def test_mesh_peering_proto(dev, apdev): test += 1 elif categ == 15 and action == 1 and test == 6: # Mesh Peering Open - pos = frame.find('\x75\x04') + pos = frame.find(b'\x75\x04') if not pos: raise Exception("Could not find Mesh Peering Management element") logger.info("Found Mesh Peering Management element at %d" % pos) # Truncate the element to hit # "MPM: Invalid peer mgmt ie" and # "MPM: Mesh parsing rejected frame" - rx_msg['frame'] = frame[0:pos] + '\x75\x00\x00\x00' + frame[pos + 6:] + rx_msg['frame'] = frame[0:pos] + b'\x75\x00\x00\x00' + frame[pos + 6:] test += 1 - if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))): + if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format( + rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())): raise Exception("MGMT_RX_PROCESS failed") ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01) if ev: @@ -2253,8 +2259,8 @@ def test_mesh_holding(dev, apdev): if categ != 0x0f or action != 0x03: raise Exception("Did not see Mesh Peering Close") - peer_lid = payload[-6:-4].encode("hex") - my_lid = payload[-4:-2].encode("hex") + peer_lid = binascii.hexlify(payload[-6:-4]).decode() + my_lid = binascii.hexlify(payload[-4:-2]).decode() # Drop Mesh Peering Close and instead, process an unexpected Mesh Peering # Open to trigger transmission of another Mesh Peering Close in the HOLDING @@ -2296,12 +2302,13 @@ def test_mesh_cnf_rcvd_event_cls_acpt(dev, apdev): rx_msg = dev[0].mgmt_rx() # Allow Mesh Peering Confirm to go through - if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))): + if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format( + rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())): raise Exception("MGMT_RX_PROCESS failed") payload = rx_msg['payload'] - peer_lid = payload[51:53].encode("hex") - my_lid = payload[53:55].encode("hex") + peer_lid = binascii.hexlify(payload[51:53]).decode() + my_lid = binascii.hexlify(payload[53:55]).decode() dst = addr0.replace(':', '') src = addr1.replace(':', '') @@ -2338,7 +2345,7 @@ def test_mesh_opn_snt_event_cls_acpt(dev, apdev): payload = rx_msg['payload'] peer_lid = "0000" - my_lid = payload[53:55].encode("hex") + my_lid = binascii.hexlify(payload[53:55]).decode() dst = addr0.replace(':', '') src = addr1.replace(':', '') -- 2.17.1 _______________________________________________ Hostap mailing list Hostap@xxxxxxxxxxxxxxxxxxx http://lists.infradead.org/mailman/listinfo/hostap