[PATCH obexd v2 13/15] client-test: pbap-client uses new API

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Mikel Astiz <mikel.astiz@xxxxxxxxxxxx>

---
 test/pbap-client |  188 +++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 152 insertions(+), 36 deletions(-)

diff --git a/test/pbap-client b/test/pbap-client
index ecca14f..069bde3 100755
--- a/test/pbap-client
+++ b/test/pbap-client
@@ -1,41 +1,157 @@
 #!/usr/bin/python
 
+import gobject
+
 import sys
+import os
 import dbus
+import dbus.service
+import dbus.mainloop.glib
+
+class Transfer:  # state of one pending OBEX transfer
+	def __init__(self, callback_func):
+		self.callback_func = callback_func  # gets the file's lines
+		self.path = None  # set by PbapClient.register
+		self.filename = None  # set by PbapClient.register
+
+class PbapClient:
+	def __init__(self, session_path):
+		self.pending_transfers = 0
+		self.transfer_dict = dict()
+		self.flush_func = None
+		bus = dbus.SessionBus()
+		obj = bus.get_object("org.openobex.client", session_path)
+		self.session = dbus.Interface(obj, "org.openobex.Session")
+		self.pbap = dbus.Interface(obj, "org.openobex.PhonebookAccess")
+		bus.add_signal_receiver(self.transfer_complete,
+					dbus_interface="org.openobex.Transfer",
+					signal_name="Complete",
+					path_keyword="path")
+		bus.add_signal_receiver(self.transfer_error,
+					dbus_interface="org.openobex.Transfer",
+					signal_name="Error",
+					path_keyword="path")
+
+	def register(self, reply, transfer):
+		(path, properties) = reply
+		transfer.path = path
+		transfer.filename = properties["Filename"]
+		self.transfer_dict[path] = transfer
+		print "Transfer created: %s (file %s)" % (path,
+							transfer.filename)
+
+	def error(self, err):
+		print err
+		mainloop.quit()
+
+	def transfer_complete(self, path):
+		"""On Complete, pass the file's lines to the callback."""
+		req = self.transfer_dict.get(path)
+		if req is None:
+			return  # path is not a registered transfer
+		self.pending_transfers -= 1
+		print "Transfer %s finished" % path
+		with open(req.filename, "r") as f:  # close before unlink
+			lines = f.readlines()
+		os.remove(req.filename)
+		del self.transfer_dict[path]
+		req.callback_func(lines)
+
+		if self.transfer_dict or self.pending_transfers:
+			return  # other transfers are still outstanding
+		flush, self.flush_func = self.flush_func, None
+		if flush is not None:
+			flush()
+
+	def transfer_error(self, code, message, path):
+		req = self.transfer_dict.get(path)
+		if req == None:
+			return
+		print "Transfer finished with error %s: %s" % (code, message)
+		mainloop.quit()
+
+	def pull(self, vcard, func):
+		req = Transfer(func)
+		self.pbap.Pull(vcard, "",
+				reply_handler=lambda r: self.register(r, req),
+				error_handler=self.error)
+		self.pending_transfers += 1
+
+	def pull_all(self, func):
+		req = Transfer(func)
+		self.pbap.PullAll("",
+				reply_handler=lambda r: self.register(r, req),
+				error_handler=self.error)
+		self.pending_transfers += 1
+
+	def flush_transfers(self, func):
+		if self.transfer_dict or self.pending_transfers:
+			self.flush_func = func  # run by transfer_complete
+		else:
+			func()  # nothing outstanding, continue right away
+
+	def interface(self):
+		return self.pbap
+
+if __name__ == '__main__':
+
+	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+	bus = dbus.SessionBus()
+	mainloop = gobject.MainLoop()
+
+	client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
+							"org.openobex.Client")
+
+	if (len(sys.argv) < 2):
+		print "Usage: %s <device>" % (sys.argv[0])
+		sys.exit(1)
+
+	print "Creating Session"
+	session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
+
+	pbap_client = PbapClient(session_path)
+
+	def process_result(lines, header):
+		if header != None:
+			print header
+		for line in lines:
+			print line,
+		print
+
+	def test_paths(paths):
+		if len(paths) == 0:
+			print
+			print "FINISHED"
+			mainloop.quit()
+			return
+
+		path = paths[0]
+
+		print "\n--- Select Phonebook %s ---\n" % (path)
+		pbap_client.interface().Select("int", path)
+
+		print "\n--- GetSize ---\n"
+		ret = pbap_client.interface().GetSize()
+		print "Size = %d\n" % (ret)
+
+		print "\n--- List vCard ---\n"
+		ret = pbap_client.interface().List()
+		for item in ret:
+			print "%s : %s" % (item[0], item[1])
+			pbap_client.interface().SetFormat("vcard30")
+			pbap_client.interface().SetFilter(["VERSION", "FN",
+									"TEL"])
+			pbap_client.pull(item[0],
+					lambda x: process_result(x, None))
+
+		pbap_client.interface().SetFormat("vcard30")
+		pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"])
+		pbap_client.pull_all(lambda x:
+				process_result(x, "\n--- PullAll ---\n"))
+
+		pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
+
+	test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])
 
-bus = dbus.SessionBus()
-
-client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
-						"org.openobex.Client")
-
-print "Creating Session"
-session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
-pbap = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-						"org.openobex.PhonebookAccess")
-session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-							"org.openobex.Session")
-
-paths = ["PB", "ICH", "OCH", "MCH", "CCH"]
-
-for path in paths:
-	print "\n--- Select Phonebook %s ---\n" % (path)
-	pbap.Select("int", path)
-
-	print "\n--- GetSize ---\n"
-	ret = pbap.GetSize()
-	print "Size = %d\n" % (ret)
-
-	print "\n--- List vCard ---\n"
-	ret = pbap.List()
-	for item in ret:
-		print "%s : %s" % (item[0], item[1])
-		pbap.SetFormat("vcard30")
-		pbap.SetFilter(["VERSION", "FN", "TEL"]);
-		ret = pbap.Pull(item[0])
-		print "%s" % (ret)
-
-	print "\n--- PullAll ---\n"
-	pbap.SetFormat("vcard30")
-	pbap.SetFilter(["VERSION", "FN", "TEL"]);
-	ret = pbap.PullAll()
-	print "%s" % (ret)
+	mainloop.run()
-- 
1.7.7.6

--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux