[PATCH BlueZ 18/18] tools: Added a Python example for GATT server API

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch introduces tools/gatt-example, which demonstrates how an
external application can use the GattManager1 API to register GATT
services. This example simulates a fake Heart Rate service while also
providing a test service with a non-SIG UUID that exercises various
API behaviors.
---
 tools/gatt-example | 484 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 484 insertions(+)
 create mode 100755 tools/gatt-example

diff --git a/tools/gatt-example b/tools/gatt-example
new file mode 100755
index 0000000..62f1b5c
--- /dev/null
+++ b/tools/gatt-example
@@ -0,0 +1,484 @@
+#!/usr/bin/python
+
+import dbus
+import dbus.exceptions
+import dbus.mainloop.glib
+import dbus.service
+
+import array
+import gobject
+
+from random import randint
+
+mainloop = None
+object_manager = None
+
+BLUEZ_SERVICE_NAME = 'org.bluez'
+GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+
+GATT_SERVICE_IFACE = 'org.bluez.GattService1'
+GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
+GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
+
+class InvalidArgsException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
+
+class NotSupportedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.NotSupported'
+
+class NotPermittedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.NotPermitted'
+
+class InvalidValueLengthException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
+
+class FailedException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.bluez.Error.Failed'
+
+
+class ExampleObjectManager(dbus.service.Object):
+    PATH = '/'
+
+    def __init__(self, bus):
+        self.services = []
+        dbus.service.Object.__init__(self, bus, self.PATH)
+
+    def get_path(self):
+        return dbus.ObjectPath(self.PATH)
+
+    def add_service(self, service):
+        self.services.append(service)
+
+    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+    def GetManagedObjects(self):
+        response = {}
+        print 'GetManagedObjects'
+
+        for service in self.services:
+            response[service.get_path()] = service.get_properties()
+            chrcs = service.get_characteristics()
+            for chrc in chrcs:
+                response[chrc.get_path()] = chrc.get_properties()
+                descs = chrc.get_descriptors()
+                for desc in descs:
+                    response[desc.get_path()] = desc.get_properties()
+
+        return response
+
+
+class Service(dbus.service.Object):
+    PATH_BASE = '/org/bluez/example/service'
+
+    def __init__(self, bus, index, uuid, primary):
+        self.path = self.PATH_BASE + str(index)
+        self.bus = bus
+        self.uuid = uuid
+        self.primary = primary
+        self.characteristics = []
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_SERVICE_IFACE: {
+                        'UUID': self.uuid,
+                        'Primary': self.primary,
+                        'Characteristics': dbus.Array(
+                                self.get_characteristic_paths(),
+                                signature='o')
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_characteristic(self, characteristic):
+        self.characteristics.append(characteristic)
+
+    def get_characteristic_paths(self):
+        result = []
+        for chrc in self.characteristics:
+            result.append(chrc.get_path())
+        return result
+
+    def get_characteristics(self):
+        return self.characteristics
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_SERVICE_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_SERVICE_IFACE]
+
+
+class Characteristic(dbus.service.Object):
+    def __init__(self, bus, index, uuid, flags, service):
+        self.path = service.path + '/char' + str(index)
+        self.bus = bus
+        self.uuid = uuid
+        self.service = service
+        self.flags = flags
+        self.descriptors = []
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_CHRC_IFACE: {
+                        'Service': self.service.get_path(),
+                        'UUID': self.uuid,
+                        'Flags': self.flags,
+                        'Descriptors': dbus.Array(
+                                self.get_descriptor_paths(),
+                                signature='o')
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_descriptor(self, descriptor):
+        self.descriptors.append(descriptor)
+
+    def get_descriptor_paths(self):
+        result = []
+        for desc in self.descriptors:
+            result.append(desc.get_path())
+        return result
+
+    def get_descriptors(self):
+        return self.descriptors
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_CHRC_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_CHRC_IFACE]
+
+    @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay')
+    def ReadValue(self):
+        print 'Default ReadValue called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay')
+    def WriteValue(self, value):
+        print 'Default WriteValue called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE)
+    def StartNotify(self):
+        print 'Default StartNotify called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE)
+    def StopNotify(self):
+        print 'Default StopNotify called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.signal(DBUS_PROP_IFACE,
+                         signature='sa{sv}as')
+    def PropertiesChanged(self, interface, changed, invalidated):
+        pass
+
+
+class Descriptor(dbus.service.Object):
+    def __init__(self, bus, index, uuid, characteristic):
+        self.path = characteristic.path + '/desc' + str(index)
+        self.bus = bus
+        self.uuid = uuid
+        self.chrc = characteristic
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_DESC_IFACE: {
+                        'Characteristic': self.chrc.get_path(),
+                        'UUID': self.uuid,
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_DESC_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_CHRC_IFACE]
+
+    @dbus.service.method(GATT_DESC_IFACE, out_signature='ay')
+    def ReadValue(self):
+        print 'Default ReadValue called, returning error'
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_DESC_IFACE, in_signature='ay')
+    def WriteValue(self, value):
+        print 'Default WriteValue called, returning error'
+        raise NotSupportedException()
+
+
+class HeartRateService(Service):
+    """
+    Fake Heart Rate Service that simulates a fake heart beat and control point
+    behavior.
+
+    """
+    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index):
+        Service.__init__(self, bus, index, self.HR_UUID, True)
+        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
+        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
+        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
+        self.energy_expended = 0
+
+
+class HeartRateMeasurementChrc(Characteristic):
+    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.HR_MSRMT_UUID,
+                ['notify'],
+                service)
+        self.notifying = False;
+        self.hr_ee_count = 0
+
+    def hr_msrmt_cb(self):
+        value = []
+        value.append(dbus.Byte(0x06))
+
+        value.append(dbus.Byte(randint(90, 130)))
+
+        if self.hr_ee_count % 10 == 0:
+            value[0] = dbus.Byte(value[0] | 0x08)
+            value.append(dbus.Byte(self.service.energy_expended & 0xff))
+            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))
+
+        self.service.energy_expended = \
+                min(0xffff, self.service.energy_expended + 1)
+        self.hr_ee_count += 1
+
+        print 'Updating value: ' + repr(value)
+
+        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])
+
+        return self.notifying
+
+    def _update_hr_msrmt_simulation(self):
+        print 'Update HR Measurement Simulation'
+
+        if not self.notifying:
+            return
+
+        gobject.timeout_add(1000, self.hr_msrmt_cb)
+
+    def StartNotify(self):
+        if self.notifying:
+            print 'Already notifying, nothing to do'
+            return
+
+        self.notifying = True
+        self._update_hr_msrmt_simulation()
+
+    def StopNotify(self):
+        if not self.notifying:
+            print 'Not notifying, nothing to do'
+            return
+
+        self.notifying = False
+        self._update_hr_msrmt_simulation()
+
+
+class BodySensorLocationChrc(Characteristic):
+    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.BODY_SNSR_LOC_UUID,
+                ['read'],
+                service)
+
+    def ReadValue(self):
+        # Return 'Chest' as the sensor location.
+        return [ 0x01 ]
+
+class HeartRateControlPointChrc(Characteristic):
+    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.HR_CTRL_PT_UUID,
+                ['write'],
+                service)
+
+    def WriteValue(self, value):
+        print 'Heart Rate Control Point WriteValue called'
+
+        if len(value) != 1:
+            raise InvalidValueLengthException()
+
+        byte = value[0]
+        print 'Control Point value: ' + repr(byte)
+
+        if byte != 1:
+            raise FailedException("0x80")
+
+        print 'Energy Expended field reset!'
+        self.service.energy_expended = 0
+
+
+class TestService(Service):
+    """
+    Dummy test service that provides characteristics and descriptors that
+    exercise various API functionality.
+
+    """
+    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'
+
+    def __init__(self, bus, index):
+        Service.__init__(self, bus, index, self.TEST_SVC_UUID, False)
+        self.add_characteristic(TestCharacteristic(bus, 0, self))
+
+
+class TestCharacteristic(Characteristic):
+    """
+    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
+    contains "extended properties", as well as a test descriptor.
+
+    """
+    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.TEST_CHRC_UUID,
+                ['read', 'write', 'writable-auxiliaries'],
+                service)
+        self.value = []
+        self.add_descriptor(TestDescriptor(bus, 0, self))
+        self.add_descriptor(
+                CharacteristicUserDescriptionDescriptor(bus, 1, self))
+
+    def ReadValue(self):
+        print 'TestCharacteristic Read: ' + repr(self.value)
+        return self.value
+
+    def WriteValue(self, value):
+        print 'TestCharacteristic Write: ' + repr(value)
+        self.value = value
+
+
+class TestDescriptor(Descriptor):
+    """
+    Dummy test descriptor. Returns a static value.
+
+    """
+    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'
+
+    def __init__(self, bus, index, characteristic):
+        Descriptor.__init__(
+                self, bus, index,
+                self.TEST_DESC_UUID,
+                characteristic)
+
+    def ReadValue(self):
+        return [
+                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
+        ]
+
+
+class CharacteristicUserDescriptionDescriptor(Descriptor):
+    """
+    Writable CUD descriptor.
+
+    """
+    CUD_UUID = '2901'
+
+    def __init__(self, bus, index, characteristic):
+        self.writable = 'writable-auxiliaries' in characteristic.flags
+        self.value = array.array('B', 'This is characteristic is for testing')
+        self.value = self.value.tolist()
+        Descriptor.__init__(
+                self, bus, index,
+                self.CUD_UUID,
+                characteristic)
+
+    def ReadValue(self):
+        return self.value
+
+    def WriteValue(self, value):
+        if not self.writable:
+            raise NotPermittedException()
+        self.value = value
+
+
+def register_service_cb():
+    print 'GATT service registered'
+
+
+def register_service_error_cb(error):
+    print 'Failed to register service: ' + str(error)
+    mainloop.quit()
+
+
+def find_adapter(bus):
+    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
+                               DBUS_OM_IFACE)
+    objects = remote_om.GetManagedObjects()
+
+    for o, props in objects.iteritems():
+        if props.has_key(GATT_MANAGER_IFACE):
+            return o
+
+    return None
+
+def main():
+    global object_manager
+    global mainloop
+
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+    bus = dbus.SystemBus()
+    object_manager = ExampleObjectManager(bus)
+
+    adapter = find_adapter(bus)
+    if not adapter:
+        print 'GattManager1 interface not found'
+        return
+
+    service_manager = dbus.Interface(
+            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+            GATT_MANAGER_IFACE)
+
+    hr_service = HeartRateService(bus, 0)
+    object_manager.add_service(hr_service)
+
+    test_service = TestService(bus, 1)
+    object_manager.add_service(test_service)
+
+    mainloop = gobject.MainLoop()
+
+    service_manager.RegisterService(hr_service.get_path(), {},
+                                    reply_handler=register_service_cb,
+                                    error_handler=register_service_error_cb)
+    service_manager.RegisterService(test_service.get_path(), {},
+                                    reply_handler=register_service_cb,
+                                    error_handler=register_service_error_cb)
+
+    mainloop.run()
+
+if __name__ == '__main__':
+    main()
-- 
2.2.0.rc0.207.ga3a616c

--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux