Hi, > On Tue, Mar 31, 2015 at 5:53 PM, Arman Uguray <armansito@xxxxxxxxxxxx> wrote: > This patch moves tools/gatt-example to test/ and renames it to > example-gatt-server in preparation for example-gatt-client. > The same naming convention (example-*) is also adopted for > test/advertisement-example, so it's now called > test/example-advertisement. Also, both scripts now have the execute bit > set appropriately to be consistent with the other Python scripts. > --- > test/advertisement-example | 170 --------------- > test/example-advertisement | 170 +++++++++++++++ > test/example-gatt-server | 533 +++++++++++++++++++++++++++++++++++++++++++++ > tools/gatt-example | 533 --------------------------------------------- > 4 files changed, 703 insertions(+), 703 deletions(-) > delete mode 100644 test/advertisement-example > create mode 100755 test/example-advertisement > create mode 100755 test/example-gatt-server > delete mode 100644 tools/gatt-example > > diff --git a/test/advertisement-example b/test/advertisement-example > deleted file mode 100644 > index 98aeafa..0000000 > --- a/test/advertisement-example > +++ /dev/null > @@ -1,170 +0,0 @@ > -#!/usr/bin/python > - > -import dbus > -import dbus.exceptions > -import dbus.mainloop.glib > -import dbus.service > - > -import array > -import gobject > - > -from random import randint > - > -mainloop = None > - > -BLUEZ_SERVICE_NAME = 'org.bluez' > -LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' > -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' > -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' > - > -LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' > - > - > -class InvalidArgsException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' > - > - > -class NotSupportedException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.NotSupported' > - > - > -class NotPermittedException(dbus.exceptions.DBusException): > - _dbus_error_name = 
'org.bluez.Error.NotPermitted' > - > - > -class InvalidValueLengthException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.InvalidValueLength' > - > - > -class FailedException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.Failed' > - > - > -class Advertisement(dbus.service.Object): > - PATH_BASE = '/org/bluez/example/advertisement' > - > - def __init__(self, bus, index, advertising_type): > - self.path = self.PATH_BASE + str(index) > - self.bus = bus > - self.ad_type = advertising_type > - self.service_uuids = None > - self.manufacturer_data = None > - self.solicit_uuids = None > - self.service_data = None > - dbus.service.Object.__init__(self, bus, self.path) > - > - def get_properties(self): > - properties = dict() > - properties['Type'] = self.ad_type > - if self.service_uuids is not None: > - properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, > - signature='s') > - if self.solicit_uuids is not None: > - properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, > - signature='s') > - if self.manufacturer_data is not None: > - properties['ManufacturerData'] = dbus.Dictionary( > - self.manufacturer_data, signature='qay') > - if self.service_data is not None: > - properties['ServiceData'] = dbus.Dictionary(self.service_data, > - signature='say') > - return {LE_ADVERTISEMENT_IFACE: properties} > - > - def get_path(self): > - return dbus.ObjectPath(self.path) > - > - def add_service_uuid(self, uuid): > - if not self.service_uuids: > - self.service_uuids = [] > - self.service_uuids.append(uuid) > - > - def add_solicit_uuid(self, uuid): > - if not self.solicit_uuids: > - self.solicit_uuids = [] > - self.solicit_uuids.append(uuid) > - > - def add_manufacturer_data(self, manuf_code, data): > - if not self.manufacturer_data: > - self.manufacturer_data = dict() > - self.manufacturer_data[manuf_code] = data > - > - def add_service_data(self, uuid, data): > - if not self.service_data: > - self.service_data = 
dict() > - self.service_data[uuid] = data > - > - @dbus.service.method(DBUS_PROP_IFACE, > - in_signature='s', > - out_signature='a{sv}') > - def GetAll(self, interface): > - print 'GetAll' > - if interface != LE_ADVERTISEMENT_IFACE: > - raise InvalidArgsException() > - print 'returning props' > - return self.get_properties()[LE_ADVERTISEMENT_IFACE] > - > - @dbus.service.method(LE_ADVERTISEMENT_IFACE, > - in_signature='', > - out_signature='') > - def Release(self): > - print '%s: Released!' % self.path > - > -class TestAdvertisement(Advertisement): > - > - def __init__(self, bus, index): > - Advertisement.__init__(self, bus, index, 'broadcast') > - self.add_service_uuid('0000180D-0000-1000-8000-00805F9B34FB') > - self.add_service_uuid('0000180F-0000-1000-8000-00805F9B34FB') > - self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04]) > - self.add_service_data('00009999-0000-1000-8000-00805F9B34FB', > - [0x00, 0x01, 0x02, 0x03, 0x04]) > - > - > -def register_ad_cb(): > - print 'Advertisement registered' > - > - > -def register_ad_error_cb(error): > - print 'Failed to register advertisement: ' + str(error) > - mainloop.quit() > - > - > -def find_adapter(bus): > - remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), > - DBUS_OM_IFACE) > - objects = remote_om.GetManagedObjects() > - > - for o, props in objects.iteritems(): > - if LE_ADVERTISING_MANAGER_IFACE in props: > - return o > - > - return None > - > - > -def main(): > - global mainloop > - > - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) > - > - bus = dbus.SystemBus() > - > - adapter = find_adapter(bus) > - if not adapter: > - print 'LEAdvertisingManager1 interface not found' > - return > - > - ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), > - LE_ADVERTISING_MANAGER_IFACE) > - > - test_advertisement = TestAdvertisement(bus, 0) > - > - mainloop = gobject.MainLoop() > - > - ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {}, > - 
reply_handler=register_ad_cb, > - error_handler=register_ad_error_cb) > - > - mainloop.run() > - > -if __name__ == '__main__': > - main() > diff --git a/test/example-advertisement b/test/example-advertisement > new file mode 100755 > index 0000000..98aeafa > --- /dev/null > +++ b/test/example-advertisement > @@ -0,0 +1,170 @@ > +#!/usr/bin/python > + > +import dbus > +import dbus.exceptions > +import dbus.mainloop.glib > +import dbus.service > + > +import array > +import gobject > + > +from random import randint > + > +mainloop = None > + > +BLUEZ_SERVICE_NAME = 'org.bluez' > +LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' > +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' > +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' > + > +LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' > + > + > +class InvalidArgsException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' > + > + > +class NotSupportedException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.NotSupported' > + > + > +class NotPermittedException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.NotPermitted' > + > + > +class InvalidValueLengthException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.InvalidValueLength' > + > + > +class FailedException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.Failed' > + > + > +class Advertisement(dbus.service.Object): > + PATH_BASE = '/org/bluez/example/advertisement' > + > + def __init__(self, bus, index, advertising_type): > + self.path = self.PATH_BASE + str(index) > + self.bus = bus > + self.ad_type = advertising_type > + self.service_uuids = None > + self.manufacturer_data = None > + self.solicit_uuids = None > + self.service_data = None > + dbus.service.Object.__init__(self, bus, self.path) > + > + def get_properties(self): > + properties = dict() > + properties['Type'] = self.ad_type > + if 
self.service_uuids is not None: > + properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, > + signature='s') > + if self.solicit_uuids is not None: > + properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, > + signature='s') > + if self.manufacturer_data is not None: > + properties['ManufacturerData'] = dbus.Dictionary( > + self.manufacturer_data, signature='qay') > + if self.service_data is not None: > + properties['ServiceData'] = dbus.Dictionary(self.service_data, > + signature='say') > + return {LE_ADVERTISEMENT_IFACE: properties} > + > + def get_path(self): > + return dbus.ObjectPath(self.path) > + > + def add_service_uuid(self, uuid): > + if not self.service_uuids: > + self.service_uuids = [] > + self.service_uuids.append(uuid) > + > + def add_solicit_uuid(self, uuid): > + if not self.solicit_uuids: > + self.solicit_uuids = [] > + self.solicit_uuids.append(uuid) > + > + def add_manufacturer_data(self, manuf_code, data): > + if not self.manufacturer_data: > + self.manufacturer_data = dict() > + self.manufacturer_data[manuf_code] = data > + > + def add_service_data(self, uuid, data): > + if not self.service_data: > + self.service_data = dict() > + self.service_data[uuid] = data > + > + @dbus.service.method(DBUS_PROP_IFACE, > + in_signature='s', > + out_signature='a{sv}') > + def GetAll(self, interface): > + print 'GetAll' > + if interface != LE_ADVERTISEMENT_IFACE: > + raise InvalidArgsException() > + print 'returning props' > + return self.get_properties()[LE_ADVERTISEMENT_IFACE] > + > + @dbus.service.method(LE_ADVERTISEMENT_IFACE, > + in_signature='', > + out_signature='') > + def Release(self): > + print '%s: Released!' 
% self.path > + > +class TestAdvertisement(Advertisement): > + > + def __init__(self, bus, index): > + Advertisement.__init__(self, bus, index, 'broadcast') > + self.add_service_uuid('0000180D-0000-1000-8000-00805F9B34FB') > + self.add_service_uuid('0000180F-0000-1000-8000-00805F9B34FB') > + self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04]) > + self.add_service_data('00009999-0000-1000-8000-00805F9B34FB', > + [0x00, 0x01, 0x02, 0x03, 0x04]) > + > + > +def register_ad_cb(): > + print 'Advertisement registered' > + > + > +def register_ad_error_cb(error): > + print 'Failed to register advertisement: ' + str(error) > + mainloop.quit() > + > + > +def find_adapter(bus): > + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), > + DBUS_OM_IFACE) > + objects = remote_om.GetManagedObjects() > + > + for o, props in objects.iteritems(): > + if LE_ADVERTISING_MANAGER_IFACE in props: > + return o > + > + return None > + > + > +def main(): > + global mainloop > + > + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) > + > + bus = dbus.SystemBus() > + > + adapter = find_adapter(bus) > + if not adapter: > + print 'LEAdvertisingManager1 interface not found' > + return > + > + ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), > + LE_ADVERTISING_MANAGER_IFACE) > + > + test_advertisement = TestAdvertisement(bus, 0) > + > + mainloop = gobject.MainLoop() > + > + ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {}, > + reply_handler=register_ad_cb, > + error_handler=register_ad_error_cb) > + > + mainloop.run() > + > +if __name__ == '__main__': > + main() > diff --git a/test/example-gatt-server b/test/example-gatt-server > new file mode 100755 > index 0000000..a6f5cbe > --- /dev/null > +++ b/test/example-gatt-server > @@ -0,0 +1,533 @@ > +#!/usr/bin/python > + > +import dbus > +import dbus.exceptions > +import dbus.mainloop.glib > +import dbus.service > + > +import array > +import gobject > + > +from random import 
randint > + > +mainloop = None > + > +BLUEZ_SERVICE_NAME = 'org.bluez' > +GATT_MANAGER_IFACE = 'org.bluez.GattManager1' > +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' > +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' > + > +GATT_SERVICE_IFACE = 'org.bluez.GattService1' > +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' > +GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' > + > +class InvalidArgsException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' > + > +class NotSupportedException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.NotSupported' > + > +class NotPermittedException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.NotPermitted' > + > +class InvalidValueLengthException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.InvalidValueLength' > + > +class FailedException(dbus.exceptions.DBusException): > + _dbus_error_name = 'org.bluez.Error.Failed' > + > + > +class Service(dbus.service.Object): > + PATH_BASE = '/org/bluez/example/service' > + > + def __init__(self, bus, index, uuid, primary): > + self.path = self.PATH_BASE + str(index) > + self.bus = bus > + self.uuid = uuid > + self.primary = primary > + self.characteristics = [] > + dbus.service.Object.__init__(self, bus, self.path) > + > + def get_properties(self): > + return { > + GATT_SERVICE_IFACE: { > + 'UUID': self.uuid, > + 'Primary': self.primary, > + 'Characteristics': dbus.Array( > + self.get_characteristic_paths(), > + signature='o') > + } > + } > + > + def get_path(self): > + return dbus.ObjectPath(self.path) > + > + def add_characteristic(self, characteristic): > + self.characteristics.append(characteristic) > + > + def get_characteristic_paths(self): > + result = [] > + for chrc in self.characteristics: > + result.append(chrc.get_path()) > + return result > + > + def get_characteristics(self): > + return self.characteristics > + > + 
@dbus.service.method(DBUS_PROP_IFACE, > + in_signature='s', > + out_signature='a{sv}') > + def GetAll(self, interface): > + if interface != GATT_SERVICE_IFACE: > + raise InvalidArgsException() > + > + return self.get_properties[GATT_SERVICE_IFACE] > + > + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') > + def GetManagedObjects(self): > + response = {} > + print 'GetManagedObjects' > + > + response[self.get_path()] = self.get_properties() > + chrcs = self.get_characteristics() > + for chrc in chrcs: > + response[chrc.get_path()] = chrc.get_properties() > + descs = chrc.get_descriptors() > + for desc in descs: > + response[desc.get_path()] = desc.get_properties() > + > + return response > + > + > +class Characteristic(dbus.service.Object): > + def __init__(self, bus, index, uuid, flags, service): > + self.path = service.path + '/char' + str(index) > + self.bus = bus > + self.uuid = uuid > + self.service = service > + self.flags = flags > + self.descriptors = [] > + dbus.service.Object.__init__(self, bus, self.path) > + > + def get_properties(self): > + return { > + GATT_CHRC_IFACE: { > + 'Service': self.service.get_path(), > + 'UUID': self.uuid, > + 'Flags': self.flags, > + 'Descriptors': dbus.Array( > + self.get_descriptor_paths(), > + signature='o') > + } > + } > + > + def get_path(self): > + return dbus.ObjectPath(self.path) > + > + def add_descriptor(self, descriptor): > + self.descriptors.append(descriptor) > + > + def get_descriptor_paths(self): > + result = [] > + for desc in self.descriptors: > + result.append(desc.get_path()) > + return result > + > + def get_descriptors(self): > + return self.descriptors > + > + @dbus.service.method(DBUS_PROP_IFACE, > + in_signature='s', > + out_signature='a{sv}') > + def GetAll(self, interface): > + if interface != GATT_CHRC_IFACE: > + raise InvalidArgsException() > + > + return self.get_properties[GATT_CHRC_IFACE] > + > + @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay') > + def 
ReadValue(self): > + print 'Default ReadValue called, returning error' > + raise NotSupportedException() > + > + @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay') > + def WriteValue(self, value): > + print 'Default WriteValue called, returning error' > + raise NotSupportedException() > + > + @dbus.service.method(GATT_CHRC_IFACE) > + def StartNotify(self): > + print 'Default StartNotify called, returning error' > + raise NotSupportedException() > + > + @dbus.service.method(GATT_CHRC_IFACE) > + def StopNotify(self): > + print 'Default StopNotify called, returning error' > + raise NotSupportedException() > + > + @dbus.service.signal(DBUS_PROP_IFACE, > + signature='sa{sv}as') > + def PropertiesChanged(self, interface, changed, invalidated): > + pass > + > + > +class Descriptor(dbus.service.Object): > + def __init__(self, bus, index, uuid, characteristic): > + self.path = characteristic.path + '/desc' + str(index) > + self.bus = bus > + self.uuid = uuid > + self.chrc = characteristic > + dbus.service.Object.__init__(self, bus, self.path) > + > + def get_properties(self): > + return { > + GATT_DESC_IFACE: { > + 'Characteristic': self.chrc.get_path(), > + 'UUID': self.uuid, > + } > + } > + > + def get_path(self): > + return dbus.ObjectPath(self.path) > + > + @dbus.service.method(DBUS_PROP_IFACE, > + in_signature='s', > + out_signature='a{sv}') > + def GetAll(self, interface): > + if interface != GATT_DESC_IFACE: > + raise InvalidArgsException() > + > + return self.get_properties[GATT_CHRC_IFACE] > + > + @dbus.service.method(GATT_DESC_IFACE, out_signature='ay') > + def ReadValue(self): > + print 'Default ReadValue called, returning error' > + raise NotSupportedException() > + > + @dbus.service.method(GATT_DESC_IFACE, in_signature='ay') > + def WriteValue(self, value): > + print 'Default WriteValue called, returning error' > + raise NotSupportedException() > + > + > +class HeartRateService(Service): > + """ > + Fake Heart Rate Service that simulates a fake heart beat 
and control point > + behavior. > + > + """ > + HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb' > + > + def __init__(self, bus, index): > + Service.__init__(self, bus, index, self.HR_UUID, True) > + self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) > + self.add_characteristic(BodySensorLocationChrc(bus, 1, self)) > + self.add_characteristic(HeartRateControlPointChrc(bus, 2, self)) > + self.energy_expended = 0 > + > + > +class HeartRateMeasurementChrc(Characteristic): > + HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' > + > + def __init__(self, bus, index, service): > + Characteristic.__init__( > + self, bus, index, > + self.HR_MSRMT_UUID, > + ['notify'], > + service) > + self.notifying = False > + self.hr_ee_count = 0 > + > + def hr_msrmt_cb(self): > + value = [] > + value.append(dbus.Byte(0x06)) > + > + value.append(dbus.Byte(randint(90, 130))) > + > + if self.hr_ee_count % 10 == 0: > + value[0] = dbus.Byte(value[0] | 0x08) > + value.append(dbus.Byte(self.service.energy_expended & 0xff)) > + value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) > + > + self.service.energy_expended = \ > + min(0xffff, self.service.energy_expended + 1) > + self.hr_ee_count += 1 > + > + print 'Updating value: ' + repr(value) > + > + self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) > + > + return self.notifying > + > + def _update_hr_msrmt_simulation(self): > + print 'Update HR Measurement Simulation' > + > + if not self.notifying: > + return > + > + gobject.timeout_add(1000, self.hr_msrmt_cb) > + > + def StartNotify(self): > + if self.notifying: > + print 'Already notifying, nothing to do' > + return > + > + self.notifying = True > + self._update_hr_msrmt_simulation() > + > + def StopNotify(self): > + if not self.notifying: > + print 'Not notifying, nothing to do' > + return > + > + self.notifying = False > + self._update_hr_msrmt_simulation() > + > + > +class BodySensorLocationChrc(Characteristic): > + BODY_SNSR_LOC_UUID = 
'00002a38-0000-1000-8000-00805f9b34fb' > + > + def __init__(self, bus, index, service): > + Characteristic.__init__( > + self, bus, index, > + self.BODY_SNSR_LOC_UUID, > + ['read'], > + service) > + > + def ReadValue(self): > + # Return 'Chest' as the sensor location. > + return [ 0x01 ] > + > +class HeartRateControlPointChrc(Characteristic): > + HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' > + > + def __init__(self, bus, index, service): > + Characteristic.__init__( > + self, bus, index, > + self.HR_CTRL_PT_UUID, > + ['write'], > + service) > + > + def WriteValue(self, value): > + print 'Heart Rate Control Point WriteValue called' > + > + if len(value) != 1: > + raise InvalidValueLengthException() > + > + byte = value[0] > + print 'Control Point value: ' + repr(byte) > + > + if byte != 1: > + raise FailedException("0x80") > + > + print 'Energy Expended field reset!' > + self.service.energy_expended = 0 > + > + > +class BatteryService(Service): > + """ > + Fake Battery service that emulates a draining battery. > + > + """ > + BATTERY_UUID = '180f' > + > + def __init__(self, bus, index): > + Service.__init__(self, bus, index, self.BATTERY_UUID, True) > + self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self)) > + > + > +class BatteryLevelCharacteristic(Characteristic): > + """ > + Fake Battery Level characteristic. The battery level is drained by 2 points > + every 5 seconds. 
> + > + """ > + BATTERY_LVL_UUID = '2a19' > + > + def __init__(self, bus, index, service): > + Characteristic.__init__( > + self, bus, index, > + self.BATTERY_LVL_UUID, > + ['read', 'notify'], > + service) > + self.notifying = False > + self.battery_lvl = 100 > + gobject.timeout_add(5000, self.drain_battery) > + > + def notify_battery_level(self): > + if not self.notifying: > + return > + self.PropertiesChanged( > + GATT_CHRC_IFACE, > + { 'Value': [dbus.Byte(self.battery_lvl)] }, []) > + > + def drain_battery(self): > + if self.battery_lvl > 0: > + self.battery_lvl -= 2 > + if self.battery_lvl < 0: > + self.battery_lvl = 0 > + print 'Battery Level drained: ' + repr(self.battery_lvl) > + self.notify_battery_level() > + return True > + > + def ReadValue(self): > + print 'Battery Level read: ' + repr(self.battery_lvl) > + return [dbus.Byte(self.battery_lvl)] > + > + def StartNotify(self): > + if self.notifying: > + print 'Already notifying, nothing to do' > + return > + > + self.notifying = True > + self.notify_battery_level() > + > + def StopNotify(self): > + if not self.notifying: > + print 'Not notifying, nothing to do' > + return > + > + self.notifying = False > + > + > +class TestService(Service): > + """ > + Dummy test service that provides characteristics and descriptors that > + exercise various API functionality. > + > + """ > + TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0' > + > + def __init__(self, bus, index): > + Service.__init__(self, bus, index, self.TEST_SVC_UUID, False) > + self.add_characteristic(TestCharacteristic(bus, 0, self)) > + > + > +class TestCharacteristic(Characteristic): > + """ > + Dummy test characteristic. Allows writing arbitrary bytes to its value, and > + contains "extended properties", as well as a test descriptor. 
> + > + """ > + TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1' > + > + def __init__(self, bus, index, service): > + Characteristic.__init__( > + self, bus, index, > + self.TEST_CHRC_UUID, > + ['read', 'write', 'writable-auxiliaries'], > + service) > + self.value = [] > + self.add_descriptor(TestDescriptor(bus, 0, self)) > + self.add_descriptor( > + CharacteristicUserDescriptionDescriptor(bus, 1, self)) > + > + def ReadValue(self): > + print 'TestCharacteristic Read: ' + repr(self.value) > + return self.value > + > + def WriteValue(self, value): > + print 'TestCharacteristic Write: ' + repr(value) > + self.value = value > + > + > +class TestDescriptor(Descriptor): > + """ > + Dummy test descriptor. Returns a static value. > + > + """ > + TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2' > + > + def __init__(self, bus, index, characteristic): > + Descriptor.__init__( > + self, bus, index, > + self.TEST_DESC_UUID, > + characteristic) > + > + def ReadValue(self): > + return [ > + dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') > + ] > + > + > +class CharacteristicUserDescriptionDescriptor(Descriptor): > + """ > + Writable CUD descriptor. 
> + > + """ > + CUD_UUID = '2901' > + > + def __init__(self, bus, index, characteristic): > + self.writable = 'writable-auxiliaries' in characteristic.flags > + self.value = array.array('B', 'This is a characteristic for testing') > + self.value = self.value.tolist() > + Descriptor.__init__( > + self, bus, index, > + self.CUD_UUID, > + characteristic) > + > + def ReadValue(self): > + return self.value > + > + def WriteValue(self, value): > + if not self.writable: > + raise NotPermittedException() > + self.value = value > + > + > +def register_service_cb(): > + print 'GATT service registered' > + > + > +def register_service_error_cb(error): > + print 'Failed to register service: ' + str(error) > + mainloop.quit() > + > + > +def find_adapter(bus): > + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), > + DBUS_OM_IFACE) > + objects = remote_om.GetManagedObjects() > + > + for o, props in objects.iteritems(): > + if props.has_key(GATT_MANAGER_IFACE): > + return o > + > + return None > + > +def main(): > + global mainloop > + > + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) > + > + bus = dbus.SystemBus() > + > + adapter = find_adapter(bus) > + if not adapter: > + print 'GattManager1 interface not found' > + return > + > + service_manager = dbus.Interface( > + bus.get_object(BLUEZ_SERVICE_NAME, adapter), > + GATT_MANAGER_IFACE) > + > + hr_service = HeartRateService(bus, 0) > + bat_service = BatteryService(bus, 1) > + test_service = TestService(bus, 2) > + > + mainloop = gobject.MainLoop() > + > + service_manager.RegisterService(hr_service.get_path(), {}, > + reply_handler=register_service_cb, > + error_handler=register_service_error_cb) > + service_manager.RegisterService(bat_service.get_path(), {}, > + reply_handler=register_service_cb, > + error_handler=register_service_error_cb) > + service_manager.RegisterService(test_service.get_path(), {}, > + reply_handler=register_service_cb, > + error_handler=register_service_error_cb) > + > + 
mainloop.run() > + > +if __name__ == '__main__': > + main() > diff --git a/tools/gatt-example b/tools/gatt-example > deleted file mode 100644 > index a6f5cbe..0000000 > --- a/tools/gatt-example > +++ /dev/null > @@ -1,533 +0,0 @@ > -#!/usr/bin/python > - > -import dbus > -import dbus.exceptions > -import dbus.mainloop.glib > -import dbus.service > - > -import array > -import gobject > - > -from random import randint > - > -mainloop = None > - > -BLUEZ_SERVICE_NAME = 'org.bluez' > -GATT_MANAGER_IFACE = 'org.bluez.GattManager1' > -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' > -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' > - > -GATT_SERVICE_IFACE = 'org.bluez.GattService1' > -GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' > -GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' > - > -class InvalidArgsException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' > - > -class NotSupportedException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.NotSupported' > - > -class NotPermittedException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.NotPermitted' > - > -class InvalidValueLengthException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.InvalidValueLength' > - > -class FailedException(dbus.exceptions.DBusException): > - _dbus_error_name = 'org.bluez.Error.Failed' > - > - > -class Service(dbus.service.Object): > - PATH_BASE = '/org/bluez/example/service' > - > - def __init__(self, bus, index, uuid, primary): > - self.path = self.PATH_BASE + str(index) > - self.bus = bus > - self.uuid = uuid > - self.primary = primary > - self.characteristics = [] > - dbus.service.Object.__init__(self, bus, self.path) > - > - def get_properties(self): > - return { > - GATT_SERVICE_IFACE: { > - 'UUID': self.uuid, > - 'Primary': self.primary, > - 'Characteristics': dbus.Array( > - self.get_characteristic_paths(), > - signature='o') > - } > - } > - > - def 
get_path(self): > - return dbus.ObjectPath(self.path) > - > - def add_characteristic(self, characteristic): > - self.characteristics.append(characteristic) > - > - def get_characteristic_paths(self): > - result = [] > - for chrc in self.characteristics: > - result.append(chrc.get_path()) > - return result > - > - def get_characteristics(self): > - return self.characteristics > - > - @dbus.service.method(DBUS_PROP_IFACE, > - in_signature='s', > - out_signature='a{sv}') > - def GetAll(self, interface): > - if interface != GATT_SERVICE_IFACE: > - raise InvalidArgsException() > - > - return self.get_properties[GATT_SERVICE_IFACE] > - > - @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') > - def GetManagedObjects(self): > - response = {} > - print 'GetManagedObjects' > - > - response[self.get_path()] = self.get_properties() > - chrcs = self.get_characteristics() > - for chrc in chrcs: > - response[chrc.get_path()] = chrc.get_properties() > - descs = chrc.get_descriptors() > - for desc in descs: > - response[desc.get_path()] = desc.get_properties() > - > - return response > - > - > -class Characteristic(dbus.service.Object): > - def __init__(self, bus, index, uuid, flags, service): > - self.path = service.path + '/char' + str(index) > - self.bus = bus > - self.uuid = uuid > - self.service = service > - self.flags = flags > - self.descriptors = [] > - dbus.service.Object.__init__(self, bus, self.path) > - > - def get_properties(self): > - return { > - GATT_CHRC_IFACE: { > - 'Service': self.service.get_path(), > - 'UUID': self.uuid, > - 'Flags': self.flags, > - 'Descriptors': dbus.Array( > - self.get_descriptor_paths(), > - signature='o') > - } > - } > - > - def get_path(self): > - return dbus.ObjectPath(self.path) > - > - def add_descriptor(self, descriptor): > - self.descriptors.append(descriptor) > - > - def get_descriptor_paths(self): > - result = [] > - for desc in self.descriptors: > - result.append(desc.get_path()) > - return result > - > - def 
get_descriptors(self): > - return self.descriptors > - > - @dbus.service.method(DBUS_PROP_IFACE, > - in_signature='s', > - out_signature='a{sv}') > - def GetAll(self, interface): > - if interface != GATT_CHRC_IFACE: > - raise InvalidArgsException() > - > - return self.get_properties[GATT_CHRC_IFACE] > - > - @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay') > - def ReadValue(self): > - print 'Default ReadValue called, returning error' > - raise NotSupportedException() > - > - @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay') > - def WriteValue(self, value): > - print 'Default WriteValue called, returning error' > - raise NotSupportedException() > - > - @dbus.service.method(GATT_CHRC_IFACE) > - def StartNotify(self): > - print 'Default StartNotify called, returning error' > - raise NotSupportedException() > - > - @dbus.service.method(GATT_CHRC_IFACE) > - def StopNotify(self): > - print 'Default StopNotify called, returning error' > - raise NotSupportedException() > - > - @dbus.service.signal(DBUS_PROP_IFACE, > - signature='sa{sv}as') > - def PropertiesChanged(self, interface, changed, invalidated): > - pass > - > - > -class Descriptor(dbus.service.Object): > - def __init__(self, bus, index, uuid, characteristic): > - self.path = characteristic.path + '/desc' + str(index) > - self.bus = bus > - self.uuid = uuid > - self.chrc = characteristic > - dbus.service.Object.__init__(self, bus, self.path) > - > - def get_properties(self): > - return { > - GATT_DESC_IFACE: { > - 'Characteristic': self.chrc.get_path(), > - 'UUID': self.uuid, > - } > - } > - > - def get_path(self): > - return dbus.ObjectPath(self.path) > - > - @dbus.service.method(DBUS_PROP_IFACE, > - in_signature='s', > - out_signature='a{sv}') > - def GetAll(self, interface): > - if interface != GATT_DESC_IFACE: > - raise InvalidArgsException() > - > - return self.get_properties[GATT_CHRC_IFACE] > - > - @dbus.service.method(GATT_DESC_IFACE, out_signature='ay') > - def ReadValue(self): > - print 
'Default ReadValue called, returning error' > - raise NotSupportedException() > - > - @dbus.service.method(GATT_DESC_IFACE, in_signature='ay') > - def WriteValue(self, value): > - print 'Default WriteValue called, returning error' > - raise NotSupportedException() > - > - > -class HeartRateService(Service): > - """ > - Fake Heart Rate Service that simulates a fake heart beat and control point > - behavior. > - > - """ > - HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb' > - > - def __init__(self, bus, index): > - Service.__init__(self, bus, index, self.HR_UUID, True) > - self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) > - self.add_characteristic(BodySensorLocationChrc(bus, 1, self)) > - self.add_characteristic(HeartRateControlPointChrc(bus, 2, self)) > - self.energy_expended = 0 > - > - > -class HeartRateMeasurementChrc(Characteristic): > - HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' > - > - def __init__(self, bus, index, service): > - Characteristic.__init__( > - self, bus, index, > - self.HR_MSRMT_UUID, > - ['notify'], > - service) > - self.notifying = False > - self.hr_ee_count = 0 > - > - def hr_msrmt_cb(self): > - value = [] > - value.append(dbus.Byte(0x06)) > - > - value.append(dbus.Byte(randint(90, 130))) > - > - if self.hr_ee_count % 10 == 0: > - value[0] = dbus.Byte(value[0] | 0x08) > - value.append(dbus.Byte(self.service.energy_expended & 0xff)) > - value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) > - > - self.service.energy_expended = \ > - min(0xffff, self.service.energy_expended + 1) > - self.hr_ee_count += 1 > - > - print 'Updating value: ' + repr(value) > - > - self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) > - > - return self.notifying > - > - def _update_hr_msrmt_simulation(self): > - print 'Update HR Measurement Simulation' > - > - if not self.notifying: > - return > - > - gobject.timeout_add(1000, self.hr_msrmt_cb) > - > - def StartNotify(self): > - if self.notifying: > - print 
'Already notifying, nothing to do' > - return > - > - self.notifying = True > - self._update_hr_msrmt_simulation() > - > - def StopNotify(self): > - if not self.notifying: > - print 'Not notifying, nothing to do' > - return > - > - self.notifying = False > - self._update_hr_msrmt_simulation() > - > - > -class BodySensorLocationChrc(Characteristic): > - BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb' > - > - def __init__(self, bus, index, service): > - Characteristic.__init__( > - self, bus, index, > - self.BODY_SNSR_LOC_UUID, > - ['read'], > - service) > - > - def ReadValue(self): > - # Return 'Chest' as the sensor location. > - return [ 0x01 ] > - > -class HeartRateControlPointChrc(Characteristic): > - HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' > - > - def __init__(self, bus, index, service): > - Characteristic.__init__( > - self, bus, index, > - self.HR_CTRL_PT_UUID, > - ['write'], > - service) > - > - def WriteValue(self, value): > - print 'Heart Rate Control Point WriteValue called' > - > - if len(value) != 1: > - raise InvalidValueLengthException() > - > - byte = value[0] > - print 'Control Point value: ' + repr(byte) > - > - if byte != 1: > - raise FailedException("0x80") > - > - print 'Energy Expended field reset!' > - self.service.energy_expended = 0 > - > - > -class BatteryService(Service): > - """ > - Fake Battery service that emulates a draining battery. > - > - """ > - BATTERY_UUID = '180f' > - > - def __init__(self, bus, index): > - Service.__init__(self, bus, index, self.BATTERY_UUID, True) > - self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self)) > - > - > -class BatteryLevelCharacteristic(Characteristic): > - """ > - Fake Battery Level characteristic. The battery level is drained by 2 points > - every 5 seconds. 
> - > - """ > - BATTERY_LVL_UUID = '2a19' > - > - def __init__(self, bus, index, service): > - Characteristic.__init__( > - self, bus, index, > - self.BATTERY_LVL_UUID, > - ['read', 'notify'], > - service) > - self.notifying = False > - self.battery_lvl = 100 > - gobject.timeout_add(5000, self.drain_battery) > - > - def notify_battery_level(self): > - if not self.notifying: > - return > - self.PropertiesChanged( > - GATT_CHRC_IFACE, > - { 'Value': [dbus.Byte(self.battery_lvl)] }, []) > - > - def drain_battery(self): > - if self.battery_lvl > 0: > - self.battery_lvl -= 2 > - if self.battery_lvl < 0: > - self.battery_lvl = 0 > - print 'Battery Level drained: ' + repr(self.battery_lvl) > - self.notify_battery_level() > - return True > - > - def ReadValue(self): > - print 'Battery Level read: ' + repr(self.battery_lvl) > - return [dbus.Byte(self.battery_lvl)] > - > - def StartNotify(self): > - if self.notifying: > - print 'Already notifying, nothing to do' > - return > - > - self.notifying = True > - self.notify_battery_level() > - > - def StopNotify(self): > - if not self.notifying: > - print 'Not notifying, nothing to do' > - return > - > - self.notifying = False > - > - > -class TestService(Service): > - """ > - Dummy test service that provides characteristics and descriptors that > - exercise various API functionality. > - > - """ > - TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0' > - > - def __init__(self, bus, index): > - Service.__init__(self, bus, index, self.TEST_SVC_UUID, False) > - self.add_characteristic(TestCharacteristic(bus, 0, self)) > - > - > -class TestCharacteristic(Characteristic): > - """ > - Dummy test characteristic. Allows writing arbitrary bytes to its value, and > - contains "extended properties", as well as a test descriptor. 
> - > - """ > - TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1' > - > - def __init__(self, bus, index, service): > - Characteristic.__init__( > - self, bus, index, > - self.TEST_CHRC_UUID, > - ['read', 'write', 'writable-auxiliaries'], > - service) > - self.value = [] > - self.add_descriptor(TestDescriptor(bus, 0, self)) > - self.add_descriptor( > - CharacteristicUserDescriptionDescriptor(bus, 1, self)) > - > - def ReadValue(self): > - print 'TestCharacteristic Read: ' + repr(self.value) > - return self.value > - > - def WriteValue(self, value): > - print 'TestCharacteristic Write: ' + repr(value) > - self.value = value > - > - > -class TestDescriptor(Descriptor): > - """ > - Dummy test descriptor. Returns a static value. > - > - """ > - TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2' > - > - def __init__(self, bus, index, characteristic): > - Descriptor.__init__( > - self, bus, index, > - self.TEST_DESC_UUID, > - characteristic) > - > - def ReadValue(self): > - return [ > - dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') > - ] > - > - > -class CharacteristicUserDescriptionDescriptor(Descriptor): > - """ > - Writable CUD descriptor. 
> - > - """ > - CUD_UUID = '2901' > - > - def __init__(self, bus, index, characteristic): > - self.writable = 'writable-auxiliaries' in characteristic.flags > - self.value = array.array('B', 'This is a characteristic for testing') > - self.value = self.value.tolist() > - Descriptor.__init__( > - self, bus, index, > - self.CUD_UUID, > - characteristic) > - > - def ReadValue(self): > - return self.value > - > - def WriteValue(self, value): > - if not self.writable: > - raise NotPermittedException() > - self.value = value > - > - > -def register_service_cb(): > - print 'GATT service registered' > - > - > -def register_service_error_cb(error): > - print 'Failed to register service: ' + str(error) > - mainloop.quit() > - > - > -def find_adapter(bus): > - remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), > - DBUS_OM_IFACE) > - objects = remote_om.GetManagedObjects() > - > - for o, props in objects.iteritems(): > - if props.has_key(GATT_MANAGER_IFACE): > - return o > - > - return None > - > -def main(): > - global mainloop > - > - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) > - > - bus = dbus.SystemBus() > - > - adapter = find_adapter(bus) > - if not adapter: > - print 'GattManager1 interface not found' > - return > - > - service_manager = dbus.Interface( > - bus.get_object(BLUEZ_SERVICE_NAME, adapter), > - GATT_MANAGER_IFACE) > - > - hr_service = HeartRateService(bus, 0) > - bat_service = BatteryService(bus, 1) > - test_service = TestService(bus, 2) > - > - mainloop = gobject.MainLoop() > - > - service_manager.RegisterService(hr_service.get_path(), {}, > - reply_handler=register_service_cb, > - error_handler=register_service_error_cb) > - service_manager.RegisterService(bat_service.get_path(), {}, > - reply_handler=register_service_cb, > - error_handler=register_service_error_cb) > - service_manager.RegisterService(test_service.get_path(), {}, > - reply_handler=register_service_cb, > - error_handler=register_service_error_cb) > - > - 
mainloop.run() > - > -if __name__ == '__main__': > - main() > -- > 2.2.0.rc0.207.ga3a616c > Pushed. Thanks, Arman -- To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html