This patch moves tools/gatt-example to test/ and renames it to example-gatt-server in preparation for example-gatt-client. The same naming convention (example-*) is also adopted for test/advertisement-example, so it's now called test/example-advertisement. Also, both scripts now have the execute set appropriately to be consistent with the other Python scripts. --- test/advertisement-example | 170 --------------- test/example-advertisement | 170 +++++++++++++++ test/example-gatt-server | 533 +++++++++++++++++++++++++++++++++++++++++++++ tools/gatt-example | 533 --------------------------------------------- 4 files changed, 703 insertions(+), 703 deletions(-) delete mode 100644 test/advertisement-example create mode 100755 test/example-advertisement create mode 100755 test/example-gatt-server delete mode 100644 tools/gatt-example diff --git a/test/advertisement-example b/test/advertisement-example deleted file mode 100644 index 98aeafa..0000000 --- a/test/advertisement-example +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/python - -import dbus -import dbus.exceptions -import dbus.mainloop.glib -import dbus.service - -import array -import gobject - -from random import randint - -mainloop = None - -BLUEZ_SERVICE_NAME = 'org.bluez' -LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' - -LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' - - -class InvalidArgsException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' - - -class NotSupportedException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.NotSupported' - - -class NotPermittedException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.NotPermitted' - - -class InvalidValueLengthException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.InvalidValueLength' - - -class FailedException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.Failed' - - -class Advertisement(dbus.service.Object): - PATH_BASE = '/org/bluez/example/advertisement' - - def __init__(self, bus, index, advertising_type): - self.path = self.PATH_BASE + str(index) - self.bus = bus - self.ad_type = advertising_type - self.service_uuids = None - self.manufacturer_data = None - self.solicit_uuids = None - self.service_data = None - dbus.service.Object.__init__(self, bus, self.path) - - def get_properties(self): - properties = dict() - properties['Type'] = self.ad_type - if self.service_uuids is not None: - properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, - signature='s') - if self.solicit_uuids is not None: - properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, - signature='s') - if self.manufacturer_data is not None: - properties['ManufacturerData'] = dbus.Dictionary( - self.manufacturer_data, signature='qay') - if self.service_data is not None: - properties['ServiceData'] = dbus.Dictionary(self.service_data, - signature='say') - return {LE_ADVERTISEMENT_IFACE: properties} - - def get_path(self): - return dbus.ObjectPath(self.path) - - def add_service_uuid(self, uuid): - if not self.service_uuids: - self.service_uuids = [] - self.service_uuids.append(uuid) - - def add_solicit_uuid(self, uuid): - if not self.solicit_uuids: - self.solicit_uuids = [] - self.solicit_uuids.append(uuid) - - def add_manufacturer_data(self, manuf_code, data): - if not self.manufacturer_data: - self.manufacturer_data = dict() - self.manufacturer_data[manuf_code] = data - - def add_service_data(self, uuid, data): - if not self.service_data: - self.service_data = dict() - self.service_data[uuid] = data - - @dbus.service.method(DBUS_PROP_IFACE, - in_signature='s', - out_signature='a{sv}') - def GetAll(self, interface): - print 'GetAll' - if interface != LE_ADVERTISEMENT_IFACE: - raise InvalidArgsException() - print 'returning props' - return self.get_properties()[LE_ADVERTISEMENT_IFACE] - - @dbus.service.method(LE_ADVERTISEMENT_IFACE, - in_signature='', - out_signature='') - def Release(self): - print '%s: Released!' % self.path - -class TestAdvertisement(Advertisement): - - def __init__(self, bus, index): - Advertisement.__init__(self, bus, index, 'broadcast') - self.add_service_uuid('0000180D-0000-1000-8000-00805F9B34FB') - self.add_service_uuid('0000180F-0000-1000-8000-00805F9B34FB') - self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04]) - self.add_service_data('00009999-0000-1000-8000-00805F9B34FB', - [0x00, 0x01, 0x02, 0x03, 0x04]) - - -def register_ad_cb(): - print 'Advertisement registered' - - -def register_ad_error_cb(error): - print 'Failed to register advertisement: ' + str(error) - mainloop.quit() - - -def find_adapter(bus): - remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), - DBUS_OM_IFACE) - objects = remote_om.GetManagedObjects() - - for o, props in objects.iteritems(): - if LE_ADVERTISING_MANAGER_IFACE in props: - return o - - return None - - -def main(): - global mainloop - - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - - bus = dbus.SystemBus() - - adapter = find_adapter(bus) - if not adapter: - print 'LEAdvertisingManager1 interface not found' - return - - ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), - LE_ADVERTISING_MANAGER_IFACE) - - test_advertisement = TestAdvertisement(bus, 0) - - mainloop = gobject.MainLoop() - - ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {}, - reply_handler=register_ad_cb, - error_handler=register_ad_error_cb) - - mainloop.run() - -if __name__ == '__main__': - main() diff --git a/test/example-advertisement b/test/example-advertisement new file mode 100755 index 0000000..98aeafa --- /dev/null +++ b/test/example-advertisement @@ -0,0 +1,170 @@ +#!/usr/bin/python + +import dbus +import dbus.exceptions +import dbus.mainloop.glib +import dbus.service + +import array +import gobject + +from random import randint + +mainloop = None + +BLUEZ_SERVICE_NAME = 'org.bluez' +LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' + + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotSupported' + + +class NotPermittedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotPermitted' + + +class InvalidValueLengthException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.InvalidValueLength' + + +class FailedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.Failed' + + +class Advertisement(dbus.service.Object): + PATH_BASE = '/org/bluez/example/advertisement' + + def __init__(self, bus, index, advertising_type): + self.path = self.PATH_BASE + str(index) + self.bus = bus + self.ad_type = advertising_type + self.service_uuids = None + self.manufacturer_data = None + self.solicit_uuids = None + self.service_data = None + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + properties = dict() + properties['Type'] = self.ad_type + if self.service_uuids is not None: + properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, + signature='s') + if self.solicit_uuids is not None: + properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, + signature='s') + if self.manufacturer_data is not None: + properties['ManufacturerData'] = dbus.Dictionary( + self.manufacturer_data, signature='qay') + if self.service_data is not None: + properties['ServiceData'] = dbus.Dictionary(self.service_data, + signature='say') + return {LE_ADVERTISEMENT_IFACE: properties} + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_service_uuid(self, uuid): + if not self.service_uuids: + self.service_uuids = [] + self.service_uuids.append(uuid) + + def add_solicit_uuid(self, uuid): + if not self.solicit_uuids: + self.solicit_uuids = [] + self.solicit_uuids.append(uuid) + + def add_manufacturer_data(self, manuf_code, data): + if not self.manufacturer_data: + self.manufacturer_data = dict() + self.manufacturer_data[manuf_code] = data + + def add_service_data(self, uuid, data): + if not self.service_data: + self.service_data = dict() + self.service_data[uuid] = data + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + print 'GetAll' + if interface != LE_ADVERTISEMENT_IFACE: + raise InvalidArgsException() + print 'returning props' + return self.get_properties()[LE_ADVERTISEMENT_IFACE] + + @dbus.service.method(LE_ADVERTISEMENT_IFACE, + in_signature='', + out_signature='') + def Release(self): + print '%s: Released!' % self.path + +class TestAdvertisement(Advertisement): + + def __init__(self, bus, index): + Advertisement.__init__(self, bus, index, 'broadcast') + self.add_service_uuid('0000180D-0000-1000-8000-00805F9B34FB') + self.add_service_uuid('0000180F-0000-1000-8000-00805F9B34FB') + self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04]) + self.add_service_data('00009999-0000-1000-8000-00805F9B34FB', + [0x00, 0x01, 0x02, 0x03, 0x04]) + + +def register_ad_cb(): + print 'Advertisement registered' + + +def register_ad_error_cb(error): + print 'Failed to register advertisement: ' + str(error) + mainloop.quit() + + +def find_adapter(bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + + for o, props in objects.iteritems(): + if LE_ADVERTISING_MANAGER_IFACE in props: + return o + + return None + + +def main(): + global mainloop + + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + adapter = find_adapter(bus) + if not adapter: + print 'LEAdvertisingManager1 interface not found' + return + + ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + LE_ADVERTISING_MANAGER_IFACE) + + test_advertisement = TestAdvertisement(bus, 0) + + mainloop = gobject.MainLoop() + + ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {}, + reply_handler=register_ad_cb, + error_handler=register_ad_error_cb) + + mainloop.run() + +if __name__ == '__main__': + main() diff --git a/test/example-gatt-server b/test/example-gatt-server new file mode 100755 index 0000000..a6f5cbe --- /dev/null +++ b/test/example-gatt-server @@ -0,0 +1,533 @@ +#!/usr/bin/python + +import dbus +import dbus.exceptions +import dbus.mainloop.glib +import dbus.service + +import array +import gobject + +from random import randint + +mainloop = None + +BLUEZ_SERVICE_NAME = 'org.bluez' +GATT_MANAGER_IFACE = 'org.bluez.GattManager1' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +GATT_SERVICE_IFACE = 'org.bluez.GattService1' +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' +GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotSupported' + +class NotPermittedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotPermitted' + +class InvalidValueLengthException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.InvalidValueLength' + +class FailedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.Failed' + + +class Service(dbus.service.Object): + PATH_BASE = '/org/bluez/example/service' + + def __init__(self, bus, index, uuid, primary): + self.path = self.PATH_BASE + str(index) + self.bus = bus + self.uuid = uuid + self.primary = primary + self.characteristics = [] + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_SERVICE_IFACE: { + 'UUID': self.uuid, + 'Primary': self.primary, + 'Characteristics': dbus.Array( + self.get_characteristic_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_characteristic(self, characteristic): + self.characteristics.append(characteristic) + + def get_characteristic_paths(self): + result = [] + for chrc in self.characteristics: + result.append(chrc.get_path()) + return result + + def get_characteristics(self): + return self.characteristics + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_SERVICE_IFACE: + raise InvalidArgsException() + + return self.get_properties[GATT_SERVICE_IFACE] + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + def GetManagedObjects(self): + response = {} + print 'GetManagedObjects' + + response[self.get_path()] = self.get_properties() + chrcs = self.get_characteristics() + for chrc in chrcs: + response[chrc.get_path()] = chrc.get_properties() + descs = chrc.get_descriptors() + for desc in descs: + response[desc.get_path()] = desc.get_properties() + + return response + + +class Characteristic(dbus.service.Object): + def __init__(self, bus, index, uuid, flags, service): + self.path = service.path + '/char' + str(index) + self.bus = bus + self.uuid = uuid + self.service = service + self.flags = flags + self.descriptors = [] + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_CHRC_IFACE: { + 'Service': self.service.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + 'Descriptors': dbus.Array( + self.get_descriptor_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_descriptor(self, descriptor): + self.descriptors.append(descriptor) + + def get_descriptor_paths(self): + result = [] + for desc in self.descriptors: + result.append(desc.get_path()) + return result + + def get_descriptors(self): + return self.descriptors + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_CHRC_IFACE: + raise InvalidArgsException() + + return self.get_properties[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay') + def ReadValue(self): + print 'Default ReadValue called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay') + def WriteValue(self, value): + print 'Default WriteValue called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StartNotify(self): + print 'Default StartNotify called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StopNotify(self): + print 'Default StopNotify called, returning error' + raise NotSupportedException() + + @dbus.service.signal(DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + pass + + +class Descriptor(dbus.service.Object): + def __init__(self, bus, index, uuid, characteristic): + self.path = characteristic.path + '/desc' + str(index) + self.bus = bus + self.uuid = uuid + self.chrc = characteristic + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_DESC_IFACE: { + 'Characteristic': self.chrc.get_path(), + 'UUID': self.uuid, + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_DESC_IFACE: + raise InvalidArgsException() + + return self.get_properties[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_DESC_IFACE, out_signature='ay') + def ReadValue(self): + print 'Default ReadValue called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_DESC_IFACE, in_signature='ay') + def WriteValue(self, value): + print 'Default WriteValue called, returning error' + raise NotSupportedException() + + +class HeartRateService(Service): + """ + Fake Heart Rate Service that simulates a fake heart beat and control point + behavior. + + """ + HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index): + Service.__init__(self, bus, index, self.HR_UUID, True) + self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) + self.add_characteristic(BodySensorLocationChrc(bus, 1, self)) + self.add_characteristic(HeartRateControlPointChrc(bus, 2, self)) + self.energy_expended = 0 + + +class HeartRateMeasurementChrc(Characteristic): + HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.HR_MSRMT_UUID, + ['notify'], + service) + self.notifying = False + self.hr_ee_count = 0 + + def hr_msrmt_cb(self): + value = [] + value.append(dbus.Byte(0x06)) + + value.append(dbus.Byte(randint(90, 130))) + + if self.hr_ee_count % 10 == 0: + value[0] = dbus.Byte(value[0] | 0x08) + value.append(dbus.Byte(self.service.energy_expended & 0xff)) + value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) + + self.service.energy_expended = \ + min(0xffff, self.service.energy_expended + 1) + self.hr_ee_count += 1 + + print 'Updating value: ' + repr(value) + + self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) + + return self.notifying + + def _update_hr_msrmt_simulation(self): + print 'Update HR Measurement Simulation' + + if not self.notifying: + return + + gobject.timeout_add(1000, self.hr_msrmt_cb) + + def StartNotify(self): + if self.notifying: + print 'Already notifying, nothing to do' + return + + self.notifying = True + self._update_hr_msrmt_simulation() + + def StopNotify(self): + if not self.notifying: + print 'Not notifying, nothing to do' + return + + self.notifying = False + self._update_hr_msrmt_simulation() + + +class BodySensorLocationChrc(Characteristic): + BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.BODY_SNSR_LOC_UUID, + ['read'], + service) + + def ReadValue(self): + # Return 'Chest' as the sensor location. + return [ 0x01 ] + +class HeartRateControlPointChrc(Characteristic): + HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.HR_CTRL_PT_UUID, + ['write'], + service) + + def WriteValue(self, value): + print 'Heart Rate Control Point WriteValue called' + + if len(value) != 1: + raise InvalidValueLengthException() + + byte = value[0] + print 'Control Point value: ' + repr(byte) + + if byte != 1: + raise FailedException("0x80") + + print 'Energy Expended field reset!' + self.service.energy_expended = 0 + + +class BatteryService(Service): + """ + Fake Battery service that emulates a draining battery. + + """ + BATTERY_UUID = '180f' + + def __init__(self, bus, index): + Service.__init__(self, bus, index, self.BATTERY_UUID, True) + self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self)) + + +class BatteryLevelCharacteristic(Characteristic): + """ + Fake Battery Level characteristic. The battery level is drained by 2 points + every 5 seconds. + + """ + BATTERY_LVL_UUID = '2a19' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.BATTERY_LVL_UUID, + ['read', 'notify'], + service) + self.notifying = False + self.battery_lvl = 100 + gobject.timeout_add(5000, self.drain_battery) + + def notify_battery_level(self): + if not self.notifying: + return + self.PropertiesChanged( + GATT_CHRC_IFACE, + { 'Value': [dbus.Byte(self.battery_lvl)] }, []) + + def drain_battery(self): + if self.battery_lvl > 0: + self.battery_lvl -= 2 + if self.battery_lvl < 0: + self.battery_lvl = 0 + print 'Battery Level drained: ' + repr(self.battery_lvl) + self.notify_battery_level() + return True + + def ReadValue(self): + print 'Battery Level read: ' + repr(self.battery_lvl) + return [dbus.Byte(self.battery_lvl)] + + def StartNotify(self): + if self.notifying: + print 'Already notifying, nothing to do' + return + + self.notifying = True + self.notify_battery_level() + + def StopNotify(self): + if not self.notifying: + print 'Not notifying, nothing to do' + return + + self.notifying = False + + +class TestService(Service): + """ + Dummy test service that provides characteristics and descriptors that + exercise various API functionality. + + """ + TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0' + + def __init__(self, bus, index): + Service.__init__(self, bus, index, self.TEST_SVC_UUID, False) + self.add_characteristic(TestCharacteristic(bus, 0, self)) + + +class TestCharacteristic(Characteristic): + """ + Dummy test characteristic. Allows writing arbitrary bytes to its value, and + contains "extended properties", as well as a test descriptor. + + """ + TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.TEST_CHRC_UUID, + ['read', 'write', 'writable-auxiliaries'], + service) + self.value = [] + self.add_descriptor(TestDescriptor(bus, 0, self)) + self.add_descriptor( + CharacteristicUserDescriptionDescriptor(bus, 1, self)) + + def ReadValue(self): + print 'TestCharacteristic Read: ' + repr(self.value) + return self.value + + def WriteValue(self, value): + print 'TestCharacteristic Write: ' + repr(value) + self.value = value + + +class TestDescriptor(Descriptor): + """ + Dummy test descriptor. Returns a static value. + + """ + TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2' + + def __init__(self, bus, index, characteristic): + Descriptor.__init__( + self, bus, index, + self.TEST_DESC_UUID, + characteristic) + + def ReadValue(self): + return [ + dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') + ] + + +class CharacteristicUserDescriptionDescriptor(Descriptor): + """ + Writable CUD descriptor. + + """ + CUD_UUID = '2901' + + def __init__(self, bus, index, characteristic): + self.writable = 'writable-auxiliaries' in characteristic.flags + self.value = array.array('B', 'This is a characteristic for testing') + self.value = self.value.tolist() + Descriptor.__init__( + self, bus, index, + self.CUD_UUID, + characteristic) + + def ReadValue(self): + return self.value + + def WriteValue(self, value): + if not self.writable: + raise NotPermittedException() + self.value = value + + +def register_service_cb(): + print 'GATT service registered' + + +def register_service_error_cb(error): + print 'Failed to register service: ' + str(error) + mainloop.quit() + + +def find_adapter(bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + + for o, props in objects.iteritems(): + if props.has_key(GATT_MANAGER_IFACE): + return o + + return None + +def main(): + global mainloop + + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + adapter = find_adapter(bus) + if not adapter: + print 'GattManager1 interface not found' + return + + service_manager = dbus.Interface( + bus.get_object(BLUEZ_SERVICE_NAME, adapter), + GATT_MANAGER_IFACE) + + hr_service = HeartRateService(bus, 0) + bat_service = BatteryService(bus, 1) + test_service = TestService(bus, 2) + + mainloop = gobject.MainLoop() + + service_manager.RegisterService(hr_service.get_path(), {}, + reply_handler=register_service_cb, + error_handler=register_service_error_cb) + service_manager.RegisterService(bat_service.get_path(), {}, + reply_handler=register_service_cb, + error_handler=register_service_error_cb) + service_manager.RegisterService(test_service.get_path(), {}, + reply_handler=register_service_cb, + error_handler=register_service_error_cb) + + mainloop.run() + +if __name__ == '__main__': + main() diff --git a/tools/gatt-example b/tools/gatt-example deleted file mode 100644 index a6f5cbe..0000000 --- a/tools/gatt-example +++ /dev/null @@ -1,533 +0,0 @@ -#!/usr/bin/python - -import dbus -import dbus.exceptions -import dbus.mainloop.glib -import dbus.service - -import array -import gobject - -from random import randint - -mainloop = None - -BLUEZ_SERVICE_NAME = 'org.bluez' -GATT_MANAGER_IFACE = 'org.bluez.GattManager1' -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' - -GATT_SERVICE_IFACE = 'org.bluez.GattService1' -GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' -GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' - -class InvalidArgsException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' - -class NotSupportedException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.NotSupported' - -class NotPermittedException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.NotPermitted' - -class InvalidValueLengthException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.InvalidValueLength' - -class FailedException(dbus.exceptions.DBusException): - _dbus_error_name = 'org.bluez.Error.Failed' - - -class Service(dbus.service.Object): - PATH_BASE = '/org/bluez/example/service' - - def __init__(self, bus, index, uuid, primary): - self.path = self.PATH_BASE + str(index) - self.bus = bus - self.uuid = uuid - self.primary = primary - self.characteristics = [] - dbus.service.Object.__init__(self, bus, self.path) - - def get_properties(self): - return { - GATT_SERVICE_IFACE: { - 'UUID': self.uuid, - 'Primary': self.primary, - 'Characteristics': dbus.Array( - self.get_characteristic_paths(), - signature='o') - } - } - - def get_path(self): - return dbus.ObjectPath(self.path) - - def add_characteristic(self, characteristic): - self.characteristics.append(characteristic) - - def get_characteristic_paths(self): - result = [] - for chrc in self.characteristics: - result.append(chrc.get_path()) - return result - - def get_characteristics(self): - return self.characteristics - - @dbus.service.method(DBUS_PROP_IFACE, - in_signature='s', - out_signature='a{sv}') - def GetAll(self, interface): - if interface != GATT_SERVICE_IFACE: - raise InvalidArgsException() - - return self.get_properties[GATT_SERVICE_IFACE] - - @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') - def GetManagedObjects(self): - response = {} - print 'GetManagedObjects' - - response[self.get_path()] = self.get_properties() - chrcs = self.get_characteristics() - for chrc in chrcs: - response[chrc.get_path()] = chrc.get_properties() - descs = chrc.get_descriptors() - for desc in descs: - response[desc.get_path()] = desc.get_properties() - - return response - - -class Characteristic(dbus.service.Object): - def __init__(self, bus, index, uuid, flags, service): - self.path = service.path + '/char' + str(index) - self.bus = bus - self.uuid = uuid - self.service = service - self.flags = flags - self.descriptors = [] - dbus.service.Object.__init__(self, bus, self.path) - - def get_properties(self): - return { - GATT_CHRC_IFACE: { - 'Service': self.service.get_path(), - 'UUID': self.uuid, - 'Flags': self.flags, - 'Descriptors': dbus.Array( - self.get_descriptor_paths(), - signature='o') - } - } - - def get_path(self): - return dbus.ObjectPath(self.path) - - def add_descriptor(self, descriptor): - self.descriptors.append(descriptor) - - def get_descriptor_paths(self): - result = [] - for desc in self.descriptors: - result.append(desc.get_path()) - return result - - def get_descriptors(self): - return self.descriptors - - @dbus.service.method(DBUS_PROP_IFACE, - in_signature='s', - out_signature='a{sv}') - def GetAll(self, interface): - if interface != GATT_CHRC_IFACE: - raise InvalidArgsException() - - return self.get_properties[GATT_CHRC_IFACE] - - @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay') - def ReadValue(self): - print 'Default ReadValue called, returning error' - raise NotSupportedException() - - @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay') - def WriteValue(self, value): - print 'Default WriteValue called, returning error' - raise NotSupportedException() - - @dbus.service.method(GATT_CHRC_IFACE) - def StartNotify(self): - print 'Default StartNotify called, returning error' - raise NotSupportedException() - - @dbus.service.method(GATT_CHRC_IFACE) - def StopNotify(self): - print 'Default StopNotify called, returning error' - raise NotSupportedException() - - @dbus.service.signal(DBUS_PROP_IFACE, - signature='sa{sv}as') - def PropertiesChanged(self, interface, changed, invalidated): - pass - - -class Descriptor(dbus.service.Object): - def __init__(self, bus, index, uuid, characteristic): - self.path = characteristic.path + '/desc' + str(index) - self.bus = bus - self.uuid = uuid - self.chrc = characteristic - dbus.service.Object.__init__(self, bus, self.path) - - def get_properties(self): - return { - GATT_DESC_IFACE: { - 'Characteristic': self.chrc.get_path(), - 'UUID': self.uuid, - } - } - - def get_path(self): - return dbus.ObjectPath(self.path) - - @dbus.service.method(DBUS_PROP_IFACE, - in_signature='s', - out_signature='a{sv}') - def GetAll(self, interface): - if interface != GATT_DESC_IFACE: - raise InvalidArgsException() - - return self.get_properties[GATT_CHRC_IFACE] - - @dbus.service.method(GATT_DESC_IFACE, out_signature='ay') - def ReadValue(self): - print 'Default ReadValue called, returning error' - raise NotSupportedException() - - @dbus.service.method(GATT_DESC_IFACE, in_signature='ay') - def WriteValue(self, value): - print 'Default WriteValue called, returning error' - raise NotSupportedException() - - -class HeartRateService(Service): - """ - Fake Heart Rate Service that simulates a fake heart beat and control point - behavior. - - """ - HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb' - - def __init__(self, bus, index): - Service.__init__(self, bus, index, self.HR_UUID, True) - self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) - self.add_characteristic(BodySensorLocationChrc(bus, 1, self)) - self.add_characteristic(HeartRateControlPointChrc(bus, 2, self)) - self.energy_expended = 0 - - -class HeartRateMeasurementChrc(Characteristic): - HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' - - def __init__(self, bus, index, service): - Characteristic.__init__( - self, bus, index, - self.HR_MSRMT_UUID, - ['notify'], - service) - self.notifying = False - self.hr_ee_count = 0 - - def hr_msrmt_cb(self): - value = [] - value.append(dbus.Byte(0x06)) - - value.append(dbus.Byte(randint(90, 130))) - - if self.hr_ee_count % 10 == 0: - value[0] = dbus.Byte(value[0] | 0x08) - value.append(dbus.Byte(self.service.energy_expended & 0xff)) - value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) - - self.service.energy_expended = \ - min(0xffff, self.service.energy_expended + 1) - self.hr_ee_count += 1 - - print 'Updating value: ' + repr(value) - - self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) - - return self.notifying - - def _update_hr_msrmt_simulation(self): - print 'Update HR Measurement Simulation' - - if not self.notifying: - return - - gobject.timeout_add(1000, self.hr_msrmt_cb) - - def StartNotify(self): - if self.notifying: - print 'Already notifying, nothing to do' - return - - self.notifying = True - self._update_hr_msrmt_simulation() - - def StopNotify(self): - if not self.notifying: - print 'Not notifying, nothing to do' - return - - self.notifying = False - self._update_hr_msrmt_simulation() - - -class BodySensorLocationChrc(Characteristic): - BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb' - - def __init__(self, bus, index, service): - Characteristic.__init__( - self, bus, index, - self.BODY_SNSR_LOC_UUID, - ['read'], - service) - - def ReadValue(self): - # Return 'Chest' as the sensor location. - return [ 0x01 ] - -class HeartRateControlPointChrc(Characteristic): - HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' - - def __init__(self, bus, index, service): - Characteristic.__init__( - self, bus, index, - self.HR_CTRL_PT_UUID, - ['write'], - service) - - def WriteValue(self, value): - print 'Heart Rate Control Point WriteValue called' - - if len(value) != 1: - raise InvalidValueLengthException() - - byte = value[0] - print 'Control Point value: ' + repr(byte) - - if byte != 1: - raise FailedException("0x80") - - print 'Energy Expended field reset!' - self.service.energy_expended = 0 - - -class BatteryService(Service): - """ - Fake Battery service that emulates a draining battery. - - """ - BATTERY_UUID = '180f' - - def __init__(self, bus, index): - Service.__init__(self, bus, index, self.BATTERY_UUID, True) - self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self)) - - -class BatteryLevelCharacteristic(Characteristic): - """ - Fake Battery Level characteristic. The battery level is drained by 2 points - every 5 seconds. - - """ - BATTERY_LVL_UUID = '2a19' - - def __init__(self, bus, index, service): - Characteristic.__init__( - self, bus, index, - self.BATTERY_LVL_UUID, - ['read', 'notify'], - service) - self.notifying = False - self.battery_lvl = 100 - gobject.timeout_add(5000, self.drain_battery) - - def notify_battery_level(self): - if not self.notifying: - return - self.PropertiesChanged( - GATT_CHRC_IFACE, - { 'Value': [dbus.Byte(self.battery_lvl)] }, []) - - def drain_battery(self): - if self.battery_lvl > 0: - self.battery_lvl -= 2 - if self.battery_lvl < 0: - self.battery_lvl = 0 - print 'Battery Level drained: ' + repr(self.battery_lvl) - self.notify_battery_level() - return True - - def ReadValue(self): - print 'Battery Level read: ' + repr(self.battery_lvl) - return [dbus.Byte(self.battery_lvl)] - - def StartNotify(self): - if self.notifying: - print 'Already notifying, nothing to do' - return - - self.notifying = True - self.notify_battery_level() - - def StopNotify(self): - if not self.notifying: - print 'Not notifying, nothing to do' - return - - self.notifying = False - - -class TestService(Service): - """ - Dummy test service that provides characteristics and descriptors that - exercise various API functionality. - - """ - TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0' - - def __init__(self, bus, index): - Service.__init__(self, bus, index, self.TEST_SVC_UUID, False) - self.add_characteristic(TestCharacteristic(bus, 0, self)) - - -class TestCharacteristic(Characteristic): - """ - Dummy test characteristic. Allows writing arbitrary bytes to its value, and - contains "extended properties", as well as a test descriptor. - - """ - TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1' - - def __init__(self, bus, index, service): - Characteristic.__init__( - self, bus, index, - self.TEST_CHRC_UUID, - ['read', 'write', 'writable-auxiliaries'], - service) - self.value = [] - self.add_descriptor(TestDescriptor(bus, 0, self)) - self.add_descriptor( - CharacteristicUserDescriptionDescriptor(bus, 1, self)) - - def ReadValue(self): - print 'TestCharacteristic Read: ' + repr(self.value) - return self.value - - def WriteValue(self, value): - print 'TestCharacteristic Write: ' + repr(value) - self.value = value - - -class TestDescriptor(Descriptor): - """ - Dummy test descriptor. Returns a static value. - - """ - TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2' - - def __init__(self, bus, index, characteristic): - Descriptor.__init__( - self, bus, index, - self.TEST_DESC_UUID, - characteristic) - - def ReadValue(self): - return [ - dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') - ] - - -class CharacteristicUserDescriptionDescriptor(Descriptor): - """ - Writable CUD descriptor. - - """ - CUD_UUID = '2901' - - def __init__(self, bus, index, characteristic): - self.writable = 'writable-auxiliaries' in characteristic.flags - self.value = array.array('B', 'This is a characteristic for testing') - self.value = self.value.tolist() - Descriptor.__init__( - self, bus, index, - self.CUD_UUID, - characteristic) - - def ReadValue(self): - return self.value - - def WriteValue(self, value): - if not self.writable: - raise NotPermittedException() - self.value = value - - -def register_service_cb(): - print 'GATT service registered' - - -def register_service_error_cb(error): - print 'Failed to register service: ' + str(error) - mainloop.quit() - - -def find_adapter(bus): - remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), - DBUS_OM_IFACE) - objects = remote_om.GetManagedObjects() - - for o, props in objects.iteritems(): - if props.has_key(GATT_MANAGER_IFACE): - return o - - return None - -def main(): - global mainloop - - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - - bus = dbus.SystemBus() - - adapter = find_adapter(bus) - if not adapter: - print 'GattManager1 interface not found' - return - - service_manager = dbus.Interface( - bus.get_object(BLUEZ_SERVICE_NAME, adapter), - GATT_MANAGER_IFACE) - - hr_service = HeartRateService(bus, 0) - bat_service = BatteryService(bus, 1) - test_service = TestService(bus, 2) - - mainloop = gobject.MainLoop() - - service_manager.RegisterService(hr_service.get_path(), {}, - reply_handler=register_service_cb, - error_handler=register_service_error_cb) - service_manager.RegisterService(bat_service.get_path(), {}, - reply_handler=register_service_cb, - error_handler=register_service_error_cb) - service_manager.RegisterService(test_service.get_path(), {}, - reply_handler=register_service_cb, - error_handler=register_service_error_cb) - - mainloop.run() - -if __name__ == '__main__': - main() -- 2.2.0.rc0.207.ga3a616c -- To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html