Re: How to add "Service Changed Indication"

Hi Luiz,

I'm not sure whether I can attach files to this forum, but I am trying to
attach the files related to the "Service Changed Indication" issue.

Here is the list of files:
bt_read.py - the script in which the "Heart Rate Measurement" and "Body
Sensor Location" characteristics have only the Read property
bt_notify.py - the script in which the "Heart Rate Measurement" and "Body
Sensor Location" characteristics have both the Read and Notify
properties
btmon_read.log - the btmon log when running bt_read.py
btmon_notify.log - the btmon log when running bt_notify.py

Thank you very much!

On Wed, Jan 27, 2021 at 12:46 AM Kenny Bian <kennybian@xxxxxxxxx> wrote:
>
> Hi Luiz,
>
> On Wed, Jan 27, 2021 at 12:42 AM Kenny Bian <kennybian@xxxxxxxxx> wrote:
> >
> > Hi Luiz,
> >
> > Thank you so much for your information.
> >
> > I created a test GATT server from the sample code
> > (https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server).
> > I have two Python scripts. In "bt_read.py", the Heart Rate
> > Measurement (2A37) and Body Sensor Location (2A38) characteristics
> > have only the Read property. In "bt_notify.py", the same two
> > characteristics have both the Read and Notify properties.
> >
> > Here is what I did:
> > I ran "bt_read.py" and captured the btmon log, then ran
> > "bt_notify.py" and captured the btmon log again. I read both logs,
> > but I don't see anything about the "Service Changed Indication" in
> > either of them.
> >
> > Below is the btmon log from the run with both the Read and Notify
> > properties.
> > Thanks again for your help!
> > =========================================================================
> > Bluetooth monitor ver 5.48
> > = Note: Linux version 4.19.35-g92e18fefc77 (armv7l)                    0.809169
> > = Note: Bluetooth subsystem version 2.22                               0.809182
> > = New Index: 00:16:A4:4A:2D:27 (Primary,UART,hci0)              [hci0] 0.809187
> > = Open Index: 00:16:A4:4A:2D:27                                 [hci0] 0.809190
> > = Index Info: 00:16:A4:4.. (Cypress Semiconductor Corporation)  [hci0] 0.809194
> > @ MGMT Open: bluetoothd (privileged) version 1.14             {0x0002} 0.809200
> > @ MGMT Open: bluetoothd (privileged) version 1.14             {0x0001} 0.809203
> > @ MGMT Open: btmon (privileged) version 1.14                  {0x0003} 0.809276
> > > ACL Data RX: Handle 65 flags 0x02 dlen 8                   #1 [hci0] 4.543721
> >       ATT: Handle Value Notification (0x1b) len 3
> >         Handle: 0x0016
> >           Data: 64
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                  #2 [hci0] 6.883753
> >       ATT: Read By Group Type Request (0x10) len 6
> >         Handle range: 0x0001-0xffff
> >         Attribute group type: Primary Service (0x2800)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 30                  #3 [hci0] 6.884412
> >       ATT: Read By Group Type Response (0x11) len 25
> >         Attribute data length: 6
> >         Attribute group list: 4 entries
> >         Handle range: 0x0001-0x0005
> >         UUID: Generic Access Profile (0x1800)
> >         Handle range: 0x0006-0x0009
> >         UUID: Generic Attribute Profile (0x1801)
> >         Handle range: 0x0234-0x023c
> >         UUID: Heart Rate (0x180d)
> >         Handle range: 0x023d-0x0240
> >         UUID: Battery Service (0x180f)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                  #4 [hci0] 6.944638
> >       ATT: Read By Group Type Request (0x10) len 6
> >         Handle range: 0x0241-0xffff
> >         Attribute group type: Primary Service (0x2800)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 26                  #5 [hci0] 6.945293
> >       ATT: Read By Group Type Response (0x11) len 21
> >         Attribute data length: 20
> >         Attribute group list: 1 entry
> >         Handle range: 0x0241-0x0250
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef0)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                  #6 [hci0] 7.004723
> >       ATT: Read By Group Type Request (0x10) len 6
> >         Handle range: 0x0251-0xffff
> >         Attribute group type: Primary Service (0x2800)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 9                   #7 [hci0] 7.005369
> >       ATT: Error Response (0x01) len 4
> >         Read By Group Type Request (0x10)
> >         Handle: 0x0251
> >         Error: Attribute Not Found (0x0a)
> > > HCI Event: Number of Completed Packets (0x13) plen 5       #8 [hci0] 7.005549
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                  #9 [hci0] 7.064450
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x0234-0x023c
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 27                 #10 [hci0] 7.065112
> >       ATT: Read By Type Response (0x09) len 22
> >         Attribute data length: 7
> >         Attribute data list: 3 entries
> >         Handle: 0x0235
> >         Value: 123602372a
> >         Handle: 0x0238
> >         Value: 123902382a
> >         Handle: 0x023b
> >         Value: 083c02392a
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                 #11 [hci0] 7.124428
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x023d-0x0240
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 13                 #12 [hci0] 7.125226
> >       ATT: Read By Type Response (0x09) len 8
> >         Attribute data length: 7
> >         Attribute data list: 1 entry
> >         Handle: 0x023e
> >         Value: 123f02192a
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #13 [hci0] 7.125376
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                 #14 [hci0] 7.184416
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x0241-0x0250
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 69                 #15 [hci0] 7.185101
> >       ATT: Read By Type Response (0x09) len 64
> >         Attribute data length: 21
> >         Attribute data list: 3 entries
> >         Handle: 0x0242
> >         Value: 8a4302f1debc9a785634127856341278563412
> >         Handle: 0x0247
> >         Value: 8a4802f3debc9a785634127856341278563412
> >         Handle: 0x024c
> >         Value: 8a4d02f5debc9a785634127856341278563412
> > > ACL Data RX: Handle 65 flags 0x02 dlen 11                 #16 [hci0] 7.244526
> >       ATT: Read By Type Request (0x08) len 6
> >         Handle range: 0x024e-0x0250
> >         Attribute type: Characteristic (0x2803)
> > < ACL Data TX: Handle 65 flags 0x00 dlen 9                  #17 [hci0] 7.246144
> >       ATT: Error Response (0x01) len 4
> >         Read By Type Request (0x08)
> >         Handle: 0x024e
> >         Error: Attribute Not Found (0x0a)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #18 [hci0] 7.246564
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #19 [hci0] 7.304500
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0237-0x0237
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #20 [hci0] 7.305951
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0237
> >         UUID: Client Characteristic Configuration (0x2902)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #21 [hci0] 7.364540
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x023a-0x023a
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #22 [hci0] 7.366242
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x023a
> >         UUID: Client Characteristic Configuration (0x2902)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #23 [hci0] 7.366660
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #24 [hci0] 7.424617
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0240-0x0240
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #25 [hci0] 7.426085
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0240
> >         UUID: Client Characteristic Configuration (0x2902)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #26 [hci0] 7.484523
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0244-0x0246
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #27 [hci0] 7.486104
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0244
> >         UUID: Characteristic Extended Properties (0x2900)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #28 [hci0] 7.486520
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #29 [hci0] 7.544546
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0245-0x0246
> > < ACL Data TX: Handle 65 flags 0x00 dlen 24                 #30 [hci0] 7.546011
> >       ATT: Find Information Response (0x05) len 19
> >         Format: UUID-128 (0x02)
> >         Handle: 0x0245
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef2)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #31 [hci0] 7.604713
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0246-0x0246
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #32 [hci0] 7.606323
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0246
> >         UUID: Characteristic User Description (0x2901)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #33 [hci0] 7.606747
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #34 [hci0] 7.664243
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0249-0x024b
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #35 [hci0] 7.665862
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0249
> >         UUID: Characteristic Extended Properties (0x2900)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #36 [hci0] 7.724603
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024a-0x024b
> > < ACL Data TX: Handle 65 flags 0x00 dlen 24                 #37 [hci0] 7.726315
> >       ATT: Find Information Response (0x05) len 19
> >         Format: UUID-128 (0x02)
> >         Handle: 0x024a
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef4)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #38 [hci0] 7.726625
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #39 [hci0] 7.784545
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024b-0x024b
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #40 [hci0] 7.785996
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x024b
> >         UUID: Characteristic User Description (0x2901)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #41 [hci0] 7.844187
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024e-0x0250
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #42 [hci0] 7.845777
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x024e
> >         UUID: Characteristic Extended Properties (0x2900)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #43 [hci0] 7.846215
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #44 [hci0] 7.904466
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x024f-0x0250
> > < ACL Data TX: Handle 65 flags 0x00 dlen 24                 #45 [hci0] 7.906160
> >       ATT: Find Information Response (0x05) len 19
> >         Format: UUID-128 (0x02)
> >         Handle: 0x024f
> >         UUID: Vendor specific (12345678-1234-5678-1234-56789abcdef6)
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                  #46 [hci0] 7.964493
> >       ATT: Find Information Request (0x04) len 4
> >         Handle range: 0x0250-0x0250
> > < ACL Data TX: Handle 65 flags 0x00 dlen 10                 #47 [hci0] 7.966067
> >       ATT: Find Information Response (0x05) len 5
> >         Format: UUID-16 (0x01)
> >         Handle: 0x0250
> >         UUID: Characteristic User Description (0x2901)
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #48 [hci0] 7.966489
> >         Num handles: 1
> >         Handle: 65
> >         Count: 2
> > > HCI Event: Number of Completed Packets (0x13) plen 5      #49 [hci0] 8.203559
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > > ACL Data RX: Handle 65 flags 0x02 dlen 7                 #50 [hci0] 11.833674
> >       ATT: Read Request (0x0a) len 2
> >         Handle: 0x0236
> > < ACL Data TX: Handle 65 flags 0x00 dlen 7                 #51 [hci0] 11.840770
> >       ATT: Read Response (0x0b) len 2
> >         Value: 0663
> > > HCI Event: Number of Completed Packets (0x13) plen 5     #52 [hci0] 12.078687
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > > ACL Data RX: Handle 65 flags 0x02 dlen 9                 #53 [hci0] 14.055212
> >       ATT: Write Request (0x12) len 4
> >         Handle: 0x0237
> >           Data: 0100
> > < ACL Data TX: Handle 65 flags 0x00 dlen 5                 #54 [hci0] 14.056230
> >       ATT: Write Response (0x13) len 0
> > > HCI Event: Number of Completed Packets (0x13) plen 5     #55 [hci0] 14.329045
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > < ACL Data TX: Handle 65 flags 0x00 dlen 11                #56 [hci0] 15.078671
> >       ATT: Handle Value Notification (0x1b) len 6
> >         Handle: 0x0236
> >           Data: 0e750000
> > > HCI Event: Number of Completed Packets (0x13) plen 5     #57 [hci0] 15.330353
> >         Num handles: 1
> >         Handle: 65
> >         Count: 1
> > < ACL Data TX: Handle 65 flags 0x00 dlen 9                 #58 [hci0] 16.071839
> >       ATT: Handle Value Notification (0x1b) len 4
> >         Handle: 0x0236
> >           Data: 066e
> > > ACL Data RX: H
> >
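For anyone reading the log above: the "Value" fields in the Read By Type
Responses are characteristic declarations, and they can be decoded to
confirm which properties each characteristic exposes. Here is a minimal
sketch, assuming the standard GATT declaration layout (1 byte of
properties, a 2-byte little-endian value handle, then a 16-bit or 128-bit
little-endian UUID). The helper name decode_chrc_declaration is made up
for this example and is not part of BlueZ.

```python
import uuid

# GATT characteristic property bits (first byte of the declaration value).
PROPERTY_BITS = {
    0x01: "broadcast",
    0x02: "read",
    0x04: "write-without-response",
    0x08: "write",
    0x10: "notify",
    0x20: "indicate",
    0x40: "authenticated-signed-writes",
    0x80: "extended-properties",
}


def decode_chrc_declaration(hex_value):
    """Decode a btmon 'Value:' hex string of a characteristic declaration.

    Returns (property names, value handle, UUID string).
    """
    raw = bytes.fromhex(hex_value)
    if len(raw) not in (5, 19):
        raise ValueError("expected 5 or 19 bytes, got %d" % len(raw))
    props = raw[0]
    value_handle = int.from_bytes(raw[1:3], "little")
    uuid_bytes = raw[3:][::-1]  # the UUID is sent least significant byte first
    if len(uuid_bytes) == 2:
        chrc_uuid = "0x%04x" % int.from_bytes(uuid_bytes, "big")
    else:
        chrc_uuid = str(uuid.UUID(bytes=uuid_bytes))
    names = [name for bit, name in PROPERTY_BITS.items() if props & bit]
    return names, value_handle, chrc_uuid


# Values copied from frame #10 of the log (Heart Rate service).
for value in ("123602372a", "123902382a", "083c02392a"):
    names, handle, chrc_uuid = decode_chrc_declaration(value)
    print("%s handle=0x%04x properties=%s" % (chrc_uuid, handle, names))
```

Decoded this way, 2A37 and 2A38 both carry Read and Notify (0x12), which
matches what bt_notify.py declares.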
> > On Mon, Jan 25, 2021 at 9:59 AM Luiz Augusto von Dentz
> > <luiz.dentz@xxxxxxxxx> wrote:
> > >
> > > Hi Kenny,
> > >
> > > On Sun, Jan 24, 2021 at 10:42 PM Kenny Bian <kennybian@xxxxxxxxx> wrote:
> > > >
> > > > Hi Luiz,
> > > >
> > > > Thank you so much for your reply. I appreciate it.
> > > >
> > > > By "registering the services", do you mean "RegisterApplication()" in
> > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server#n656?
> > > > If that is the case, I believe I already registered the services.
> > > > I checked the files in /var/lib/bluetooth. According to
> > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/settings-storage.txt#n321,
> > > > there should be a "[ServiceChanged]" section in the "info" file. But I
> > > > don't see it in the "info" file. Is there a way to tell whether the
> > > > "Service Changed Indication" is actually working?
> > > > Supposing the "Service Changed Indication" is already enabled, is
> > > > there a way for the mobile app to tell, on its side, which
> > > > service(s) changed?
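On the last question: the value of the Service Changed characteristic
(0x2A05) is two 16-bit little-endian handles, the start and the end of the
affected attribute handle range. A client that receives the indication can
compare that range with the service handle ranges it has cached and
rediscover whatever overlaps. Below is a minimal sketch of that
comparison. The service ranges are the ones from the btmon log earlier in
this thread; the indication value and the function names are made up for
illustration.

```python
import struct


def decode_service_changed(value):
    """Split a 4-byte Service Changed value into (start_handle, end_handle)."""
    value = bytes(value)
    if len(value) != 4:
        raise ValueError("Service Changed value must be 4 bytes")
    start, end = struct.unpack("<HH", value)
    return start, end


def affected_services(start, end, services):
    """Return names of cached services whose handle range overlaps [start, end]."""
    return [name for name, first, last in services
            if first <= end and last >= start]


# Handle ranges as reported in the Read By Group Type Responses of the log.
CACHED_SERVICES = [
    ("Heart Rate", 0x0234, 0x023C),
    ("Battery Service", 0x023D, 0x0240),
    ("Vendor specific", 0x0241, 0x0250),
]

# Hypothetical indication payload covering handles 0x0234-0x023c.
start, end = decode_service_changed(bytes.fromhex("34023c02"))
print("changed range: 0x%04x-0x%04x" % (start, end))
print("services to rediscover:", affected_services(start, end, CACHED_SERVICES))
```

A range of 0x0001-0xffff would simply mean that everything has to be
rediscovered.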
> > >
> > > HCI traces (btmon) should be able to tell you whether it has been
> > > subscribed or not; if there is an Indication in the trace, it
> > > probably means the remote has subscribed.
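One rough way to do this check on a saved btmon text log is to scan it
for two things: ATT Handle Value Indication PDUs (opcode 0x1d), and Write
Requests (opcode 0x12) whose data is 0200, which is the value a client
writes to a Client Characteristic Configuration descriptor to enable
indications (0100 enables notifications). The sketch below assumes the
plain text layout btmon prints, as in the log excerpt in this thread.
scan_btmon is a made-up helper name, and it does not verify that the
written handle really is a CCC descriptor.

```python
import re

ATT_RE = re.compile(r"ATT: (?P<name>.+?) \((?P<opcode>0x[0-9a-f]{2})\)")
FIELD_RE = re.compile(r"^\s*(?P<key>Handle|Data): (?P<value>\S+)")

ATT_WRITE_REQUEST = 0x12
ATT_HANDLE_VALUE_INDICATION = 0x1D


def scan_btmon(lines):
    """Return (indications, indication_subscriptions) found in btmon text."""
    pdus = []
    current = None
    for line in lines:
        # A packet header starts in column 0 and ends the previous PDU.
        if line[:1] in ("<", ">", "@", "="):
            current = None
            continue
        att = ATT_RE.search(line)
        if att:
            current = {"name": att.group("name"),
                       "opcode": int(att.group("opcode"), 16)}
            pdus.append(current)
            continue
        field = FIELD_RE.match(line)
        if field and current is not None:
            current.setdefault(field.group("key").lower(), field.group("value"))
    indications = [p for p in pdus
                   if p["opcode"] == ATT_HANDLE_VALUE_INDICATION]
    subscriptions = [p for p in pdus
                     if p["opcode"] == ATT_WRITE_REQUEST
                     and p.get("data") == "0200"]
    return indications, subscriptions


# Frames #53 and #56 from the log in this thread (quote markers removed).
SAMPLE = """\
> ACL Data RX: Handle 65 flags 0x02 dlen 9                 #53 [hci0] 14.055212
      ATT: Write Request (0x12) len 4
        Handle: 0x0237
          Data: 0100
< ACL Data TX: Handle 65 flags 0x00 dlen 11                #56 [hci0] 15.078671
      ATT: Handle Value Notification (0x1b) len 6
        Handle: 0x0236
          Data: 0e750000
"""

indications, subscriptions = scan_btmon(SAMPLE.splitlines())
print("indications:", indications)
print("indication subscriptions:", subscriptions)
```

On this excerpt both lists come back empty: the client only enabled
notifications (0100) on handle 0x0237, and no indication was sent. To run
it on a whole capture, pass the lines of the log file instead of SAMPLE.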
> > >
> > > > Thanks again for your help.
> > > >
> > > > On Sun, Jan 24, 2021 at 7:35 PM Luiz Augusto von Dentz
> > > > <luiz.dentz@xxxxxxxxx> wrote:
> > > > >
> > > > > Hi Kenny,
> > > > >
> > > > > On Sun, Jan 24, 2021 at 12:45 AM Kenny Bian <kennybian@xxxxxxxxx> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > > We implemented a GATT server on Linux in Python. The code is based on
> > > > > > the code sample(https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server).
> > > > > > The BlueZ version is 5.48. But we found a problem. The cached data in
> > > > > > /var/lib/bluetooth caused the mobile app to crash if some
> > > > > > characteristics are changed. After some research, we found "Under BLE
> > > > > > standard 'Generic Attribute'(0x1801), there is a Characteristic
> > > > > > 'Service Changed' (0x2A05) with 'indicate' property", see
> > > > > > https://github.com/espressif/esp-idf/issues/1777.
> > > > > >
> > > > > > The questions we have:
> > > > > > How do we enable the "Service Changed Indication" (0x2A05) in
> > > > > > Bluetooth? Is there a code example in Python?
> > > > >
> > > > > If you are registering the services with Bluetoothd then it should
> > > > > generate the service change automatically:
> > > > >
> > > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/src/gatt-database.c#n1185
> > > > >
> > > > > When a new service is registered it is indicated here:
> > > > >
> > > > > https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/src/gatt-database.c#n1452
> > > > >
> > > > >
> > > > > --
> > > > > Luiz Augusto von Dentz
> > >
> > >
> > >
> > > --
> > > Luiz Augusto von Dentz
#!/usr/bin/env python3

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
import sys

from random import randint

# Advert
# A bare "import gobject" fails on Python 3; reuse the module imported above.
gobject = GObject

mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE =      'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE =    'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE =    'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE =    'org.bluez.GattDescriptor1'

# Advert
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'

# Advert
class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = None
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power is not None:
            properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)

        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        print('GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print('returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        print('%s: Released!' % self.path)

class TestAdvertisement(Advertisement):

    def __init__(self, bus, index):
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_service_uuid('180D')
        self.add_service_uuid('180F')
        self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04])
        self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])
        self.add_local_name('TestAdvertisement')
        self.include_tx_power = True
        self.add_data(0x26, [0x01, 0x01, 0x00])


def register_ad_cb():
    print('Advertisement registered')


def register_ad_error_cb(error):
    print('Failed to register advertisement: ' + str(error))
    mainloop.quit()







class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation
    """
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(HeartRateService(bus, 0))
        self.add_service(BatteryService(bus, 1))
        self.add_service(TestService(bus, 2))

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_SERVICE_IFACE: {
                        'UUID': self.uuid,
                        'Primary': self.primary,
                        'Characteristics': dbus.Array(
                                self.get_characteristic_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result

    def get_characteristics(self):
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        pass


class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_DESC_IFACE: {
                        'Characteristic': self.chrc.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()


class HeartRateService(Service):
    """
    Fake Heart Rate Service that simulates a fake heart beat and control point
    behavior.

    """
    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.HR_UUID, True)
        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
        self.energy_expended = 0


class HeartRateMeasurementChrc(Characteristic):
    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_MSRMT_UUID,
                ['read'],
                service)
        self.notifying = False
        self.hr_ee_count = 0
        

    def ReadValue(self, options):
        print('HeartRateMeasurementChrc Read')
        return [dbus.Byte(6), dbus.Byte(99)]


    def hr_msrmt_cb(self):
        value = []
        value.append(dbus.Byte(0x06))

        value.append(dbus.Byte(randint(90, 130)))

        if self.hr_ee_count % 10 == 0:
            value[0] = dbus.Byte(value[0] | 0x08)
            value.append(dbus.Byte(self.service.energy_expended & 0xff))
            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))

        self.service.energy_expended = \
                min(0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return self.notifying

    def _update_hr_msrmt_simulation(self):
        print('Update HR Measurement Simulation')

        if not self.notifying:
            return

        GObject.timeout_add(1000, self.hr_msrmt_cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_hr_msrmt_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_hr_msrmt_simulation()
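

# Hypothetical helper (not in the original script): a minimal sketch that
# decodes the payload hr_msrmt_cb() builds above, so the Heart Rate
# Measurement bytes seen in a btmon capture can be checked by hand. It
# assumes the 8-bit heart rate format (flags bit 0 clear) used by this
# script and ignores RR intervals.
def decode_hr_measurement(payload):
    data = [int(b) for b in payload]
    flags = data[0]
    if flags & 0x01:
        raise ValueError('16-bit heart rate format is not handled here')
    result = {
        'bpm': data[1],
        # Bits 1-2 of the flags carry the sensor contact status.
        'sensor_contact': (flags >> 1) & 0x03,
        'energy_expended': None,
    }
    if flags & 0x08:
        # Energy Expended is a little-endian uint16 after the heart rate.
        result['energy_expended'] = data[2] | (data[3] << 8)
    return result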


class BodySensorLocationChrc(Characteristic):
    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BODY_SNSR_LOC_UUID,
                ['read'],
                service)
        self.notifying = False
        self.value = [0x01]

    def ReadValue(self, options):
        # Return 'Wrist' (0x02) as the sensor location; 'Chest' would be 0x01.
        return [ 0x02 ]


    def _cb(self):
        value = []

        value.append(dbus.Byte(randint(1, 3)))

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return self.notifying

    def _update_simulation(self):
        print('Update Simulation')

        if not self.notifying:
            return

        GObject.timeout_add(1000, self._cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_simulation()
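

# Hypothetical lookup (not in the original script): a sketch mapping Body
# Sensor Location (2A38) values to the names assigned by the Bluetooth SIG,
# which may help when reading the characteristic value back from a btmon
# capture.
BODY_SENSOR_LOCATIONS = {
    0x00: 'Other', 0x01: 'Chest', 0x02: 'Wrist', 0x03: 'Finger',
    0x04: 'Hand', 0x05: 'Ear Lobe', 0x06: 'Foot',
}


def body_sensor_location_name(value):
    # Values outside the assigned range are reported as reserved.
    return BODY_SENSOR_LOCATIONS.get(int(value), 'Reserved')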


class HeartRateControlPointChrc(Characteristic):
    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_CTRL_PT_UUID,
                ['write'],
                service)

    def WriteValue(self, value, options):
        print('Heart Rate Control Point WriteValue called')

        if len(value) != 1:
            raise InvalidValueLengthException()

        byte = value[0]
        print('Control Point value: ' + repr(byte))

        if byte != 1:
            raise FailedException("0x80")

        print('Energy Expended field reset!')
        self.service.energy_expended = 0


class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.

    """
    BATTERY_UUID = '180f'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))
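

# Hypothetical helper (not in the original script): the services here mix
# 16-bit UUIDs ('180f') with full 128-bit ones ('0000180d-...'). This sketch
# expands a 16-bit UUID onto the Bluetooth Base UUID so both forms can be
# compared when matching entries in a btmon log.
BLUETOOTH_BASE_UUID_SUFFIX = '-0000-1000-8000-00805f9b34fb'


def expand_uuid16(uuid):
    uuid = uuid.lower()
    if len(uuid) == 4:
        return '0000' + uuid + BLUETOOTH_BASE_UUID_SUFFIX
    # Anything else is assumed to be a full 128-bit UUID already.
    return uuid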


class BatteryLevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. The battery level is drained by 2 points
    every 5 seconds.

    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        GObject.timeout_add(5000, self.drain_battery)

    def notify_battery_level(self):
        if not self.notifying:
            return
        self.PropertiesChanged(
                GATT_CHRC_IFACE,
                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])

    def drain_battery(self):
        if not self.notifying:
            return True
        if self.battery_lvl > 0:
            self.battery_lvl -= 2
            if self.battery_lvl < 0:
                self.battery_lvl = 0
        print('Battery Level drained: ' + repr(self.battery_lvl))
        self.notify_battery_level()
        return True

    def ReadValue(self, options):
        print('Battery Level read: ' + repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False


class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.

    """
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))
        self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
        self.add_characteristic(TestSecureCharacteristic(bus, 2, self))

class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
    contains "extended properties", as well as a test descriptor.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read', 'write', 'writable-auxiliaries'],
                service)
        self.value = []
        self.add_descriptor(TestDescriptor(bus, 0, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 1, self))

    def ReadValue(self, options):
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestCharacteristic Write: ' + repr(value))
        self.value = value


class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        # Build the value from integers; dbus.Byte() does not accept a
        # one-character str on Python 3.
        return [dbus.Byte(c) for c in bytearray(b'Test')]


class CharacteristicUserDescriptionDescriptor(Descriptor):
    """
    Writable CUD descriptor.

    """
    CUD_UUID = '2901'

    def __init__(self, bus, index, characteristic):
        self.writable = 'writable-auxiliaries' in characteristic.flags
        self.value = array.array('B', b'This is a characteristic for testing')
        self.value = self.value.tolist()
        Descriptor.__init__(
                self, bus, index,
                self.CUD_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        return self.value

    def WriteValue(self, value, options):
        if not self.writable:
            raise NotPermittedException()
        self.value = value

class TestEncryptCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring encryption.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['encrypt-read', 'encrypt-write'],
                service)
        self.value = []
        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestEncryptCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestEncryptCharacteristic Write: ' + repr(value))
        self.value = value

class TestEncryptDescriptor(Descriptor):
    """
    Dummy test descriptor requiring encryption. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['encrypt-read', 'encrypt-write'],
                characteristic)

    def ReadValue(self, options):
        # Build the value from integers; dbus.Byte() does not accept a
        # one-character str on Python 3.
        return [dbus.Byte(c) for c in bytearray(b'Test')]


class TestSecureCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring secure connection.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['secure-read', 'secure-write'],
                service)
        self.value = []
        self.add_descriptor(TestSecureDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestSecureCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestSecureCharacteristic Write: ' + repr(value))
        self.value = value


class TestSecureDescriptor(Descriptor):
    """
    Dummy test descriptor requiring secure connection. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['secure-read', 'secure-write'],
                characteristic)

    def ReadValue(self, options):
        # Build the value from integers; dbus.Byte() does not accept a
        # one-character str on Python 3.
        return [dbus.Byte(c) for c in bytearray(b'Test')]

def register_app_cb():
    print('GATT application registered')


def register_app_error_cb(error):
    print('Failed to register application: ' + str(error))
    mainloop.quit()


def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props.keys():
            return o

    return None

def main():
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    service_manager.RegisterApplication(app.get_path(), {},
                                    reply_handler=register_app_cb,
                                    error_handler=register_app_error_cb)


    # Advert
    # find_adapter() only checks for GattManager1, so a missing
    # LEAdvertisingManager1 is reported later through register_ad_error_cb.

    adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                   "org.freedesktop.DBus.Properties")

    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    test_advertisement = TestAdvertisement(bus, 0)

    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    mainloop.run()

if __name__ == '__main__':
    main()

#!/usr/bin/env python3

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
import sys

from random import randint

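# Advert: GObject is already imported above. The bare 'import gobject' was
# dropped because that module is not available on Python 3.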

mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE =      'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE =    'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE =    'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE =    'org.bluez.GattDescriptor1'

# Advert
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'

# Advert
class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = None
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power is not None:
            properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)

        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        print( 'GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print( 'returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        print( '%s: Released!' % self.path)

class TestAdvertisement(Advertisement):

    def __init__(self, bus, index):
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_service_uuid('180D')
        self.add_service_uuid('180F')
        self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04])
        self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])
        self.add_local_name('TestAdvertisement')
        self.include_tx_power = True
        self.add_data(0x26, [0x01, 0x01, 0x00])


def register_ad_cb():
    print( 'Advertisement registered')


def register_ad_error_cb(error):
    print( 'Failed to register advertisement: ' + str(error))
    mainloop.quit()





class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation
    """
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(HeartRateService(bus, 0))
        self.add_service(BatteryService(bus, 1))
        self.add_service(TestService(bus, 2))

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_SERVICE_IFACE: {
                        'UUID': self.uuid,
                        'Primary': self.primary,
                        'Characteristics': dbus.Array(
                                self.get_characteristic_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result

    def get_characteristics(self):
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        pass


class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_DESC_IFACE: {
                        'Characteristic': self.chrc.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()


class HeartRateService(Service):
    """
    Fake Heart Rate Service that simulates a fake heart beat and control point
    behavior.

    """
    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.HR_UUID, True)
        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
        self.energy_expended = 0


class HeartRateMeasurementChrc(Characteristic):
    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_MSRMT_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.hr_ee_count = 0
        

    def ReadValue(self, options):
        print('HeartRateMeasurementChrc Read')
        return [dbus.Byte(6), dbus.Byte(99)]


    def hr_msrmt_cb(self):
        value = []
        value.append(dbus.Byte(0x06))

        value.append(dbus.Byte(randint(90, 130)))

        if self.hr_ee_count % 10 == 0:
            value[0] = dbus.Byte(value[0] | 0x08)
            value.append(dbus.Byte(self.service.energy_expended & 0xff))
            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))

        self.service.energy_expended = \
                min(0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return self.notifying

    def _update_hr_msrmt_simulation(self):
        print('Update HR Measurement Simulation')

        if not self.notifying:
            return

        GObject.timeout_add(1000, self.hr_msrmt_cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_hr_msrmt_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_hr_msrmt_simulation()


class BodySensorLocationChrc(Characteristic):
    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BODY_SNSR_LOC_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.value = [0x01]

    def ReadValue(self, options):
        # Return 'Wrist' (0x02) as the sensor location; 'Chest' would be 0x01.
        return [ 0x02 ]


    def _cb(self):
        value = []

        value.append(dbus.Byte(randint(1, 3)))

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return self.notifying

    def _update_simulation(self):
        print('Update Simulation')

        if not self.notifying:
            return

        GObject.timeout_add(1000, self._cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_simulation()


class HeartRateControlPointChrc(Characteristic):
    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_CTRL_PT_UUID,
                ['write'],
                service)

    def WriteValue(self, value, options):
        print('Heart Rate Control Point WriteValue called')

        if len(value) != 1:
            raise InvalidValueLengthException()

        byte = value[0]
        print('Control Point value: ' + repr(byte))

        if byte != 1:
            raise FailedException("0x80")

        print('Energy Expended field reset!')
        self.service.energy_expended = 0


class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.

    """
    BATTERY_UUID = '180f'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))


class BatteryLevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. The battery level is drained by 2 points
    every 5 seconds.

    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        GObject.timeout_add(5000, self.drain_battery)

    def notify_battery_level(self):
        if not self.notifying:
            return
        self.PropertiesChanged(
                GATT_CHRC_IFACE,
                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])

    def drain_battery(self):
        if not self.notifying:
            return True
        if self.battery_lvl > 0:
            self.battery_lvl -= 2
            if self.battery_lvl < 0:
                self.battery_lvl = 0
        print('Battery Level drained: ' + repr(self.battery_lvl))
        self.notify_battery_level()
        return True

    def ReadValue(self, options):
        print('Battery Level read: ' + repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False


class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.

    """
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))
        self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
        self.add_characteristic(TestSecureCharacteristic(bus, 2, self))

class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
    contains "extended properties", as well as a test descriptor.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read', 'write', 'writable-auxiliaries'],
                service)
        self.value = []
        self.add_descriptor(TestDescriptor(bus, 0, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 1, self))

    def ReadValue(self, options):
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestCharacteristic Write: ' + repr(value))
        self.value = value


class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['read', 'write'],
                characteristic)

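    def ReadValue(self, options):
        # dbus.Byte() takes an int (0-255) or a one-byte bytes object, so
        # ord() keeps this working on both Python 2 and Python 3.
        return [
                dbus.Byte(ord('T')), dbus.Byte(ord('e')),
                dbus.Byte(ord('s')), dbus.Byte(ord('t'))
        ]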


class CharacteristicUserDescriptionDescriptor(Descriptor):
    """
    Writable CUD descriptor.

    """
    CUD_UUID = '2901'

    def __init__(self, bus, index, characteristic):
        self.writable = 'writable-auxiliaries' in characteristic.flags
        self.value = array.array('B', b'This is a characteristic for testing')
        self.value = self.value.tolist()
        Descriptor.__init__(
                self, bus, index,
                self.CUD_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        return self.value

    def WriteValue(self, value, options):
        if not self.writable:
            raise NotPermittedException()
        self.value = value

class TestEncryptCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring encryption.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['encrypt-read', 'encrypt-write'],
                service)
        self.value = []
        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestEncryptCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestEncryptCharacteristic Write: ' + repr(value))
        self.value = value

class TestEncryptDescriptor(Descriptor):
    """
    Dummy test descriptor requiring encryption. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['encrypt-read', 'encrypt-write'],
                characteristic)

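    def ReadValue(self, options):
        # dbus.Byte() takes an int (0-255) or a one-byte bytes object, so
        # ord() keeps this working on both Python 2 and Python 3.
        return [
                dbus.Byte(ord('T')), dbus.Byte(ord('e')),
                dbus.Byte(ord('s')), dbus.Byte(ord('t'))
        ]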


class TestSecureCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring secure connection.

    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['secure-read', 'secure-write'],
                service)
        self.value = []
        self.add_descriptor(TestSecureDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestSecureCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestSecureCharacteristic Write: ' + repr(value))
        self.value = value


class TestSecureDescriptor(Descriptor):
    """
    Dummy test descriptor requiring secure connection. Returns a static value.

    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['secure-read', 'secure-write'],
                characteristic)

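    def ReadValue(self, options):
        # dbus.Byte() takes an int (0-255) or a one-byte bytes object, so
        # ord() keeps this working on both Python 2 and Python 3.
        return [
                dbus.Byte(ord('T')), dbus.Byte(ord('e')),
                dbus.Byte(ord('s')), dbus.Byte(ord('t'))
        ]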

def register_app_cb():
    print('GATT application registered')


def register_app_error_cb(error):
    print('Failed to register application: ' + str(error))
    mainloop.quit()


def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props.keys():
            return o

    return None

def main():
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)


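    # Advertisement. The adapter path was already validated above, so a second
    # "if not adapter" check could never trigger. find_adapter() only looks for
    # GattManager1; LEAdvertisingManager1 is assumed to be exposed on the same
    # adapter object.
    adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                   "org.freedesktop.DBus.Properties")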

    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    test_advertisement = TestAdvertisement(bus, 0)

    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    mainloop.run()

if __name__ == '__main__':
    main()
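
A side note on the adapter lookup in the script above: find_adapter() only tests for the GattManager1 interface, while main() also uses LEAdvertisingManager1 on the same object path. Below is a minimal sketch of a lookup that requires both interfaces. The helper name find_adapter_with and the sample dictionary are hypothetical and are not part of the BlueZ example; the only assumption about BlueZ is that GetManagedObjects() returns a mapping of object path to a mapping of interface names.

```python
# Interface names as BlueZ exposes them on D-Bus.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'


def find_adapter_with(objects, required_ifaces):
    """Return the first object path exposing every required interface.

    `objects` has the shape of an ObjectManager.GetManagedObjects() reply:
    {object_path: {interface_name: {property: value}}}.
    Hypothetical helper, not taken from the BlueZ example script.
    """
    for path, ifaces in objects.items():
        if all(iface in ifaces for iface in required_ifaces):
            return path
    return None


if __name__ == '__main__':
    # Hand-written sample data standing in for a real GetManagedObjects() reply.
    sample = {
        '/org/bluez': {'org.bluez.AgentManager1': {}},
        '/org/bluez/hci0': {
            'org.bluez.Adapter1': {},
            GATT_MANAGER_IFACE: {},
            LE_ADVERTISING_MANAGER_IFACE: {},
        },
    }
    required = [GATT_MANAGER_IFACE, LE_ADVERTISING_MANAGER_IFACE]
    print(find_adapter_with(sample, required))
```

In the script this would be called with the result of remote_om.GetManagedObjects(), and a None return could then be reported as a missing GattManager1 or LEAdvertisingManager1 interface before any registration is attempted.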

Attachment: btmon_notify.log
Description: Binary data

Attachment: btmon_read.log
Description: Binary data

