Hi Luiz,

I'm not sure if I can attach files to this forum, so I'm just trying to
attach the files about the "Service Changed Indication" issue. Here is the
list of files:

bt_read.py - the script in which the "Heart Rate Measurement" and "Body
Sensor Location" characteristics have only the Read attribute
bt_notify.py - the script in which the "Heart Rate Measurement" and "Body
Sensor Location" characteristics have both the Read and Notify attributes
btmon_read.log - the btmon log captured when running bt_read.py
btmon_notify.log - the btmon log captured when running bt_notify.py

Thank you very much!

On Wed, Jan 27, 2021 at 12:46 AM Kenny Bian <kennybian@xxxxxxxxx> wrote:
>
> Hi Luiz,
>
> On Wed, Jan 27, 2021 at 12:42 AM Kenny Bian <kennybian@xxxxxxxxx> wrote:
> >
> > Hi Luiz,
> >
> > Thank you so much for your information.
> >
> > I created a test GATT server from the sample
> > code(https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server).
> > I have 2 python scripts. In the "bt_read.py" I created, the Heart Rate
> > Measurement(2A37) and Body Sensor Location(2A38) have only the Read
> > attribute. In the "bt_notify.py" I created, the Heart Rate
> > Measurement(2A37) and Body Sensor Location(2A38) have both the
> > Read and Notify attributes.
> >
> > Here is what I did:
> > I ran "bt_read.py" and captured the btmon log. Then I ran
> > "bt_notify.py" and captured the btmon log. I read both logs, but I
> > don't see any info regarding the "Service Changed
> > Indication".
> >
> > Here is the btmon log with both the Read and Notify attributes below.
> > Thanks again for your help!
> > =========================================================================
> > Bluetooth monitor ver 5.48
> > = Note: Linux version 4.19.35-g92e18fefc77 (armv7l)  0.809169
> > = Note: Bluetooth subsystem version 2.22  0.809182
> > = New Index: 00:16:A4:4A:2D:27 (Primary,UART,hci0)  [hci0] 0.809187
> > = Open Index: 00:16:A4:4A:2D:27  [hci0] 0.809190
> > = Index Info: 00:16:A4:4.. (Cypress Semiconductor Corporation)  [hci0] 0.809194
> > @ MGMT Open: bluetoothd (privileged) version 1.14  {0x0002} 0.809200
> > @ MGMT Open: bluetoothd (privileged) version 1.14  {0x0001} 0.809203
> > @ MGMT Open: btmon (privileged) version 1.14  {0x0003} 0.809276
> > > ACL Data RX: Handle 65 flags 0x02 dlen 8  #1 [hci0] 4.543721
> >       ATT: Handle Value Notification (0x1b) len 3
> >         Handle: 0x0016
> >           Data: 64
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #2 [hci0] 6.883753
> >       ATT: Read By Group Type Request (0x10) len 6
> >         Handle range: 0x0001-0xffff
> >         Attribute group type: Primary Service (0x2800)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 30  #3 [hci0] 6.884412
> >       ATT: Read By Group Type Response (0x11) len 25
> >         Attribute data length: 6
> >         Attribute group list: 4 entries
> >         Handle range: 0x0001-0x0005
> >         UUID: Generic Access Profile (0x1800)
> >         Handle range: 0x0006-0x0009
> >         UUID: Generic Attribute Profile (0x1801)
> >         Handle range: 0x0234-0x023c
> >         UUID: Heart Rate (0x180d)
> >         Handle range: 0x023d-0x0240
> >         UUID: Battery Service (0x180f)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #4 [hci0] 6.944638
> >       ATT: Read By Group Type Request (0x10) len 6
> >         Handle range: 0x0241-0xffff
> >         Attribute group type: Primary Service (0x2800)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 26  #5 [hci0] 6.945293
> >       ATT: Read By Group Type Response (0x11) len 21
> >         Attribute data length: 20
> >         Attribute group list: 1 entry
> >         Handle range: 0x0241-0x0250
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef0)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #6 [hci0] 7.004723
> >       ATT: Read By Group Type Request (0x10) len 6
> >         Handle range: 0x0251-0xffff
> >         Attribute group type: Primary Service (0x2800)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 9  #7 [hci0] 7.005369
> >       ATT: Error Response (0x01) len 4
> >         Read By Group Type Request (0x10)
> >         Handle: 0x0251
> >         Error: Attribute Not Found (0x0a)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #8 [hci0] 7.005549
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #9 [hci0] 7.064450
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x0234-0x023c
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 27  #10 [hci0] 7.065112
> >       ATT: Read By Type Response (0x09) len 22
> >         Attribute data length: 7
> >         Attribute data list: 3 entries
> >         Handle: 0x0235
> >         Value: 123602372a
> >         Handle: 0x0238
> >         Value: 123902382a
> >         Handle: 0x023b
> >         Value: 083c02392a
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #11 [hci0] 7.124428
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x023d-0x0240
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 13  #12 [hci0] 7.125226
> >       ATT: Read By Type Response (0x09) len 8
> >         Attribute data length: 7
> >         Attribute data list: 1 entry
> >         Handle: 0x023e
> >         Value: 123f02192a
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #13 [hci0] 7.125376
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #14 [hci0] 7.184416
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x0241-0x0250
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 69  #15 [hci0] 7.185101
> >       ATT: Read By Type Response (0x09) len 64
> >         Attribute data length: 21
> >         Attribute data list: 3 entries
> >         Handle: 0x0242
> >         Value: 8a4302f1debc9a785634127856341278563412
> >         Handle: 0x0247
> >         Value: 8a4802f3debc9a785634127856341278563412
> >         Handle: 0x024c
> >         Value: 8a4d02f5debc9a785634127856341278563412
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11  #16 [hci0] 7.244526
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x024e-0x0250
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 9  #17 [hci0] 7.246144
> >       ATT: Error Response (0x01) len 4
> >         Read By Type Request (0x08)
> >         Handle: 0x024e
> >         Error: Attribute Not Found (0x0a)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #18 [hci0] 7.246564
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #19 [hci0] 7.304500
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0237-0x0237
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #20 [hci0] 7.305951
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0237
> >         UUID: Client Characteristic Configuration (0x2902)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #21 [hci0] 7.364540
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x023a-0x023a
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #22 [hci0] 7.366242
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x023a
> >         UUID: Client Characteristic Configuration (0x2902)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #23 [hci0] 7.366660
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #24 [hci0] 7.424617
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0240-0x0240
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #25 [hci0] 7.426085
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0240
> >         UUID: Client Characteristic Configuration (0x2902)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #26 [hci0] 7.484523
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0244-0x0246
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #27 [hci0] 7.486104
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0244
> >         UUID: Characteristic Extended Properties (0x2900)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #28 [hci0] 7.486520
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #29 [hci0] 7.544546
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0245-0x0246
> > < ACL Data TX: Handle 65 flags 0x00 dlen 24  #30 [hci0] 7.546011
> >       ATT: Find Information Response (0x05) len 19
> >         Format: UUID-128 (0x02)
> >         Handle: 0x0245
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef2)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #31 [hci0] 7.604713
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0246-0x0246
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #32 [hci0] 7.606323
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0246
> >         UUID: Characteristic User Description (0x2901)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #33 [hci0] 7.606747
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #34 [hci0] 7.664243
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0249-0x024b
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #35 [hci0] 7.665862
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0249
> >         UUID: Characteristic Extended Properties (0x2900)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #36 [hci0] 7.724603
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024a-0x024b
> > < ACL Data TX: Handle 65 flags 0x00 dlen 24  #37 [hci0] 7.726315
> >       ATT: Find Information Response (0x05) len 19
> >         Format: UUID-128 (0x02)
> >         Handle: 0x024a
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef4)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #38 [hci0] 7.726625
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #39 [hci0] 7.784545
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024b-0x024b
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #40 [hci0] 7.785996
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x024b
> >         UUID: Characteristic User Description (0x2901)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #41 [hci0] 7.844187
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024e-0x0250
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #42 [hci0] 7.845777
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x024e
> >         UUID: Characteristic Extended Properties (0x2900)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #43 [hci0] 7.846215
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #44 [hci0] 7.904466
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024f-0x0250
> > < ACL Data TX: Handle 65 flags 0x00 dlen 24  #45 [hci0] 7.906160
> >       ATT: Find Information Response (0x05) len 19
> >         Format: UUID-128 (0x02)
> >         Handle: 0x024f
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef6)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #46 [hci0] 7.964493
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0250-0x0250
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10  #47 [hci0] 7.966067
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0250
> >         UUID: Characteristic User Description (0x2901)
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #48 [hci0] 7.966489
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #49 [hci0] 8.203559
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > > ACL Data RX: Handle 65 flags 0x02 dlen 7  #50 [hci0] 11.833674
> >       ATT: Read Request (0x0a) len 2
> >         Handle: 0x0236
> > < ACL Data TX: Handle 65 flags 0x00 dlen 7  #51 [hci0] 11.840770
> >       ATT: Read Response (0x0b) len 2
> >         Value: 0663
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #52 [hci0] 12.078687
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9  #53 [hci0] 14.055212
> >       ATT: Write Request (0x12) len 4
> >         Handle: 0x0237
> >           Data: 0100
> > < ACL Data TX: Handle 65 flags 0x00 dlen 5  #54 [hci0] 14.056230
> >       ATT: Write Response (0x13) len 0
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #55 [hci0] 14.329045
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > < ACL Data TX: Handle 65 flags 0x00 dlen 11  #56 [hci0] 15.078671
> >       ATT: Handle Value Notification (0x1b) len 6
> >         Handle: 0x0236
> >           Data: 0e750000
> > > HCI Event: Number of Completed Packets (0x13) plen 5  #57 [hci0] 15.330353
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > < ACL Data TX: Handle 65 flags 0x00 dlen 9  #58 [hci0] 16.071839
> >       ATT: Handle Value Notification (0x1b) len 4
> >         Handle: 0x0236
> >           Data: 066e
> > > ACL Data RX: H
> >
> > On Mon, Jan 25, 2021 at 9:59 AM Luiz Augusto von Dentz
> > <luiz.dentz@xxxxxxxxx> wrote:
> > >
> > > Hi Kenny,
> > >
> > > On Sun, Jan 24, 2021 at 10:42 PM Kenny Bian <kennybian@xxxxxxxxx> wrote:
> > > >
> > > > Hi Luiz,
> > > >
> > > > Thank you so much for your reply. I appreciate it.
> > > >
> > > > By "registering the services", do you mean "RegisterApplication()" in
> > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server#n656?
> > > > If that is the case, I believe I already registered the services.
> > > > I checked the files in /var/lib/bluetooth. According to
> > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/settings-storage.txt#n321,
> > > > there should be a "[ServiceChanged]" section in the "info" file. But I
> > > > don't see it in the "info" file. Is there a way to tell whether the
> > > > "Service Changed Indication" is actually working?
> > > > Let's suppose the "Service Changed Indication" is already enabled, is
> > > > there a way for the mobile app to check on their side to tell which
> > > > service(s) got changed?
> > >
> > > HCI traces (btmon) should be able to tell you if it has been
> > > subscribed or not, if there is an Indication in it it probably means
> > > the remote has subscribed.
> > >
> > > > Thanks again for your help.
> > > >
> > > > On Sun, Jan 24, 2021 at 7:35 PM Luiz Augusto von Dentz
> > > > <luiz.dentz@xxxxxxxxx> wrote:
> > > > >
> > > > > Hi Kenny,
> > > > >
> > > > > On Sun, Jan 24, 2021 at 12:45 AM Kenny Bian <kennybian@xxxxxxxxx> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > > We implemented a GATT server on Linux in Python. The code is based on
> > > > > > the code sample(https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server).
> > > > > > The BlueZ version is 5.48. But we found a problem. The cached data in
> > > > > > /var/lib/bluetooth caused the mobile app to crash if some
> > > > > > characteristics are changed. After some research, we found "Under BLE
> > > > > > standard 'Generic Attribute'(0x1801), there is a Characteristic
> > > > > > 'Service Changed' (0x2A05) with 'indicate' property", see
> > > > > > https://github.com/espressif/esp-idf/issues/1777.
> > > > > >
> > > > > > The questions we have:
> > > > > > How to enable the "Service Changed Indication"(0x2A05) in the
> > > > > > bluetooth? Is there any code example in Python?
> > > > >
> > > > > If you are registering the services with Bluetoothd then it should
> > > > > generate the service change automatically:
> > > > >
> > > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/src/gatt-database.c#n1185
> > > > >
> > > > > When a new service is registered it is indicated here:
> > > > >
> > > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/src/gatt-database.c#n1452
> > > > >
> > > > >
> > > > > --
> > > > > Luiz Augusto von Dentz
> > >
> > >
> > >
> > > --
> > > Luiz Augusto von Dentz
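
Following the suggestion in the thread that the btmon trace shows whether the remote has subscribed, here is a rough sketch of two helpers for reading such a trace. It assumes the log is btmon's plain-text output with one record per line and without the mail quoting prefixes. The function names decode_declaration and find_indication_activity are made up for this illustration and are not part of BlueZ. The first helper decodes a characteristic declaration value from a Read By Type Response (property bits, value handle, UUID); the second lists the handles carried by Handle Value Indication PDUs and the handles that received a Write Request with data 0200, which is the CCC value that enables indications. Whether such a handle is really the CCC of "Service Changed" still has to be checked against the discovery part of the trace.

```python
import re
import uuid

# GATT characteristic property bits, lowest bit first.
PROPERTY_BITS = {
    0x01: "broadcast",
    0x02: "read",
    0x04: "write-without-response",
    0x08: "write",
    0x10: "notify",
    0x20: "indicate",
    0x40: "authenticated-signed-writes",
    0x80: "extended-properties",
}


def decode_declaration(hex_value):
    """Decode a characteristic declaration such as '123602372a'.

    Layout: 1 byte properties, 2 bytes value handle (little endian),
    then a 16-bit or 128-bit UUID (little endian).
    """
    raw = bytes.fromhex(hex_value)
    if len(raw) not in (5, 19):
        raise ValueError("unexpected declaration length: %d" % len(raw))
    names = [name for bit, name in PROPERTY_BITS.items() if raw[0] & bit]
    value_handle = int.from_bytes(raw[1:3], "little")
    uuid_bytes = raw[3:][::-1]  # convert to big-endian order
    if len(uuid_bytes) == 2:
        chrc_uuid = uuid_bytes.hex()
    else:
        chrc_uuid = str(uuid.UUID(bytes=uuid_bytes))
    return names, value_handle, chrc_uuid


# Patterns for the btmon text lines this sketch cares about.
ATT_LINE = re.compile(r"ATT: ([A-Za-z ]+?) \(0x[0-9a-f]{2}\) len \d+")
HANDLE_LINE = re.compile(r"Handle: (0x[0-9a-f]{4})")
DATA_LINE = re.compile(r"Data: ([0-9a-f]+)")


def find_indication_activity(log_text):
    """Return (indication_handles, ccc_indicate_write_handles)."""
    indications = []
    ccc_writes = []
    current = None  # name of the ATT PDU currently being read
    handle = None   # attribute handle seen inside that PDU
    for line in log_text.splitlines():
        match = ATT_LINE.search(line)
        if match:
            current, handle = match.group(1), None
            continue
        if "ACL Data" in line or "HCI Event" in line:
            current = None  # a new packet starts here
            continue
        if current is None:
            continue
        found = HANDLE_LINE.search(line)
        if found:
            handle = int(found.group(1), 16)
            if current == "Handle Value Indication":
                indications.append(handle)
            continue
        data = DATA_LINE.search(line)
        if (data and current == "Write Request"
                and handle is not None and data.group(1) == "0200"):
            ccc_writes.append(handle)
    return indications, ccc_writes
```

For the trace quoted above, decode_declaration("123602372a") gives the properties read and notify, value handle 0x0236 and UUID 2a37, which matches the bt_notify.py variant; with only the Read flag the first byte would be 02 instead of 12. The only CCC write in that trace is 0100 (notifications) to handle 0x0237, so find_indication_activity would report nothing for it.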
#!/usr/bin/env python3 import dbus import dbus.exceptions import dbus.mainloop.glib import dbus.service import array try: from gi.repository import GObject except ImportError: import gobject as GObject import sys from random import randint # Advert import gobject mainloop = None BLUEZ_SERVICE_NAME = 'org.bluez' GATT_MANAGER_IFACE = 'org.bluez.GattManager1' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' # Advert LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' # Advert class InvalidArgsException(dbus.exceptions.DBusException): _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' class NotSupportedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotSupported' class NotPermittedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotPermitted' class InvalidValueLengthException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.InvalidValueLength' class FailedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.Failed' class Advertisement(dbus.service.Object): PATH_BASE = '/org/bluez/example/advertisement' def __init__(self, bus, index, advertising_type): self.path = self.PATH_BASE + str(index) self.bus = bus self.ad_type = advertising_type self.service_uuids = None self.manufacturer_data = None self.solicit_uuids = None self.service_data = None self.local_name = None self.include_tx_power = None self.data = None dbus.service.Object.__init__(self, bus, self.path) def get_properties(self): properties = dict() properties['Type'] = self.ad_type if self.service_uuids is not None: properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, signature='s') if self.solicit_uuids is not None: 
properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, signature='s') if self.manufacturer_data is not None: properties['ManufacturerData'] = dbus.Dictionary( self.manufacturer_data, signature='qv') if self.service_data is not None: properties['ServiceData'] = dbus.Dictionary(self.service_data, signature='sv') if self.local_name is not None: properties['LocalName'] = dbus.String(self.local_name) if self.include_tx_power is not None: properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power) if self.data is not None: properties['Data'] = dbus.Dictionary( self.data, signature='yv') return {LE_ADVERTISEMENT_IFACE: properties} def get_path(self): return dbus.ObjectPath(self.path) def add_service_uuid(self, uuid): if not self.service_uuids: self.service_uuids = [] self.service_uuids.append(uuid) def add_solicit_uuid(self, uuid): if not self.solicit_uuids: self.solicit_uuids = [] self.solicit_uuids.append(uuid) def add_manufacturer_data(self, manuf_code, data): if not self.manufacturer_data: self.manufacturer_data = dbus.Dictionary({}, signature='qv') self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y') def add_service_data(self, uuid, data): if not self.service_data: self.service_data = dbus.Dictionary({}, signature='sv') self.service_data[uuid] = dbus.Array(data, signature='y') def add_local_name(self, name): if not self.local_name: self.local_name = "" self.local_name = dbus.String(name) def add_data(self, ad_type, data): if not self.data: self.data = dbus.Dictionary({}, signature='yv') self.data[ad_type] = dbus.Array(data, signature='y') @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): print( 'GetAll') if interface != LE_ADVERTISEMENT_IFACE: raise InvalidArgsException() print( 'returning props') return self.get_properties()[LE_ADVERTISEMENT_IFACE] @dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature='', out_signature='') def Release(self): print( '%s: Released!' 
% self.path) class TestAdvertisement(Advertisement): def __init__(self, bus, index): Advertisement.__init__(self, bus, index, 'peripheral') self.add_service_uuid('180D') self.add_service_uuid('180F') self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04]) self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04]) self.add_local_name('TestAdvertisement') self.include_tx_power = True self.add_data(0x26, [0x01, 0x01, 0x00]) def register_ad_cb(): print( 'Advertisement registered') def register_ad_error_cb(error): print( 'Failed to register advertisement: ' + str(error)) mainloop.quit() class InvalidArgsException(dbus.exceptions.DBusException): _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' class NotSupportedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotSupported' class NotPermittedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotPermitted' class InvalidValueLengthException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.InvalidValueLength' class FailedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.Failed' class Application(dbus.service.Object): """ org.bluez.GattApplication1 interface implementation """ def __init__(self, bus): self.path = '/' self.services = [] dbus.service.Object.__init__(self, bus, self.path) self.add_service(HeartRateService(bus, 0)) self.add_service(BatteryService(bus, 1)) self.add_service(TestService(bus, 2)) def get_path(self): return dbus.ObjectPath(self.path) def add_service(self, service): self.services.append(service) @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): response = {} print('GetManagedObjects') for service in self.services: response[service.get_path()] = service.get_properties() chrcs = service.get_characteristics() for chrc in chrcs: response[chrc.get_path()] = chrc.get_properties() descs = chrc.get_descriptors() for desc in descs: 
response[desc.get_path()] = desc.get_properties() return response class Service(dbus.service.Object): """ org.bluez.GattService1 interface implementation """ PATH_BASE = '/org/bluez/example/service' def __init__(self, bus, index, uuid, primary): self.path = self.PATH_BASE + str(index) self.bus = bus self.uuid = uuid self.primary = primary self.characteristics = [] dbus.service.Object.__init__(self, bus, self.path) def get_properties(self): return { GATT_SERVICE_IFACE: { 'UUID': self.uuid, 'Primary': self.primary, 'Characteristics': dbus.Array( self.get_characteristic_paths(), signature='o') } } def get_path(self): return dbus.ObjectPath(self.path) def add_characteristic(self, characteristic): self.characteristics.append(characteristic) def get_characteristic_paths(self): result = [] for chrc in self.characteristics: result.append(chrc.get_path()) return result def get_characteristics(self): return self.characteristics @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): if interface != GATT_SERVICE_IFACE: raise InvalidArgsException() return self.get_properties()[GATT_SERVICE_IFACE] class Characteristic(dbus.service.Object): """ org.bluez.GattCharacteristic1 interface implementation """ def __init__(self, bus, index, uuid, flags, service): self.path = service.path + '/char' + str(index) self.bus = bus self.uuid = uuid self.service = service self.flags = flags self.descriptors = [] dbus.service.Object.__init__(self, bus, self.path) def get_properties(self): return { GATT_CHRC_IFACE: { 'Service': self.service.get_path(), 'UUID': self.uuid, 'Flags': self.flags, 'Descriptors': dbus.Array( self.get_descriptor_paths(), signature='o') } } def get_path(self): return dbus.ObjectPath(self.path) def add_descriptor(self, descriptor): self.descriptors.append(descriptor) def get_descriptor_paths(self): result = [] for desc in self.descriptors: result.append(desc.get_path()) return result def get_descriptors(self): return 
self.descriptors @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): if interface != GATT_CHRC_IFACE: raise InvalidArgsException() return self.get_properties()[GATT_CHRC_IFACE] @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay') def ReadValue(self, options): print('Default ReadValue called, returning error') raise NotSupportedException() @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}') def WriteValue(self, value, options): print('Default WriteValue called, returning error') raise NotSupportedException() @dbus.service.method(GATT_CHRC_IFACE) def StartNotify(self): print('Default StartNotify called, returning error') raise NotSupportedException() @dbus.service.method(GATT_CHRC_IFACE) def StopNotify(self): print('Default StopNotify called, returning error') raise NotSupportedException() @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}as') def PropertiesChanged(self, interface, changed, invalidated): pass class Descriptor(dbus.service.Object): """ org.bluez.GattDescriptor1 interface implementation """ def __init__(self, bus, index, uuid, flags, characteristic): self.path = characteristic.path + '/desc' + str(index) self.bus = bus self.uuid = uuid self.flags = flags self.chrc = characteristic dbus.service.Object.__init__(self, bus, self.path) def get_properties(self): return { GATT_DESC_IFACE: { 'Characteristic': self.chrc.get_path(), 'UUID': self.uuid, 'Flags': self.flags, } } def get_path(self): return dbus.ObjectPath(self.path) @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): if interface != GATT_DESC_IFACE: raise InvalidArgsException() return self.get_properties()[GATT_DESC_IFACE] @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay') def ReadValue(self, options): print ('Default ReadValue called, returning error') raise NotSupportedException() @dbus.service.method(GATT_DESC_IFACE, 
in_signature='aya{sv}') def WriteValue(self, value, options): print('Default WriteValue called, returning error') raise NotSupportedException() class HeartRateService(Service): """ Fake Heart Rate Service that simulates a fake heart beat and control point behavior. """ HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb' def __init__(self, bus, index): Service.__init__(self, bus, index, self.HR_UUID, True) self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) self.add_characteristic(BodySensorLocationChrc(bus, 1, self)) self.add_characteristic(HeartRateControlPointChrc(bus, 2, self)) self.energy_expended = 0 class HeartRateMeasurementChrc(Characteristic): HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.HR_MSRMT_UUID, ['read'], service) self.notifying = False self.hr_ee_count = 0 def ReadValue(self, options): print('HeartRateMeasurementChrc Read') return [dbus.Byte(6), dbus.Byte(99)] def hr_msrmt_cb(self): value = [] value.append(dbus.Byte(0x06)) value.append(dbus.Byte(randint(90, 130))) if self.hr_ee_count % 10 == 0: value[0] = dbus.Byte(value[0] | 0x08) value.append(dbus.Byte(self.service.energy_expended & 0xff)) value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) self.service.energy_expended = \ min(0xffff, self.service.energy_expended + 1) self.hr_ee_count += 1 print('Updating value: ' + repr(value)) self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) return self.notifying def _update_hr_msrmt_simulation(self): print('Update HR Measurement Simulation') if not self.notifying: return GObject.timeout_add(1000, self.hr_msrmt_cb) def StartNotify(self): if self.notifying: print('Already notifying, nothing to do') return self.notifying = True self._update_hr_msrmt_simulation() def StopNotify(self): if not self.notifying: print('Not notifying, nothing to do') return self.notifying = False self._update_hr_msrmt_simulation() class 
BodySensorLocationChrc(Characteristic): BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb' def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.BODY_SNSR_LOC_UUID, ['read'], service) self.notifying = False self.value = [0x01] def ReadValue(self, options): # Return 'Chest' as the sensor location. return [ 0x02 ] def _cb(self): value = [] value.append(dbus.Byte(randint(1, 3))) print('Updating value: ' + repr(value)) self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) return self.notifying def _update_simulation(self): print('Update Simulation') if not self.notifying: return GObject.timeout_add(1000, self._cb) def StartNotify(self): if self.notifying: print('Already notifying, nothing to do') return self.notifying = True self._update_simulation() def StopNotify(self): if not self.notifying: print('Not notifying, nothing to do') return self.notifying = False self._update_simulation() class HeartRateControlPointChrc(Characteristic): HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.HR_CTRL_PT_UUID, ['write'], service) def WriteValue(self, value, options): print('Heart Rate Control Point WriteValue called') if len(value) != 1: raise InvalidValueLengthException() byte = value[0] print('Control Point value: ' + repr(byte)) if byte != 1: raise FailedException("0x80") print('Energy Expended field reset!') self.service.energy_expended = 0 class BatteryService(Service): """ Fake Battery service that emulates a draining battery. """ BATTERY_UUID = '180f' def __init__(self, bus, index): Service.__init__(self, bus, index, self.BATTERY_UUID, True) self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self)) class BatteryLevelCharacteristic(Characteristic): """ Fake Battery Level characteristic. The battery level is drained by 2 points every 5 seconds. 
""" BATTERY_LVL_UUID = '2a19' def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.BATTERY_LVL_UUID, ['read', 'notify'], service) self.notifying = False self.battery_lvl = 100 GObject.timeout_add(5000, self.drain_battery) def notify_battery_level(self): if not self.notifying: return self.PropertiesChanged( GATT_CHRC_IFACE, { 'Value': [dbus.Byte(self.battery_lvl)] }, []) def drain_battery(self): if not self.notifying: return True if self.battery_lvl > 0: self.battery_lvl -= 2 if self.battery_lvl < 0: self.battery_lvl = 0 print('Battery Level drained: ' + repr(self.battery_lvl)) self.notify_battery_level() return True def ReadValue(self, options): print('Battery Level read: ' + repr(self.battery_lvl)) return [dbus.Byte(self.battery_lvl)] def StartNotify(self): if self.notifying: print('Already notifying, nothing to do') return self.notifying = True self.notify_battery_level() def StopNotify(self): if not self.notifying: print('Not notifying, nothing to do') return self.notifying = False class TestService(Service): """ Dummy test service that provides characteristics and descriptors that exercise various API functionality. """ TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0' def __init__(self, bus, index): Service.__init__(self, bus, index, self.TEST_SVC_UUID, True) self.add_characteristic(TestCharacteristic(bus, 0, self)) self.add_characteristic(TestEncryptCharacteristic(bus, 1, self)) self.add_characteristic(TestSecureCharacteristic(bus, 2, self)) class TestCharacteristic(Characteristic): """ Dummy test characteristic. Allows writing arbitrary bytes to its value, and contains "extended properties", as well as a test descriptor. 
""" TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1' def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.TEST_CHRC_UUID, ['read', 'write', 'writable-auxiliaries'], service) self.value = [] self.add_descriptor(TestDescriptor(bus, 0, self)) self.add_descriptor( CharacteristicUserDescriptionDescriptor(bus, 1, self)) def ReadValue(self, options): print('TestCharacteristic Read: ' + repr(self.value)) return self.value def WriteValue(self, value, options): print('TestCharacteristic Write: ' + repr(value)) self.value = value class TestDescriptor(Descriptor): """ Dummy test descriptor. Returns a static value. """ TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2' def __init__(self, bus, index, characteristic): Descriptor.__init__( self, bus, index, self.TEST_DESC_UUID, ['read', 'write'], characteristic) def ReadValue(self, options): return [ dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') ] class CharacteristicUserDescriptionDescriptor(Descriptor): """ Writable CUD descriptor. """ CUD_UUID = '2901' def __init__(self, bus, index, characteristic): self.writable = 'writable-auxiliaries' in characteristic.flags self.value = array.array('B', b'This is a characteristic for testing') self.value = self.value.tolist() Descriptor.__init__( self, bus, index, self.CUD_UUID, ['read', 'write'], characteristic) def ReadValue(self, options): return self.value def WriteValue(self, value, options): if not self.writable: raise NotPermittedException() self.value = value class TestEncryptCharacteristic(Characteristic): """ Dummy test characteristic requiring encryption. 
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['encrypt-read', 'encrypt-write'],
                service)
        self.value = []
        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestEncryptCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestEncryptCharacteristic Write: ' + repr(value))
        self.value = value


class TestEncryptDescriptor(Descriptor):
    """
    Dummy test descriptor requiring encryption. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['encrypt-read', 'encrypt-write'],
                characteristic)

    def ReadValue(self, options):
        return [
                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


class TestSecureCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring secure connection.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['secure-read', 'secure-write'],
                service)
        self.value = []
        self.add_descriptor(TestSecureDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestSecureCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestSecureCharacteristic Write: ' + repr(value))
        self.value = value


class TestSecureDescriptor(Descriptor):
    """
    Dummy test descriptor requiring secure connection. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['secure-read', 'secure-write'],
                characteristic)

    def ReadValue(self, options):
        return [
                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


def register_app_cb():
    print('GATT application registered')


def register_app_error_cb(error):
    print('Failed to register application: ' + str(error))
    mainloop.quit()


def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props.keys():
            return o

    return None


def main():
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)

    # Advert
    if not adapter:
        print('LEAdvertisingManager1 interface not found')
        return

    adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                   "org.freedesktop.DBus.Properties")

    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    test_advertisement = TestAdvertisement(bus, 0)

    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    mainloop.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python3

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject
import sys

from random import randint

# Advert
# The advertising code uses the GObject name imported above. An unconditional
# "import gobject" is not needed and typically raises ImportError on Python 3.

mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'

# Advert
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'


# Advert
class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = None
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power is not None:
            properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)

        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        print('GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print('returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        print('%s: Released!'
              % self.path)


class TestAdvertisement(Advertisement):

    def __init__(self, bus, index):
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_service_uuid('180D')
        self.add_service_uuid('180F')
        self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04])
        self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])
        self.add_local_name('TestAdvertisement')
        self.include_tx_power = True
        self.add_data(0x26, [0x01, 0x01, 0x00])


def register_ad_cb():
    print('Advertisement registered')


def register_ad_error_cb(error):
    print('Failed to register advertisement: ' + str(error))
    mainloop.quit()


class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation
    """
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(HeartRateService(bus, 0))
        self.add_service(BatteryService(bus, 1))
        self.add_service(TestService(bus, 2))

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_SERVICE_IFACE: {
                        'UUID': self.uuid,
                        'Primary': self.primary,
                        'Characteristics': dbus.Array(
                                self.get_characteristic_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result

    def get_characteristics(self):
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        pass


class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_DESC_IFACE: {
                        'Characteristic': self.chrc.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE,
                         in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()


class HeartRateService(Service):
    """
    Fake Heart Rate Service that simulates a fake heart beat and control point
    behavior.

    """
    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.HR_UUID, True)
        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
        self.energy_expended = 0


class HeartRateMeasurementChrc(Characteristic):
    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_MSRMT_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.hr_ee_count = 0

    def ReadValue(self, options):
        print('HeartRateMeasurementChrc Read')
        return [dbus.Byte(6), dbus.Byte(99)]

    def hr_msrmt_cb(self):
        value = []
        value.append(dbus.Byte(0x06))

        value.append(dbus.Byte(randint(90, 130)))

        if self.hr_ee_count % 10 == 0:
            value[0] = dbus.Byte(value[0] | 0x08)
            value.append(dbus.Byte(self.service.energy_expended & 0xff))
            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))

        self.service.energy_expended = \
                min(0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        # Returning False stops the GLib timeout once notifications are off.
        return self.notifying

    def _update_hr_msrmt_simulation(self):
        print('Update HR Measurement Simulation')

        if not self.notifying:
            return

        GObject.timeout_add(1000, self.hr_msrmt_cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_hr_msrmt_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_hr_msrmt_simulation()


class BodySensorLocationChrc(Characteristic):
    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BODY_SNSR_LOC_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.value = [0x01]

    def ReadValue(self, options):
        # 0x02 is 'Wrist' in the Body Sensor Location enumeration
        # ('Chest' would be 0x01).
        return [ 0x02 ]

    def _cb(self):
        value = []
        value.append(dbus.Byte(randint(1, 3)))

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        # Returning False stops the GLib timeout once notifications are off.
        return self.notifying

    def _update_simulation(self):
        print('Update Simulation')

        if not self.notifying:
            return

        GObject.timeout_add(1000, self._cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_simulation()


class HeartRateControlPointChrc(Characteristic):
    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_CTRL_PT_UUID,
                ['write'],
                service)

    def WriteValue(self, value, options):
        print('Heart Rate Control Point WriteValue called')

        if len(value) != 1:
            raise InvalidValueLengthException()

        byte = value[0]
        print('Control Point value: ' + repr(byte))

        if byte != 1:
            raise FailedException("0x80")

        print('Energy Expended field reset!')
        self.service.energy_expended = 0


class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.

    """
    BATTERY_UUID = '180f'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))


class BatteryLevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. The battery level is drained by 2 points
    every 5 seconds.
    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        GObject.timeout_add(5000, self.drain_battery)

    def notify_battery_level(self):
        if not self.notifying:
            return
        self.PropertiesChanged(
                GATT_CHRC_IFACE,
                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])

    def drain_battery(self):
        if not self.notifying:
            return True
        if self.battery_lvl > 0:
            self.battery_lvl -= 2
            if self.battery_lvl < 0:
                self.battery_lvl = 0
        print('Battery Level drained: ' + repr(self.battery_lvl))
        self.notify_battery_level()
        return True

    def ReadValue(self, options):
        print('Battery Level read: ' + repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False


class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.

    """
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))
        self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
        self.add_characteristic(TestSecureCharacteristic(bus, 2, self))


class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
    contains "extended properties", as well as a test descriptor.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read', 'write', 'writable-auxiliaries'],
                service)
        self.value = []
        self.add_descriptor(TestDescriptor(bus, 0, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 1, self))

    def ReadValue(self, options):
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestCharacteristic Write: ' + repr(value))
        self.value = value


class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        return [
                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


class CharacteristicUserDescriptionDescriptor(Descriptor):
    """
    Writable CUD descriptor.

    """
    CUD_UUID = '2901'

    def __init__(self, bus, index, characteristic):
        self.writable = 'writable-auxiliaries' in characteristic.flags
        self.value = array.array('B', b'This is a characteristic for testing')
        self.value = self.value.tolist()
        Descriptor.__init__(
                self, bus, index,
                self.CUD_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        return self.value

    def WriteValue(self, value, options):
        if not self.writable:
            raise NotPermittedException()
        self.value = value


class TestEncryptCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring encryption.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['encrypt-read', 'encrypt-write'],
                service)
        self.value = []
        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestEncryptCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestEncryptCharacteristic Write: ' + repr(value))
        self.value = value


class TestEncryptDescriptor(Descriptor):
    """
    Dummy test descriptor requiring encryption. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['encrypt-read', 'encrypt-write'],
                characteristic)

    def ReadValue(self, options):
        return [
                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


class TestSecureCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring secure connection.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['secure-read', 'secure-write'],
                service)
        self.value = []
        self.add_descriptor(TestSecureDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestSecureCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestSecureCharacteristic Write: ' + repr(value))
        self.value = value


class TestSecureDescriptor(Descriptor):
    """
    Dummy test descriptor requiring secure connection. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['secure-read', 'secure-write'],
                characteristic)

    def ReadValue(self, options):
        return [
                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


def register_app_cb():
    print('GATT application registered')


def register_app_error_cb(error):
    print('Failed to register application: ' + str(error))
    mainloop.quit()


def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props.keys():
            return o

    return None


def main():
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)

    # Advert
    if not adapter:
        print('LEAdvertisingManager1 interface not found')
        return

    adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                   "org.freedesktop.DBus.Properties")

    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    test_advertisement = TestAdvertisement(bus, 0)

    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    mainloop.run()


if __name__ == '__main__':
    main()
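Since the two scripts differ only in the flags of 2A37 and 2A38, a rough sketch like the one below may help estimate how many attribute handles the Heart Rate service occupies in each variant. It assumes each characteristic takes a declaration and a value attribute, plus one Client Characteristic Configuration descriptor (0x2902) when 'notify' or 'indicate' is set. The helper names and the read-only flag lists are hypothetical and are not part of either script.

```python
# Rough estimate of the attribute handles a GATT service occupies.
# Assumption: 1 service declaration, then per characteristic a declaration
# and a value attribute, plus a CCC descriptor if it can notify or indicate.

def count_characteristic_attributes(flags, extra_descriptors=0):
    """Estimate the attributes used by one characteristic."""
    count = 2  # characteristic declaration + characteristic value
    if 'notify' in flags or 'indicate' in flags:
        count += 1  # Client Characteristic Configuration descriptor
    return count + extra_descriptors


def count_service_attributes(characteristics):
    """characteristics is a list of (flags, extra_descriptors) tuples."""
    return 1 + sum(count_characteristic_attributes(flags, extra)
                   for flags, extra in characteristics)


# Heart Rate service as declared in the script above (2A37, 2A38, 2A39).
heart_rate_notify = [(['read', 'notify'], 0),
                     (['read', 'notify'], 0),
                     (['write'], 0)]

# Hypothetical read-only variant of the same service.
heart_rate_read = [(['read'], 0),
                   (['read'], 0),
                   (['write'], 0)]

if __name__ == '__main__':
    print('notify variant:', count_service_attributes(heart_rate_notify))
    print('read variant:  ', count_service_attributes(heart_rate_read))
```

With the notify flags the estimate is 9 attributes, which agrees with the Heart Rate handle range 0x0234-0x023c in the btmon log quoted earlier in this thread. The read-only variant comes out at 7, so if the estimate holds, the two scripts expose different attribute layouts, which is the kind of change a Service Changed indication is meant to announce.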
Attachment: btmon_notify.log
Description: Binary data
Attachment: btmon_read.log
Description: Binary data
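To check a capture for a Service Changed indication, one option is to search a text btmon log for ATT Handle Value Indication PDUs (opcode 0x1d) and then inspect the handle of each hit. The sketch below assumes a plain-text capture in the same layout as the btmon output quoted in this thread, not a binary btsnoop trace. The function name `find_indications` and the sample lines are hypothetical, and the exact wording btmon prints for indications is an assumption modelled on the notification line in the quoted log.

```python
import re

# Assumed line shape, by analogy with the quoted
# "ATT: Handle Value Notification (0x1b) len 3" line.
INDICATION_RE = re.compile(r'ATT: Handle Value Indication \(0x1d\)')


def find_indications(lines):
    """Return (line_number, text) pairs for lines that look like indications."""
    hits = []
    for number, line in enumerate(lines, start=1):
        if INDICATION_RE.search(line):
            hits.append((number, line.strip()))
    return hits


# Illustrative sample only: the first line mirrors the quoted log,
# the second line is made up for this demo.
sample = [
    '      ATT: Handle Value Notification (0x1b) len 3',
    '      ATT: Handle Value Indication (0x1d) len 6',
]

if __name__ == '__main__':
    for number, text in find_indications(sample):
        print(number, text)
```

For a real capture, the same function can be fed an open file, for example `find_indications(open('btmon_notify.log', errors='replace'))`, provided the file holds text output rather than a binary trace.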