Re: [PATCH] test: Add Python GATT client example

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi,

> On Wed, Apr 1, 2015 at 5:41 PM, Arman Uguray <armansito@xxxxxxxxxxxx> wrote:
> This patch introduces test/example-gatt-client which implements a simple
> D-Bus client application for a remote Heart Rate service.
> ---
>  test/example-gatt-client | 218 +++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 218 insertions(+)
>  create mode 100755 test/example-gatt-client
>
> diff --git a/test/example-gatt-client b/test/example-gatt-client
> new file mode 100755
> index 0000000..329df35
> --- /dev/null
> +++ b/test/example-gatt-client
> @@ -0,0 +1,218 @@
> +#!/usr/bin/python
> +
> +import argparse
> +import dbus
> +import gobject
> +import sys
> +
> +from dbus.mainloop.glib import DBusGMainLoop
> +
> +bus = None
> +mainloop = None
> +
> +BLUEZ_SERVICE_NAME = 'org.bluez'
> +DBUS_OM_IFACE =      'org.freedesktop.DBus.ObjectManager'
> +DBUS_PROP_IFACE =    'org.freedesktop.DBus.Properties'
> +
> +GATT_SERVICE_IFACE = 'org.bluez.GattService1'
> +GATT_CHRC_IFACE =    'org.bluez.GattCharacteristic1'
> +
> +HR_SVC_UUID =        '0000180d-0000-1000-8000-00805f9b34fb'
> +HR_MSRMT_UUID =      '00002a37-0000-1000-8000-00805f9b34fb'
> +BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
> +HR_CTRL_PT_UUID =    '00002a39-0000-1000-8000-00805f9b34fb'
> +
> +# The objects that we interact with.
> +hr_service = None
> +hr_msrmt_chrc = None
> +body_snsr_loc_chrc = None
> +hr_ctrl_pt_chrc = None
> +
> +
> +def generic_error_cb(error):
> +    print('D-Bus call failed: ' + str(error))
> +    mainloop.quit()
> +
> +
> +def body_sensor_val_to_str(val):
> +    if val == 0:
> +        return 'Other'
> +    if val == 1:
> +        return 'Chest'
> +    if val == 2:
> +        return 'Wrist'
> +    if val == 3:
> +        return 'Finger'
> +    if val == 4:
> +        return 'Hand'
> +    if val == 5:
> +        return 'Ear Lobe'
> +    if val == 6:
> +        return 'Foot'
> +
> +    return 'Reserved value'
> +
> +
> +def sensor_contact_val_to_str(val):
> +    if val == 0 or val == 1:
> +        return 'not supported'
> +    if val == 2:
> +        return 'no contact detected'
> +    if val == 3:
> +        return 'contact detected'
> +
> +    return 'invalid value'
> +
> +
> +def body_sensor_val_cb(value):
> +    if len(value) != 1:
> +        print('Invalid body sensor location value: ' + repr(value))
> +        return
> +
> +    print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
> +
> +
> +def hr_msrmt_start_notify_cb():
> +    print('HR Measurement notifications enabled')
> +
> +
> +def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
> +    if iface != GATT_CHRC_IFACE:
> +        return
> +
> +    if not len(changed_props):
> +        return
> +
> +    value = changed_props.get('Value', None)
> +    if not value:
> +        return
> +
> +    print('New HR Measurement')
> +
> +    flags = value[0]
> +    value_format = flags & 0x01
> +    sc_status = (flags >> 1) & 0x03
> +    ee_status = flags & 0x08
> +
> +    if value_format == 0x00:
> +        hr_msrmt = value[1]
> +        next_ind = 2
> +    else:
> +        hr_msrmt = value[1] | (value[2] << 8)
> +        next_ind = 3
> +
> +    print('\tHR: ' + str(int(hr_msrmt)))
> +    print('\tSensor Contact status: ' +
> +          sensor_contact_val_to_str(sc_status))
> +
> +    if ee_status:
> +        print('\tEnergy Expended: ' + str(int(value[next_ind])))
> +
> +
> +def start_client():
> +    # Read the Body Sensor Location value and print it asynchronously.
> +    body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
> +                                    error_handler=generic_error_cb,
> +                                    dbus_interface=GATT_CHRC_IFACE)
> +
> +    # Listen to PropertiesChanged signals from the Heart Rate Measurement
> +    # Characteristic.
> +    hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
> +    hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
> +                                          hr_msrmt_changed_cb)
> +
> +    # Subscribe to Heart Rate Measurement notifications.
> +    hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
> +                                 error_handler=generic_error_cb,
> +                                 dbus_interface=GATT_CHRC_IFACE)
> +
> +
> +def process_chrc(chrc_path):
> +    chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
> +    chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
> +                             dbus_interface=DBUS_PROP_IFACE)
> +
> +    uuid = chrc_props['UUID']
> +
> +    if uuid == HR_MSRMT_UUID:
> +        global hr_msrmt_chrc
> +        hr_msrmt_chrc = (chrc, chrc_props)
> +    elif uuid == BODY_SNSR_LOC_UUID:
> +        global body_snsr_loc_chrc
> +        body_snsr_loc_chrc = (chrc, chrc_props)
> +    elif uuid == HR_CTRL_PT_UUID:
> +        global hr_ctrl_pt_chrc
> +        hr_ctrl_pt_chrc = (chrc, chrc_props)
> +    else:
> +        print('Unrecognized characteristic: ' + uuid)
> +
> +    return True
> +
> +
> +def process_hr_service(service_path):
> +    service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
> +    service_props = service.GetAll(GATT_SERVICE_IFACE,
> +                                   dbus_interface=DBUS_PROP_IFACE)
> +
> +    uuid = service_props['UUID']
> +
> +    if uuid != HR_SVC_UUID:
> +        print('Service is not a Heart Rate Service: ' + uuid)
> +        return False
> +
> +    # Process the characteristics.
> +    chrc_paths = service_props['Characteristics']
> +    for chrc_path in chrc_paths:
> +        process_chrc(chrc_path)
> +
> +    global hr_service
> +    hr_service = (service, service_props, service_path)
> +
> +    return True
> +
> +
> +def interfaces_removed_cb(object_path, interfaces):
> +    if not hr_service:
> +        return
> +
> +    if object_path == hr_service[2]:
> +        print('Service was removed')
> +        mainloop.quit()
> +
> +
> +def main():
> +    # Parse the service path from the arguments.
> +    parser = argparse.ArgumentParser(
> +            description='D-Bus Heart Rate Service client example')
> +    parser.add_argument('service_path', metavar='<service-path>',
> +                        type=dbus.ObjectPath, nargs=1,
> +                        help='GATT service object path')
> +    args = parser.parse_args()
> +    service_path = args.service_path[0]
> +
> +    # Set up the main loop.
> +    DBusGMainLoop(set_as_default=True)
> +    global bus
> +    bus = dbus.SystemBus()
> +    global mainloop
> +    mainloop = gobject.MainLoop()
> +
> +    om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
> +    om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> +
> +    try:
> +        if not process_hr_service(service_path):
> +            sys.exit(1)
> +    except dbus.DBusException as e:
> +        print e.message
> +        sys.exit(1)
> +
> +    print 'Heart Rate Service ready'
> +
> +    start_client()
> +
> +    mainloop.run()
> +
> +
> +if __name__ == '__main__':
> +    main()
> --
> 2.2.0.rc0.207.ga3a616c
>

Pushed.

Thanks,
Arman
--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux