Signed-off-by: Masashi Honma <masashi.honma@xxxxxxxxx> --- tests/hwsim/test_ap_eap.py | 48 +++++++++++++++++++------------------- tests/hwsim/test_ap_psk.py | 9 ++++--- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/tests/hwsim/test_ap_eap.py b/tests/hwsim/test_ap_eap.py index 9c77056fe..724bfab0b 100644 --- a/tests/hwsim/test_ap_eap.py +++ b/tests/hwsim/test_ap_eap.py @@ -1457,7 +1457,7 @@ def test_ap_wpa2_eap_ttls_mschapv2_utf8(dev, apdev): params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") hapd = hostapd.add_ap(apdev[0], params) eap_connect(dev[0], hapd, "TTLS", "utf8-user-hash", - anonymous_identity="ttls", password="secret-åäö-€-password", + anonymous_identity="ttls", password=u"secret-åäö-€-password", ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2") eap_connect(dev[1], hapd, "TTLS", "utf8-user", anonymous_identity="ttls", @@ -2047,13 +2047,13 @@ def test_ap_wpa2_eap_tls_blob(dev, apdev): params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") hapd = hostapd.add_ap(apdev[0], params) cert = read_pem("auth_serv/ca.pem") - if "OK" not in dev[0].request("SET blob cacert " + cert.encode("hex")): + if "OK" not in dev[0].request("SET blob cacert " + binascii.hexlify(cert).decode()): raise Exception("Could not set cacert blob") cert = read_pem("auth_serv/user.pem") - if "OK" not in dev[0].request("SET blob usercert " + cert.encode("hex")): + if "OK" not in dev[0].request("SET blob usercert " + binascii.hexlify(cert).decode()): raise Exception("Could not set usercert blob") key = read_pem("auth_serv/user.rsa-key") - if "OK" not in dev[0].request("SET blob userkey " + key.encode("hex")): + if "OK" not in dev[0].request("SET blob userkey " + binascii.hexlify(key).decode()): raise Exception("Could not set cacert blob") eap_connect(dev[0], hapd, "TLS", "tls user", ca_cert="blob://cacert", client_cert="blob://usercert", @@ -2128,10 +2128,10 @@ def test_ap_wpa2_eap_tls_pkcs12_blob(dev, apdev): params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") hapd = hostapd.add_ap(apdev[0], params) cert = read_pem("auth_serv/ca.pem") - if "OK" not in dev[0].request("SET blob cacert " + cert.encode("hex")): + if "OK" not in dev[0].request("SET blob cacert " + binascii.hexlify(cert).decode()): raise Exception("Could not set cacert blob") with open("auth_serv/user.pkcs12", "rb") as f: - if "OK" not in dev[0].request("SET blob pkcs12 " + f.read().encode("hex")): + if "OK" not in dev[0].request("SET blob pkcs12 " + binascii.hexlify(f.read()).decode()): raise Exception("Could not set pkcs12 blob") eap_connect(dev[0], hapd, "TLS", "tls user", ca_cert="blob://cacert", private_key="blob://pkcs12", @@ -2143,7 +2143,7 @@ def test_ap_wpa2_eap_tls_neg_incorrect_trust_root(dev, apdev): params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") hostapd.add_ap(apdev[0], params) cert = read_pem("auth_serv/ca-incorrect.pem") - if "OK" not in dev[0].request("SET blob cacert " + cert.encode("hex")): + if "OK" not in dev[0].request("SET blob cacert " + binascii.hexlify(cert).decode()): raise Exception("Could not set cacert blob") dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS", identity="DOMAIN\mschapv2 user", anonymous_identity="ttls", @@ -3421,7 +3421,7 @@ def test_ap_wpa2_eap_fast_text_pac_errors(dev, apdev): pac += "START\n" pac += "PAC-Type\n" pac += "END\n" - if "OK" not in dev[0].request("SET blob fast_pac_text_errors " + pac.encode("hex")): + if "OK" not in dev[0].request("SET blob fast_pac_text_errors " + binascii.hexlify(pac.encode()).decode()): raise Exception("Could not set blob") dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="FAST", @@ -4224,7 +4224,7 @@ def ica_ocsp(cert, md="-sha256"): "-cert", cert, "-no_nonce", "-text" ] cmd = subprocess.Popen(arg, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - res = cmd.stdout.read() + "\n" + cmd.stderr.read() + res = cmd.stdout.read().decode() + "\n" + cmd.stderr.read().decode() cmd.stdout.close() cmd.stderr.close() cmd.wait() @@ -4241,7 +4241,7 @@ def ica_ocsp(cert, md="-sha256"): "-text" ] cmd = subprocess.Popen(arg, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - res = cmd.stdout.read() + "\n" + cmd.stderr.read() + res = cmd.stdout.read().decode() + "\n" + cmd.stderr.read().decode() cmd.stdout.close() cmd.stderr.close() cmd.wait() @@ -4760,7 +4760,7 @@ def test_ap_wpa2_eap_ttls_dh_params_blob(dev, apdev): params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") hapd = hostapd.add_ap(apdev[0], params) dh = read_pem("auth_serv/dh2.conf") - if "OK" not in dev[0].request("SET blob dhparams " + dh.encode("hex")): + if "OK" not in dev[0].request("SET blob dhparams " + binascii.hexlify(dh).decode()): raise Exception("Could not set dhparams blob") eap_connect(dev[0], hapd, "TTLS", "pap user", anonymous_identity="ttls", password="password", @@ -4983,9 +4983,9 @@ def test_ap_wpa2_eap_non_ascii_identity(dev, apdev): params = int_eap_server_params() hostapd.add_ap(apdev[0], params) dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS", - identity="\x80", password="password", wait_connect=False) + identity=u"\x80", password="password", wait_connect=False) dev[1].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS", - identity="a\x80", password="password", wait_connect=False) + identity=u"a\x80", password="password", wait_connect=False) for i in range(0, 2): ev = dev[i].wait_event(["CTRL-EVENT-EAP-STARTED"], timeout=16) if ev is None: @@ -4999,9 +4999,9 @@ def test_ap_wpa2_eap_non_ascii_identity2(dev, apdev): params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") hostapd.add_ap(apdev[0], params) dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS", - identity="\x80", password="password", wait_connect=False) + identity=u"\x80", password="password", wait_connect=False) dev[1].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS", - identity="a\x80", password="password", wait_connect=False) + identity=u"a\x80", password="password", wait_connect=False) for i in range(0, 2): ev = dev[i].wait_event(["CTRL-EVENT-EAP-STARTED"], timeout=16) if ev is None: @@ -5128,7 +5128,7 @@ def test_wpa2_eap_ttls_pap_key_lifetime_in_memory(dev, apdev, params): get_key_locations(buf, pmk, "PMK") get_key_locations(buf, msk, "MSK") get_key_locations(buf, emsk, "EMSK") - if password not in buf: + if binascii.unhexlify(password) not in buf: raise HwsimSkip("Password not found while associated") if pmk not in buf: raise HwsimSkip("PMK not found while associated") @@ -5960,15 +5960,15 @@ def test_ap_wpa2_eap_sim_db(dev, apdev, params): class test_handler(SocketServer.DatagramRequestHandler): def handle(self): - data = self.request[0].strip() + data = self.request[0].decode().strip() socket = self.request[1] logger.debug("Received hlr_auc_gw request: " + data) # EAP-SIM DB: Failed to parse response string - socket.sendto("FOO", self.client_address) + socket.sendto(b"FOO", self.client_address) # EAP-SIM DB: Failed to parse response string - socket.sendto("FOO 1", self.client_address) + socket.sendto(b"FOO 1", self.client_address) # EAP-SIM DB: Unknown external response - socket.sendto("FOO 1 2", self.client_address) + socket.sendto(b"FOO 1 2", self.client_address) logger.info("No proper response - wait for pending eap_sim_db request timeout") server = SocketServer.UnixDatagramServer(sockpath, test_handler) @@ -5986,7 +5986,7 @@ def test_ap_wpa2_eap_sim_db(dev, apdev, params): class test_handler2(SocketServer.DatagramRequestHandler): def handle(self): - data = self.request[0].strip() + data = self.request[0].decode().strip() socket = self.request[1] logger.debug("Received hlr_auc_gw request: " + data) fname = os.path.join(params['logdir'], @@ -5994,10 +5994,10 @@ def test_ap_wpa2_eap_sim_db(dev, apdev, params): cmd = subprocess.Popen(['../../hostapd/hlr_auc_gw', '-m', fname, data], stdout=subprocess.PIPE) - res = cmd.stdout.read().strip() + res = cmd.stdout.read().decode().strip() cmd.stdout.close() logger.debug("hlr_auc_gw response: " + res) - socket.sendto(res, self.client_address) + socket.sendto(res.encode(), self.client_address) server.RequestHandlerClass = test_handler2 @@ -6431,7 +6431,7 @@ def test_ap_wpa2_eap_psk_mac_addr_change(dev, apdev): hapd = hostapd.add_ap(apdev[0], params) cmd = subprocess.Popen(['ps', '-eo', 'pid,command'], stdout=subprocess.PIPE) - res = cmd.stdout.read() + res = cmd.stdout.read().decode() cmd.stdout.close() pid = 0 for p in res.splitlines(): diff --git a/tests/hwsim/test_ap_psk.py b/tests/hwsim/test_ap_psk.py index 7d16f3445..94673dd8f 100644 --- a/tests/hwsim/test_ap_psk.py +++ b/tests/hwsim/test_ap_psk.py @@ -2328,9 +2328,10 @@ def find_wpas_process(dev): def read_process_memory(pid, key=None): buf = bytes() logger.info("Reading process memory (pid=%d)" % pid) - with open('/proc/%d/maps' % pid, 'r') as maps, \ - open('/proc/%d/mem' % pid, 'r') as mem: + with open('/proc/%d/maps' % pid, 'rb') as maps, \ + open('/proc/%d/mem' % pid, 'rb') as mem: for l in maps.readlines(): + l = l.decode() m = re.match(r'([0-9a-f]+)-([0-9a-f]+) ([-r][-w][-x][-p])', l) if not m: continue @@ -2349,7 +2350,7 @@ def read_process_memory(pid, key=None): mem.seek(start) data = mem.read(end - start) buf += data - if key and key in data: + if key and key in binascii.hexlify(data).decode(): logger.info("Key found in " + l) logger.info("Total process memory read: %d bytes" % len(buf)) return buf @@ -2368,6 +2369,8 @@ def get_key_locations(buf, key, keyname): count = 0 pos = 0 while True: + if not type(key) == bytes: + key = key.encode() pos = buf.find(key, pos) if pos < 0: break -- 2.17.1 _______________________________________________ Hostap mailing list Hostap@xxxxxxxxxxxxxxxxxxx http://lists.infradead.org/mailman/listinfo/hostap