[PATCH] test: Reorganize LE Python example scripts

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch moves tools/gatt-example to test/ and renames it to
example-gatt-server in preparation for example-gatt-client.
The same naming convention (example-*) is also adopted for
test/advertisement-example, so it's now called
test/example-advertisement. Also, both scripts now have the execute
set appropriately to be consistent with the other Python scripts.
---
 test/advertisement-example | 170 ---------------
 test/example-advertisement | 170 +++++++++++++++
 test/example-gatt-server   | 533 +++++++++++++++++++++++++++++++++++++++++++++
 tools/gatt-example         | 533 ---------------------------------------------
 4 files changed, 703 insertions(+), 703 deletions(-)
 delete mode 100644 test/advertisement-example
 create mode 100755 test/example-advertisement
 create mode 100755 test/example-gatt-server
 delete mode 100644 tools/gatt-example

diff --git a/test/advertisement-example b/test/advertisement-example
deleted file mode 100644
index 98aeafa..0000000
--- a/test/advertisement-example
+++ /dev/null
@@ -1,170 +0,0 @@
-#!/usr/bin/python
-
-import dbus
-import dbus.exceptions
-import dbus.mainloop.glib
-import dbus.service
-
-import array
-import gobject
-
-from random import randint
-
-mainloop = None
-
-BLUEZ_SERVICE_NAME = 'org.bluez'
-LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
-DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
-DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
-
-LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
-
-
-class InvalidArgsException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
-
-
-class NotSupportedException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.NotSupported'
-
-
-class NotPermittedException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.NotPermitted'
-
-
-class InvalidValueLengthException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
-
-
-class FailedException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.Failed'
-
-
-class Advertisement(dbus.service.Object):
-    PATH_BASE = '/org/bluez/example/advertisement'
-
-    def __init__(self, bus, index, advertising_type):
-        self.path = self.PATH_BASE + str(index)
-        self.bus = bus
-        self.ad_type = advertising_type
-        self.service_uuids = None
-        self.manufacturer_data = None
-        self.solicit_uuids = None
-        self.service_data = None
-        dbus.service.Object.__init__(self, bus, self.path)
-
-    def get_properties(self):
-        properties = dict()
-        properties['Type'] = self.ad_type
-        if self.service_uuids is not None:
-            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
-                                                    signature='s')
-        if self.solicit_uuids is not None:
-            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
-                                                    signature='s')
-        if self.manufacturer_data is not None:
-            properties['ManufacturerData'] = dbus.Dictionary(
-                self.manufacturer_data, signature='qay')
-        if self.service_data is not None:
-            properties['ServiceData'] = dbus.Dictionary(self.service_data,
-                                                        signature='say')
-        return {LE_ADVERTISEMENT_IFACE: properties}
-
-    def get_path(self):
-        return dbus.ObjectPath(self.path)
-
-    def add_service_uuid(self, uuid):
-        if not self.service_uuids:
-            self.service_uuids = []
-        self.service_uuids.append(uuid)
-
-    def add_solicit_uuid(self, uuid):
-        if not self.solicit_uuids:
-            self.solicit_uuids = []
-        self.solicit_uuids.append(uuid)
-
-    def add_manufacturer_data(self, manuf_code, data):
-        if not self.manufacturer_data:
-            self.manufacturer_data = dict()
-        self.manufacturer_data[manuf_code] = data
-
-    def add_service_data(self, uuid, data):
-        if not self.service_data:
-            self.service_data = dict()
-        self.service_data[uuid] = data
-
-    @dbus.service.method(DBUS_PROP_IFACE,
-                         in_signature='s',
-                         out_signature='a{sv}')
-    def GetAll(self, interface):
-        print 'GetAll'
-        if interface != LE_ADVERTISEMENT_IFACE:
-            raise InvalidArgsException()
-        print 'returning props'
-        return self.get_properties()[LE_ADVERTISEMENT_IFACE]
-
-    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
-                         in_signature='',
-                         out_signature='')
-    def Release(self):
-        print '%s: Released!' % self.path
-
-class TestAdvertisement(Advertisement):
-
-    def __init__(self, bus, index):
-        Advertisement.__init__(self, bus, index, 'broadcast')
-        self.add_service_uuid('0000180D-0000-1000-8000-00805F9B34FB')
-        self.add_service_uuid('0000180F-0000-1000-8000-00805F9B34FB')
-        self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04])
-        self.add_service_data('00009999-0000-1000-8000-00805F9B34FB',
-                              [0x00, 0x01, 0x02, 0x03, 0x04])
-
-
-def register_ad_cb():
-    print 'Advertisement registered'
-
-
-def register_ad_error_cb(error):
-    print 'Failed to register advertisement: ' + str(error)
-    mainloop.quit()
-
-
-def find_adapter(bus):
-    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
-                               DBUS_OM_IFACE)
-    objects = remote_om.GetManagedObjects()
-
-    for o, props in objects.iteritems():
-        if LE_ADVERTISING_MANAGER_IFACE in props:
-            return o
-
-    return None
-
-
-def main():
-    global mainloop
-
-    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
-
-    bus = dbus.SystemBus()
-
-    adapter = find_adapter(bus)
-    if not adapter:
-        print 'LEAdvertisingManager1 interface not found'
-        return
-
-    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
-                                LE_ADVERTISING_MANAGER_IFACE)
-
-    test_advertisement = TestAdvertisement(bus, 0)
-
-    mainloop = gobject.MainLoop()
-
-    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
-                                     reply_handler=register_ad_cb,
-                                     error_handler=register_ad_error_cb)
-
-    mainloop.run()
-
-if __name__ == '__main__':
-    main()
diff --git a/test/example-advertisement b/test/example-advertisement
new file mode 100755
index 0000000..98aeafa
--- /dev/null
+++ b/test/example-advertisement
@@ -0,0 +1,170 @@
+#!/usr/bin/python
+
+import dbus
+import dbus.exceptions
+import dbus.mainloop.glib
+import dbus.service
+
+import array
+import gobject
+
+from random import randint
+
+mainloop = None
+
+BLUEZ_SERVICE_NAME = 'org.bluez'
+LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+
+LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
+
+
+class InvalidArgsException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
+
+
+class NotSupportedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.NotSupported'
+
+
+class NotPermittedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.NotPermitted'
+
+
+class InvalidValueLengthException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
+
+
+class FailedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.Failed'
+
+
+class Advertisement(dbus.service.Object):
+    PATH_BASE = '/org/bluez/example/advertisement'
+
+    def __init__(self, bus, index, advertising_type):
+        self.path = self.PATH_BASE + str(index)
+        self.bus = bus
+        self.ad_type = advertising_type
+        self.service_uuids = None
+        self.manufacturer_data = None
+        self.solicit_uuids = None
+        self.service_data = None
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        properties = dict()
+        properties['Type'] = self.ad_type
+        if self.service_uuids is not None:
+            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
+                                                    signature='s')
+        if self.solicit_uuids is not None:
+            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
+                                                    signature='s')
+        if self.manufacturer_data is not None:
+            properties['ManufacturerData'] = dbus.Dictionary(
+                self.manufacturer_data, signature='qay')
+        if self.service_data is not None:
+            properties['ServiceData'] = dbus.Dictionary(self.service_data,
+                                                        signature='say')
+        return {LE_ADVERTISEMENT_IFACE: properties}
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_service_uuid(self, uuid):
+        if not self.service_uuids:
+            self.service_uuids = []
+        self.service_uuids.append(uuid)
+
+    def add_solicit_uuid(self, uuid):
+        if not self.solicit_uuids:
+            self.solicit_uuids = []
+        self.solicit_uuids.append(uuid)
+
+    def add_manufacturer_data(self, manuf_code, data):
+        if not self.manufacturer_data:
+            self.manufacturer_data = dict()
+        self.manufacturer_data[manuf_code] = data
+
+    def add_service_data(self, uuid, data):
+        if not self.service_data:
+            self.service_data = dict()
+        self.service_data[uuid] = data
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        print 'GetAll'
+        if interface != LE_ADVERTISEMENT_IFACE:
+            raise InvalidArgsException()
+        print 'returning props'
+        return self.get_properties()[LE_ADVERTISEMENT_IFACE]
+
+    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
+                         in_signature='',
+                         out_signature='')
+    def Release(self):
+        print '%s: Released!' % self.path
+
+class TestAdvertisement(Advertisement):
+
+    def __init__(self, bus, index):
+        Advertisement.__init__(self, bus, index, 'broadcast')
+        self.add_service_uuid('0000180D-0000-1000-8000-00805F9B34FB')
+        self.add_service_uuid('0000180F-0000-1000-8000-00805F9B34FB')
+        self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04])
+        self.add_service_data('00009999-0000-1000-8000-00805F9B34FB',
+                              [0x00, 0x01, 0x02, 0x03, 0x04])
+
+
+def register_ad_cb():
+    print 'Advertisement registered'
+
+
+def register_ad_error_cb(error):
+    print 'Failed to register advertisement: ' + str(error)
+    mainloop.quit()
+
+
+def find_adapter(bus):
+    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
+                               DBUS_OM_IFACE)
+    objects = remote_om.GetManagedObjects()
+
+    for o, props in objects.iteritems():
+        if LE_ADVERTISING_MANAGER_IFACE in props:
+            return o
+
+    return None
+
+
+def main():
+    global mainloop
+
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+    bus = dbus.SystemBus()
+
+    adapter = find_adapter(bus)
+    if not adapter:
+        print 'LEAdvertisingManager1 interface not found'
+        return
+
+    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+                                LE_ADVERTISING_MANAGER_IFACE)
+
+    test_advertisement = TestAdvertisement(bus, 0)
+
+    mainloop = gobject.MainLoop()
+
+    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
+                                     reply_handler=register_ad_cb,
+                                     error_handler=register_ad_error_cb)
+
+    mainloop.run()
+
+if __name__ == '__main__':
+    main()
diff --git a/test/example-gatt-server b/test/example-gatt-server
new file mode 100755
index 0000000..a6f5cbe
--- /dev/null
+++ b/test/example-gatt-server
@@ -0,0 +1,533 @@
+#!/usr/bin/python
+
+import dbus
+import dbus.exceptions
+import dbus.mainloop.glib
+import dbus.service
+
+import array
+import gobject
+
+from random import randint
+
+mainloop = None
+
+BLUEZ_SERVICE_NAME = 'org.bluez'
+GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+
+GATT_SERVICE_IFACE = 'org.bluez.GattService1'
+GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
+GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
+
+class InvalidArgsException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
+
+class NotSupportedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.NotSupported'
+
+class NotPermittedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.NotPermitted'
+
+class InvalidValueLengthException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
+
+class FailedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.Failed'
+
+
+class Service(dbus.service.Object):
+    PATH_BASE = '/org/bluez/example/service'
+
+    def __init__(self, bus, index, uuid, primary):
+        self.path = self.PATH_BASE + str(index)
+        self.bus = bus
+        self.uuid = uuid
+        self.primary = primary
+        self.characteristics = []
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_SERVICE_IFACE: {
+                        'UUID': self.uuid,
+                        'Primary': self.primary,
+                        'Characteristics': dbus.Array(
+                                self.get_characteristic_paths(),
+                                signature='o')
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_characteristic(self, characteristic):
+        self.characteristics.append(characteristic)
+
+    def get_characteristic_paths(self):
+        result = []
+        for chrc in self.characteristics:
+            result.append(chrc.get_path())
+        return result
+
+    def get_characteristics(self):
+        return self.characteristics
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_SERVICE_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_SERVICE_IFACE]
+
+    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+    def GetManagedObjects(self):
+        response = {}
+        print 'GetManagedObjects'
+
+        response[self.get_path()] = self.get_properties()
+        chrcs = self.get_characteristics()
+        for chrc in chrcs:
+            response[chrc.get_path()] = chrc.get_properties()
+            descs = chrc.get_descriptors()
+            for desc in descs:
+                response[desc.get_path()] = desc.get_properties()
+
+        return response
+
+
+class Characteristic(dbus.service.Object):
+    def __init__(self, bus, index, uuid, flags, service):
+        self.path = service.path + '/char' + str(index)
+        self.bus = bus
+        self.uuid = uuid
+        self.service = service
+        self.flags = flags
+        self.descriptors = []
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_CHRC_IFACE: {
+                        'Service': self.service.get_path(),
+                        'UUID': self.uuid,
+                        'Flags': self.flags,
+                        'Descriptors': dbus.Array(
+                                self.get_descriptor_paths(),
+                                signature='o')
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_descriptor(self, descriptor):
+        self.descriptors.append(descriptor)
+
+    def get_descriptor_paths(self):
+        result = []
+        for desc in self.descriptors:
+            result.append(desc.get_path())
+        return result
+
+    def get_descriptors(self):
+        return self.descriptors
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_CHRC_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_CHRC_IFACE]
+
+    @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay')
+    def ReadValue(self):
+        print 'Default ReadValue called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay')
+    def WriteValue(self, value):
+        print 'Default WriteValue called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE)
+    def StartNotify(self):
+        print 'Default StartNotify called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE)
+    def StopNotify(self):
+        print 'Default StopNotify called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.signal(DBUS_PROP_IFACE,
+                         signature='sa{sv}as')
+    def PropertiesChanged(self, interface, changed, invalidated):
+        pass
+
+
+class Descriptor(dbus.service.Object):
+    def __init__(self, bus, index, uuid, characteristic):
+        self.path = characteristic.path + '/desc' + str(index)
+        self.bus = bus
+        self.uuid = uuid
+        self.chrc = characteristic
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_DESC_IFACE: {
+                        'Characteristic': self.chrc.get_path(),
+                        'UUID': self.uuid,
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_DESC_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_CHRC_IFACE]
+
+    @dbus.service.method(GATT_DESC_IFACE, out_signature='ay')
+    def ReadValue(self):
+        print 'Default ReadValue called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_DESC_IFACE, in_signature='ay')
+    def WriteValue(self, value):
+        print 'Default WriteValue called, returning error'
+        raise NotSupportedException()
+
+
+class HeartRateService(Service):
+    """
+    Fake Heart Rate Service that simulates a fake heart beat and control point
+    behavior.
+
+    """
+    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index):
+        Service.__init__(self, bus, index, self.HR_UUID, True)
+        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
+        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
+        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
+        self.energy_expended = 0
+
+
+class HeartRateMeasurementChrc(Characteristic):
+    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.HR_MSRMT_UUID,
+                ['notify'],
+                service)
+        self.notifying = False
+        self.hr_ee_count = 0
+
+    def hr_msrmt_cb(self):
+        value = []
+        value.append(dbus.Byte(0x06))
+
+        value.append(dbus.Byte(randint(90, 130)))
+
+        if self.hr_ee_count % 10 == 0:
+            value[0] = dbus.Byte(value[0] | 0x08)
+            value.append(dbus.Byte(self.service.energy_expended & 0xff))
+            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))
+
+        self.service.energy_expended = \
+                min(0xffff, self.service.energy_expended + 1)
+        self.hr_ee_count += 1
+
+        print 'Updating value: ' + repr(value)
+
+        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])
+
+        return self.notifying
+
+    def _update_hr_msrmt_simulation(self):
+        print 'Update HR Measurement Simulation'
+
+        if not self.notifying:
+            return
+
+        gobject.timeout_add(1000, self.hr_msrmt_cb)
+
+    def StartNotify(self):
+        if self.notifying:
+            print 'Already notifying, nothing to do'
+            return
+
+        self.notifying = True
+        self._update_hr_msrmt_simulation()
+
+    def StopNotify(self):
+        if not self.notifying:
+            print 'Not notifying, nothing to do'
+            return
+
+        self.notifying = False
+        self._update_hr_msrmt_simulation()
+
+
+class BodySensorLocationChrc(Characteristic):
+    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.BODY_SNSR_LOC_UUID,
+                ['read'],
+                service)
+
+    def ReadValue(self):
+        # Return 'Chest' as the sensor location.
+        return [ 0x01 ]
+
+class HeartRateControlPointChrc(Characteristic):
+    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.HR_CTRL_PT_UUID,
+                ['write'],
+                service)
+
+    def WriteValue(self, value):
+        print 'Heart Rate Control Point WriteValue called'
+
+        if len(value) != 1:
+            raise InvalidValueLengthException()
+
+        byte = value[0]
+        print 'Control Point value: ' + repr(byte)
+
+        if byte != 1:
+            raise FailedException("0x80")
+
+        print 'Energy Expended field reset!'
+        self.service.energy_expended = 0
+
+
+class BatteryService(Service):
+    """
+    Fake Battery service that emulates a draining battery.
+
+    """
+    BATTERY_UUID = '180f'
+
+    def __init__(self, bus, index):
+        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
+        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))
+
+
+class BatteryLevelCharacteristic(Characteristic):
+    """
+    Fake Battery Level characteristic. The battery level is drained by 2 points
+    every 5 seconds.
+
+    """
+    BATTERY_LVL_UUID = '2a19'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.BATTERY_LVL_UUID,
+                ['read', 'notify'],
+                service)
+        self.notifying = False
+        self.battery_lvl = 100
+        gobject.timeout_add(5000, self.drain_battery)
+
+    def notify_battery_level(self):
+        if not self.notifying:
+            return
+        self.PropertiesChanged(
+                GATT_CHRC_IFACE,
+                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])
+
+    def drain_battery(self):
+        if self.battery_lvl > 0:
+            self.battery_lvl -= 2
+            if self.battery_lvl < 0:
+                self.battery_lvl = 0
+        print 'Battery Level drained: ' + repr(self.battery_lvl)
+        self.notify_battery_level()
+        return True
+
+    def ReadValue(self):
+        print 'Battery Level read: ' + repr(self.battery_lvl)
+        return [dbus.Byte(self.battery_lvl)]
+
+    def StartNotify(self):
+        if self.notifying:
+            print 'Already notifying, nothing to do'
+            return
+
+        self.notifying = True
+        self.notify_battery_level()
+
+    def StopNotify(self):
+        if not self.notifying:
+            print 'Not notifying, nothing to do'
+            return
+
+        self.notifying = False
+
+
+class TestService(Service):
+    """
+    Dummy test service that provides characteristics and descriptors that
+    exercise various API functionality.
+
+    """
+    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'
+
+    def __init__(self, bus, index):
+        Service.__init__(self, bus, index, self.TEST_SVC_UUID, False)
+        self.add_characteristic(TestCharacteristic(bus, 0, self))
+
+
+class TestCharacteristic(Characteristic):
+    """
+    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
+    contains "extended properties", as well as a test descriptor.
+
+    """
+    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.TEST_CHRC_UUID,
+                ['read', 'write', 'writable-auxiliaries'],
+                service)
+        self.value = []
+        self.add_descriptor(TestDescriptor(bus, 0, self))
+        self.add_descriptor(
+                CharacteristicUserDescriptionDescriptor(bus, 1, self))
+
+    def ReadValue(self):
+        print 'TestCharacteristic Read: ' + repr(self.value)
+        return self.value
+
+    def WriteValue(self, value):
+        print 'TestCharacteristic Write: ' + repr(value)
+        self.value = value
+
+
+class TestDescriptor(Descriptor):
+    """
+    Dummy test descriptor. Returns a static value.
+
+    """
+    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'
+
+    def __init__(self, bus, index, characteristic):
+        Descriptor.__init__(
+                self, bus, index,
+                self.TEST_DESC_UUID,
+                characteristic)
+
+    def ReadValue(self):
+        return [
+                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
+        ]
+
+
+class CharacteristicUserDescriptionDescriptor(Descriptor):
+    """
+    Writable CUD descriptor.
+
+    """
+    CUD_UUID = '2901'
+
+    def __init__(self, bus, index, characteristic):
+        self.writable = 'writable-auxiliaries' in characteristic.flags
+        self.value = array.array('B', 'This is a characteristic for testing')
+        self.value = self.value.tolist()
+        Descriptor.__init__(
+                self, bus, index,
+                self.CUD_UUID,
+                characteristic)
+
+    def ReadValue(self):
+        return self.value
+
+    def WriteValue(self, value):
+        if not self.writable:
+            raise NotPermittedException()
+        self.value = value
+
+
+def register_service_cb():
+    print 'GATT service registered'
+
+
+def register_service_error_cb(error):
+    print 'Failed to register service: ' + str(error)
+    mainloop.quit()
+
+
+def find_adapter(bus):
+    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
+                               DBUS_OM_IFACE)
+    objects = remote_om.GetManagedObjects()
+
+    for o, props in objects.iteritems():
+        if props.has_key(GATT_MANAGER_IFACE):
+            return o
+
+    return None
+
+def main():
+    global mainloop
+
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+    bus = dbus.SystemBus()
+
+    adapter = find_adapter(bus)
+    if not adapter:
+        print 'GattManager1 interface not found'
+        return
+
+    service_manager = dbus.Interface(
+            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+            GATT_MANAGER_IFACE)
+
+    hr_service = HeartRateService(bus, 0)
+    bat_service = BatteryService(bus, 1)
+    test_service = TestService(bus, 2)
+
+    mainloop = gobject.MainLoop()
+
+    service_manager.RegisterService(hr_service.get_path(), {},
+                                    reply_handler=register_service_cb,
+                                    error_handler=register_service_error_cb)
+    service_manager.RegisterService(bat_service.get_path(), {},
+                                    reply_handler=register_service_cb,
+                                    error_handler=register_service_error_cb)
+    service_manager.RegisterService(test_service.get_path(), {},
+                                    reply_handler=register_service_cb,
+                                    error_handler=register_service_error_cb)
+
+    mainloop.run()
+
+if __name__ == '__main__':
+    main()
diff --git a/tools/gatt-example b/tools/gatt-example
deleted file mode 100644
index a6f5cbe..0000000
--- a/tools/gatt-example
+++ /dev/null
@@ -1,533 +0,0 @@
-#!/usr/bin/python
-
-import dbus
-import dbus.exceptions
-import dbus.mainloop.glib
-import dbus.service
-
-import array
-import gobject
-
-from random import randint
-
-mainloop = None
-
-BLUEZ_SERVICE_NAME = 'org.bluez'
-GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
-DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
-DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
-
-GATT_SERVICE_IFACE = 'org.bluez.GattService1'
-GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
-GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
-
-class InvalidArgsException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
-
-class NotSupportedException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.NotSupported'
-
-class NotPermittedException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.NotPermitted'
-
-class InvalidValueLengthException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
-
-class FailedException(dbus.exceptions.DBusException):
-    _dbus_error_name = 'org.bluez.Error.Failed'
-
-
-class Service(dbus.service.Object):
-    PATH_BASE = '/org/bluez/example/service'
-
-    def __init__(self, bus, index, uuid, primary):
-        self.path = self.PATH_BASE + str(index)
-        self.bus = bus
-        self.uuid = uuid
-        self.primary = primary
-        self.characteristics = []
-        dbus.service.Object.__init__(self, bus, self.path)
-
-    def get_properties(self):
-        return {
-                GATT_SERVICE_IFACE: {
-                        'UUID': self.uuid,
-                        'Primary': self.primary,
-                        'Characteristics': dbus.Array(
-                                self.get_characteristic_paths(),
-                                signature='o')
-                }
-        }
-
-    def get_path(self):
-        return dbus.ObjectPath(self.path)
-
-    def add_characteristic(self, characteristic):
-        self.characteristics.append(characteristic)
-
-    def get_characteristic_paths(self):
-        result = []
-        for chrc in self.characteristics:
-            result.append(chrc.get_path())
-        return result
-
-    def get_characteristics(self):
-        return self.characteristics
-
-    @dbus.service.method(DBUS_PROP_IFACE,
-                         in_signature='s',
-                         out_signature='a{sv}')
-    def GetAll(self, interface):
-        if interface != GATT_SERVICE_IFACE:
-            raise InvalidArgsException()
-
-        return self.get_properties[GATT_SERVICE_IFACE]
-
-    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
-    def GetManagedObjects(self):
-        response = {}
-        print 'GetManagedObjects'
-
-        response[self.get_path()] = self.get_properties()
-        chrcs = self.get_characteristics()
-        for chrc in chrcs:
-            response[chrc.get_path()] = chrc.get_properties()
-            descs = chrc.get_descriptors()
-            for desc in descs:
-                response[desc.get_path()] = desc.get_properties()
-
-        return response
-
-
-class Characteristic(dbus.service.Object):
-    def __init__(self, bus, index, uuid, flags, service):
-        self.path = service.path + '/char' + str(index)
-        self.bus = bus
-        self.uuid = uuid
-        self.service = service
-        self.flags = flags
-        self.descriptors = []
-        dbus.service.Object.__init__(self, bus, self.path)
-
-    def get_properties(self):
-        return {
-                GATT_CHRC_IFACE: {
-                        'Service': self.service.get_path(),
-                        'UUID': self.uuid,
-                        'Flags': self.flags,
-                        'Descriptors': dbus.Array(
-                                self.get_descriptor_paths(),
-                                signature='o')
-                }
-        }
-
-    def get_path(self):
-        return dbus.ObjectPath(self.path)
-
-    def add_descriptor(self, descriptor):
-        self.descriptors.append(descriptor)
-
-    def get_descriptor_paths(self):
-        result = []
-        for desc in self.descriptors:
-            result.append(desc.get_path())
-        return result
-
-    def get_descriptors(self):
-        return self.descriptors
-
-    @dbus.service.method(DBUS_PROP_IFACE,
-                         in_signature='s',
-                         out_signature='a{sv}')
-    def GetAll(self, interface):
-        if interface != GATT_CHRC_IFACE:
-            raise InvalidArgsException()
-
-        return self.get_properties[GATT_CHRC_IFACE]
-
-    @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay')
-    def ReadValue(self):
-        print 'Default ReadValue called, returning error'
-        raise NotSupportedException()
-
-    @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay')
-    def WriteValue(self, value):
-        print 'Default WriteValue called, returning error'
-        raise NotSupportedException()
-
-    @dbus.service.method(GATT_CHRC_IFACE)
-    def StartNotify(self):
-        print 'Default StartNotify called, returning error'
-        raise NotSupportedException()
-
-    @dbus.service.method(GATT_CHRC_IFACE)
-    def StopNotify(self):
-        print 'Default StopNotify called, returning error'
-        raise NotSupportedException()
-
-    @dbus.service.signal(DBUS_PROP_IFACE,
-                         signature='sa{sv}as')
-    def PropertiesChanged(self, interface, changed, invalidated):
-        pass
-
-
-class Descriptor(dbus.service.Object):
-    def __init__(self, bus, index, uuid, characteristic):
-        self.path = characteristic.path + '/desc' + str(index)
-        self.bus = bus
-        self.uuid = uuid
-        self.chrc = characteristic
-        dbus.service.Object.__init__(self, bus, self.path)
-
-    def get_properties(self):
-        return {
-                GATT_DESC_IFACE: {
-                        'Characteristic': self.chrc.get_path(),
-                        'UUID': self.uuid,
-                }
-        }
-
-    def get_path(self):
-        return dbus.ObjectPath(self.path)
-
-    @dbus.service.method(DBUS_PROP_IFACE,
-                         in_signature='s',
-                         out_signature='a{sv}')
-    def GetAll(self, interface):
-        if interface != GATT_DESC_IFACE:
-            raise InvalidArgsException()
-
-        return self.get_properties[GATT_CHRC_IFACE]
-
-    @dbus.service.method(GATT_DESC_IFACE, out_signature='ay')
-    def ReadValue(self):
-        print 'Default ReadValue called, returning error'
-        raise NotSupportedException()
-
-    @dbus.service.method(GATT_DESC_IFACE, in_signature='ay')
-    def WriteValue(self, value):
-        print 'Default WriteValue called, returning error'
-        raise NotSupportedException()
-
-
-class HeartRateService(Service):
-    """
-    Fake Heart Rate Service that simulates a fake heart beat and control point
-    behavior.
-
-    """
-    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
-
-    def __init__(self, bus, index):
-        Service.__init__(self, bus, index, self.HR_UUID, True)
-        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
-        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
-        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
-        self.energy_expended = 0
-
-
-class HeartRateMeasurementChrc(Characteristic):
-    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
-
-    def __init__(self, bus, index, service):
-        Characteristic.__init__(
-                self, bus, index,
-                self.HR_MSRMT_UUID,
-                ['notify'],
-                service)
-        self.notifying = False
-        self.hr_ee_count = 0
-
-    def hr_msrmt_cb(self):
-        value = []
-        value.append(dbus.Byte(0x06))
-
-        value.append(dbus.Byte(randint(90, 130)))
-
-        if self.hr_ee_count % 10 == 0:
-            value[0] = dbus.Byte(value[0] | 0x08)
-            value.append(dbus.Byte(self.service.energy_expended & 0xff))
-            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))
-
-        self.service.energy_expended = \
-                min(0xffff, self.service.energy_expended + 1)
-        self.hr_ee_count += 1
-
-        print 'Updating value: ' + repr(value)
-
-        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])
-
-        return self.notifying
-
-    def _update_hr_msrmt_simulation(self):
-        print 'Update HR Measurement Simulation'
-
-        if not self.notifying:
-            return
-
-        gobject.timeout_add(1000, self.hr_msrmt_cb)
-
-    def StartNotify(self):
-        if self.notifying:
-            print 'Already notifying, nothing to do'
-            return
-
-        self.notifying = True
-        self._update_hr_msrmt_simulation()
-
-    def StopNotify(self):
-        if not self.notifying:
-            print 'Not notifying, nothing to do'
-            return
-
-        self.notifying = False
-        self._update_hr_msrmt_simulation()
-
-
-class BodySensorLocationChrc(Characteristic):
-    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
-
-    def __init__(self, bus, index, service):
-        Characteristic.__init__(
-                self, bus, index,
-                self.BODY_SNSR_LOC_UUID,
-                ['read'],
-                service)
-
-    def ReadValue(self):
-        # Return 'Chest' as the sensor location.
-        return [ 0x01 ]
-
-class HeartRateControlPointChrc(Characteristic):
-    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
-
-    def __init__(self, bus, index, service):
-        Characteristic.__init__(
-                self, bus, index,
-                self.HR_CTRL_PT_UUID,
-                ['write'],
-                service)
-
-    def WriteValue(self, value):
-        print 'Heart Rate Control Point WriteValue called'
-
-        if len(value) != 1:
-            raise InvalidValueLengthException()
-
-        byte = value[0]
-        print 'Control Point value: ' + repr(byte)
-
-        if byte != 1:
-            raise FailedException("0x80")
-
-        print 'Energy Expended field reset!'
-        self.service.energy_expended = 0
-
-
-class BatteryService(Service):
-    """
-    Fake Battery service that emulates a draining battery.
-
-    """
-    BATTERY_UUID = '180f'
-
-    def __init__(self, bus, index):
-        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
-        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))
-
-
-class BatteryLevelCharacteristic(Characteristic):
-    """
-    Fake Battery Level characteristic. The battery level is drained by 2 points
-    every 5 seconds.
-
-    """
-    BATTERY_LVL_UUID = '2a19'
-
-    def __init__(self, bus, index, service):
-        Characteristic.__init__(
-                self, bus, index,
-                self.BATTERY_LVL_UUID,
-                ['read', 'notify'],
-                service)
-        self.notifying = False
-        self.battery_lvl = 100
-        gobject.timeout_add(5000, self.drain_battery)
-
-    def notify_battery_level(self):
-        if not self.notifying:
-            return
-        self.PropertiesChanged(
-                GATT_CHRC_IFACE,
-                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])
-
-    def drain_battery(self):
-        if self.battery_lvl > 0:
-            self.battery_lvl -= 2
-            if self.battery_lvl < 0:
-                self.battery_lvl = 0
-        print 'Battery Level drained: ' + repr(self.battery_lvl)
-        self.notify_battery_level()
-        return True
-
-    def ReadValue(self):
-        print 'Battery Level read: ' + repr(self.battery_lvl)
-        return [dbus.Byte(self.battery_lvl)]
-
-    def StartNotify(self):
-        if self.notifying:
-            print 'Already notifying, nothing to do'
-            return
-
-        self.notifying = True
-        self.notify_battery_level()
-
-    def StopNotify(self):
-        if not self.notifying:
-            print 'Not notifying, nothing to do'
-            return
-
-        self.notifying = False
-
-
-class TestService(Service):
-    """
-    Dummy test service that provides characteristics and descriptors that
-    exercise various API functionality.
-
-    """
-    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'
-
-    def __init__(self, bus, index):
-        Service.__init__(self, bus, index, self.TEST_SVC_UUID, False)
-        self.add_characteristic(TestCharacteristic(bus, 0, self))
-
-
-class TestCharacteristic(Characteristic):
-    """
-    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
-    contains "extended properties", as well as a test descriptor.
-
-    """
-    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'
-
-    def __init__(self, bus, index, service):
-        Characteristic.__init__(
-                self, bus, index,
-                self.TEST_CHRC_UUID,
-                ['read', 'write', 'writable-auxiliaries'],
-                service)
-        self.value = []
-        self.add_descriptor(TestDescriptor(bus, 0, self))
-        self.add_descriptor(
-                CharacteristicUserDescriptionDescriptor(bus, 1, self))
-
-    def ReadValue(self):
-        print 'TestCharacteristic Read: ' + repr(self.value)
-        return self.value
-
-    def WriteValue(self, value):
-        print 'TestCharacteristic Write: ' + repr(value)
-        self.value = value
-
-
-class TestDescriptor(Descriptor):
-    """
-    Dummy test descriptor. Returns a static value.
-
-    """
-    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'
-
-    def __init__(self, bus, index, characteristic):
-        Descriptor.__init__(
-                self, bus, index,
-                self.TEST_DESC_UUID,
-                characteristic)
-
-    def ReadValue(self):
-        return [
-                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
-        ]
-
-
-class CharacteristicUserDescriptionDescriptor(Descriptor):
-    """
-    Writable CUD descriptor.
-
-    """
-    CUD_UUID = '2901'
-
-    def __init__(self, bus, index, characteristic):
-        self.writable = 'writable-auxiliaries' in characteristic.flags
-        self.value = array.array('B', 'This is a characteristic for testing')
-        self.value = self.value.tolist()
-        Descriptor.__init__(
-                self, bus, index,
-                self.CUD_UUID,
-                characteristic)
-
-    def ReadValue(self):
-        return self.value
-
-    def WriteValue(self, value):
-        if not self.writable:
-            raise NotPermittedException()
-        self.value = value
-
-
-def register_service_cb():
-    print 'GATT service registered'
-
-
-def register_service_error_cb(error):
-    print 'Failed to register service: ' + str(error)
-    mainloop.quit()
-
-
-def find_adapter(bus):
-    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
-                               DBUS_OM_IFACE)
-    objects = remote_om.GetManagedObjects()
-
-    for o, props in objects.iteritems():
-        if props.has_key(GATT_MANAGER_IFACE):
-            return o
-
-    return None
-
-def main():
-    global mainloop
-
-    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
-
-    bus = dbus.SystemBus()
-
-    adapter = find_adapter(bus)
-    if not adapter:
-        print 'GattManager1 interface not found'
-        return
-
-    service_manager = dbus.Interface(
-            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
-            GATT_MANAGER_IFACE)
-
-    hr_service = HeartRateService(bus, 0)
-    bat_service = BatteryService(bus, 1)
-    test_service = TestService(bus, 2)
-
-    mainloop = gobject.MainLoop()
-
-    service_manager.RegisterService(hr_service.get_path(), {},
-                                    reply_handler=register_service_cb,
-                                    error_handler=register_service_error_cb)
-    service_manager.RegisterService(bat_service.get_path(), {},
-                                    reply_handler=register_service_cb,
-                                    error_handler=register_service_error_cb)
-    service_manager.RegisterService(test_service.get_path(), {},
-                                    reply_handler=register_service_cb,
-                                    error_handler=register_service_error_cb)
-
-    mainloop.run()
-
-if __name__ == '__main__':
-    main()
-- 
2.2.0.rc0.207.ga3a616c

--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux