[bluez PATCH v1] test: add test app for Advertisement Monitor API

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



A Python test app that exercises the Advertisement Monitor API.

This app:
- registers itself with BlueZ by invoking RegisterMonitor with the
  app root path
- exposes multiple monitor objects with both valid and invalid monitor
  parameters
- implements Activate/Release/DeviceFound/DeviceLost methods on the
  monitor object

Signed-off-by: Manish Mandlik <mmandlik@xxxxxxxxxx>
Reviewed-by: apusaka@xxxxxxxxxxxx
Reviewed-by: howardchung@xxxxxxxxxx
Reviewed-by: mcchou@xxxxxxxxxxxx
---

 test/example-adv-monitor | 402 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 402 insertions(+)
 create mode 100644 test/example-adv-monitor

diff --git a/test/example-adv-monitor b/test/example-adv-monitor
new file mode 100644
index 000000000..6fe8a3058
--- /dev/null
+++ b/test/example-adv-monitor
@@ -0,0 +1,402 @@
+#!/usr/bin/python
+# SPDX-License-Identifier: LGPL-2.1-or-later
+
+import argparse
+import dbus
+import dbus.mainloop.glib
+import dbus.service
+import json
+import time
+
+from threading import Thread
+
+try:
+    from gi.repository import GObject  # python3
+except ImportError:
+    import gobject as GObject  # python2
+
+# Standard D-Bus interfaces used for object enumeration and property access.
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+
+# Bus name under which the BlueZ objects are looked up.
+BLUEZ_SERVICE_NAME = 'org.bluez'
+
+# Interface on which RegisterMonitor/UnregisterMonitor are invoked, the
+# interface implemented by every monitor object of this app, and the prefix
+# of the app root path (the app id is appended to it).
+ADV_MONITOR_MANAGER_IFACE = 'org.bluez.AdvertisementMonitorManager1'
+ADV_MONITOR_IFACE = 'org.bluez.AdvertisementMonitor1'
+ADV_MONITOR_APP_BASE_PATH = '/org/bluez/example/adv_monitor_app'
+
+
+class AdvMonitor(dbus.service.Object):
+
+    # Indexes of the Monitor object parameters in a monitor data list.
+    MONITOR_TYPE = 0
+    RSSI_FILTER = 1
+    PATTERNS = 2
+
+    # Indexes of the RSSI filter parameters in a monitor data list.
+    RSSI_H_THRESH = 0
+    RSSI_H_TIMEOUT = 1
+    RSSI_L_THRESH = 2
+    RSSI_L_TIMEOUT = 3
+
+    # Indexes of the Patterns filter parameters in a monitor data list.
+    PATTERN_START_POS = 0
+    PATTERN_AD_TYPE = 1
+    PATTERN_DATA = 2
+
+    def __init__(self, bus, app_path, monitor_id, monitor_data):
+        self.path = app_path + '/monitor' + str(monitor_id)
+        self.bus = bus
+
+        self._set_type(monitor_data[self.MONITOR_TYPE])
+        self._set_rssi(monitor_data[self.RSSI_FILTER])
+        self._set_patterns(monitor_data[self.PATTERNS])
+
+        super(AdvMonitor, self).__init__(self.bus, self.path)
+
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+
+    def get_properties(self):
+        properties = dict()
+        properties['Type'] = dbus.String(self.monitor_type)
+        properties['RSSIThresholdsAndTimers'] = dbus.Struct(self.rssi,
+                                                            signature='nqnq')
+        properties['Patterns'] = dbus.Array(self.patterns, signature='(yyay)')
+        return {ADV_MONITOR_IFACE: properties}
+
+
+    def _set_type(self, monitor_type):
+        self.monitor_type = monitor_type
+
+
+    def _set_rssi(self, rssi):
+        h_thresh = dbus.Int16(rssi[self.RSSI_H_THRESH])
+        h_timeout = dbus.UInt16(rssi[self.RSSI_H_TIMEOUT])
+        l_thresh = dbus.Int16(rssi[self.RSSI_L_THRESH])
+        l_timeout = dbus.UInt16(rssi[self.RSSI_L_TIMEOUT])
+        self.rssi = (h_thresh, h_timeout, l_thresh, l_timeout)
+
+
+    def _set_patterns(self, patterns):
+        self.patterns = []
+        for pattern in patterns:
+            start_pos = dbus.Byte(pattern[self.PATTERN_START_POS])
+            ad_type = dbus.Byte(pattern[self.PATTERN_AD_TYPE])
+            ad_data = []
+            for byte in pattern[self.PATTERN_DATA]:
+                ad_data.append(dbus.Byte(byte))
+            adv_pattern = dbus.Struct((start_pos, ad_type, ad_data),
+                                      signature='yyay')
+            self.patterns.append(adv_pattern)
+
+
+    def remove_monitor(self):
+        self.remove_from_connection()
+
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        print('{}: {} GetAll'.format(self.path, interface))
+        if interface != ADV_MONITOR_IFACE:
+            print('{}: GetAll: Invalid arg {}'.format(self.path, interface))
+            return {}
+
+        return self.get_properties()[ADV_MONITOR_IFACE]
+
+
+    @dbus.service.method(ADV_MONITOR_IFACE,
+                         in_signature='',
+                         out_signature='')
+    def Activate(self):
+        print('{}: Monitor Activated'.format(self.path))
+
+
+    @dbus.service.method(ADV_MONITOR_IFACE,
+                         in_signature='',
+                         out_signature='')
+    def Release(self):
+        print('{}: Monitor Released'.format(self.path))
+
+
+    @dbus.service.method(ADV_MONITOR_IFACE,
+                         in_signature='o',
+                         out_signature='')
+    def DeviceFound(self, device):
+        print('{}: {} Device Found'.format(self.path, device))
+
+
+    @dbus.service.method(ADV_MONITOR_IFACE,
+                         in_signature='o',
+                         out_signature='')
+    def DeviceLost(self, device):
+        print('{}: {} Device Lost'.format(self.path, device))
+
+
+class AdvMonitorApp(dbus.service.Object):
+
+    def __init__(self, bus, advmon_manager, app_id):
+        self.bus = bus
+        self.advmon_mgr = advmon_manager
+        self.app_path = ADV_MONITOR_APP_BASE_PATH + str(app_id)
+
+        self.monitors = dict()
+
+        super(AdvMonitorApp, self).__init__(self.bus, self.app_path)
+
+
+    def get_app_path(self):
+        return dbus.ObjectPath(self.app_path)
+
+
+    def add_monitor(self, monitor_data):
+        monitor_id = 0
+        while monitor_id in self.monitors:
+            monitor_id += 1
+
+        monitor = AdvMonitor(self.bus, self.app_path, monitor_id, monitor_data)
+
+        # Emit the InterfacesAdded signal once the Monitor object is created.
+        self.InterfacesAdded(monitor.get_path(), monitor.get_properties())
+
+        self.monitors[monitor_id] = monitor
+
+        return monitor_id
+
+
+    def remove_monitor(self, monitor_id):
+        monitor = self.monitors.pop(monitor_id, None)
+        if not monitor:
+            return False
+
+        # Emit the InterfacesRemoved signal before removing the Monitor object.
+        self.InterfacesRemoved(monitor.get_path(),
+                               monitor.get_properties().keys())
+
+        monitor.remove_monitor()
+
+        return True
+
+
+    def register_app(self):
+        self.register_successful = None
+
+        def register_cb():
+            print('{}: RegisterMonitor successful'.format(self.app_path))
+            self.register_successful = True
+
+        def register_error_cb(error):
+            print('{}: RegisterMonitor failed: {}'.format(self.app_path,
+                                                          str(error)))
+            self.register_successful = False
+
+        self.advmon_mgr.RegisterMonitor(self.get_app_path(),
+                                        reply_handler=register_cb,
+                                        error_handler=register_error_cb)
+
+        # Wait for the reply.
+        while self.register_successful is None:
+            pass
+
+        return self.register_successful
+
+
+    def unregister_app(self):
+        self.unregister_successful = None
+
+        def unregister_cb():
+            print('{}: UnregisterMonitor successful'.format(self.app_path))
+            self.unregister_successful = True
+
+        def unregister_error_cb(error):
+            print('{}: UnregisterMonitor failed: {}'.format(self.app_path,
+                                                            str(error)))
+            self.unregister_successful = False
+
+        self.advmon_mgr.UnregisterMonitor(self.get_app_path(),
+                                          reply_handler=unregister_cb,
+                                          error_handler=unregister_error_cb)
+
+        # Wait for the reply.
+        while self.unregister_successful is None:
+            pass
+
+        return self.unregister_successful
+
+
+    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+    def GetManagedObjects(self):
+        print('{}: GetManagedObjects'.format(self.app_path))
+        objects = dict()
+        for monitor_id in self.monitors:
+            monitor = self.monitors[monitor_id]
+            objects[monitor.get_path()] = monitor.get_properties()
+
+        return objects
+
+
+    @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}')
+    def InterfacesAdded(self, object_path, interfaces_and_properties):
+        # Invoking this method emits the InterfacesAdded signal,
+        # nothing needs to be done here.
+        return
+
+
+    @dbus.service.signal(DBUS_OM_IFACE, signature='oas')
+    def InterfacesRemoved(self, object_path, interfaces):
+        # Invoking this method emits the InterfacesRemoved signal,
+        # nothing needs to be done here.
+        return
+
+
+def read_adapter_supported_monitor_types(adapter_props):
+    types = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE,
+                       'SupportedMonitorTypes',
+                       dbus_interface=DBUS_PROP_IFACE))
+    return json.loads(types)
+
+
+def read_adapter_supported_monitor_features(adapter_props):
+    features = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE,
+                          'SupportedFeatures',
+                          dbus_interface=DBUS_PROP_IFACE))
+    return json.loads(features)
+
+
+def print_supported_types_and_features(adapter_props):
+    supported_types = read_adapter_supported_monitor_types(adapter_props)
+    for supported_type in supported_types:
+        print(supported_type)
+
+    supported_features = read_adapter_supported_monitor_features(adapter_props)
+    for supported_feature in supported_features:
+        print(supported_feature)
+
+
+def find_advmon_mgr(bus, adapter):
+    return dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+                          ADV_MONITOR_MANAGER_IFACE)
+
+
+def find_adapter(bus):
+    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
+                                DBUS_OM_IFACE)
+    objects = remote_om.GetManagedObjects()
+
+    adapter = None
+    adapter_props = None
+
+    for o, props in objects.items():
+        if ADV_MONITOR_MANAGER_IFACE in props:
+            adapter = o
+            break
+
+    if adapter:
+        # Turn on the bluetooth adapter.
+        adapter_props = dbus.Interface(
+                                bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+                                DBUS_PROP_IFACE)
+        adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
+
+    return adapter, adapter_props
+
+
+def test(bus, mainloop, advmon_mgr, app_id):
+    # Create an App instance.
+    app = AdvMonitorApp(bus, advmon_mgr, app_id)
+
+    # Create two monitor objects before registering the app. No Activate() or
+    # Release() should get called yet as the app is not registered.
+    data0 = [
+        'invalid_patterns',
+        [-50, 1, -70, 1],
+        [[0, 0x03, [0x12, 0x18]]] # Service Class UUID is 0x1812 (HOG)
+    ]
+    data1 = [
+        'or_patterns',
+        [127, 0, 127, 0],
+        [[5, 0x09, [ord('_')]]] # 5th character of the Local Name is '_'
+    ]
+    monitor0 = app.add_monitor(data0)
+    monitor1 = app.add_monitor(data1)
+
+    # Register the app root path to expose advertisement monitors.
+    # Release() should get called on monitor0 - incorrect monitor type.
+    # Activate() should get called on monitor1.
+    ret = app.register_app()
+    if not ret:
+        print('RegisterMonitor failed.')
+        mainloop.quit()
+        exit(-1)
+
+    # Create two more monitor objects.
+    # Release() should get called on monitor2 - incorrect RSSI Filter values.
+    # Activate() should get called on monitor3.
+    data2 = [
+        'or_patterns',
+        [-50, 1, -30, 1],
+        [[0, 0x19, [0xC2, 0x03]]] # Appearance is 0xC203 (Mouse)
+    ]
+    data3 = [
+        'or_patterns',
+        [-50, 1, -70, 1],
+        [[0, 0x03, [0x12, 0x18]], [0, 0x19, [0xC2, 0x03]]]
+    ]
+    monitor2 = app.add_monitor(data2)
+    monitor3 = app.add_monitor(data3)
+
+    # Run until user hits the 'Enter' key. If any peer device is advertising
+    # during this time, DeviceFound() should get triggered for monitors
+    # matching the advertisements.
+    raw_input('Press "Enter" key to quit...\n')
+
+    # Remove a monitor. DeviceFound() for this monitor should not get
+    # triggered any more.
+    app.remove_monitor(monitor1)
+
+    # Unregister the app. Release() should get invoked on active monitors,
+    # monitor3 in this case.
+    app.unregister_app()
+
+    mainloop.quit()
+
+
+def main(app_id):
+    # Initialize threads in gobject/dbus-glib before creating local threads.
+    GObject.threads_init()
+    dbus.mainloop.glib.threads_init()
+
+    # Arrange for the GLib main loop to be the default.
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+    bus = dbus.SystemBus()
+    mainloop = GObject.MainLoop()
+
+    # Find bluetooth adapter and power it on.
+    adapter, adapter_props = find_adapter(bus)
+    if not adapter or not adapter_props:
+        print('Bluetooth adapter not found.')
+        exit(-1)
+
+    # Read supported types and find AdvertisementMonitorManager1 interface.
+    print_supported_types_and_features(adapter_props)
+    advmon_mgr = find_advmon_mgr(bus, adapter)
+    if not advmon_mgr :
+        print('AdvertisementMonitorManager1 interface not found.')
+        exit(-1)
+
+    Thread(target=test, args=(bus, mainloop, advmon_mgr, app_id)).start()
+
+    mainloop.run() # blocks until mainloop.quit() is called
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--app_id', default=0, type=int, help='use this App-ID '
+                        'for creating dbus objects (default: 0)')
+    args = parser.parse_args()
+
+    main(args.app_id)
-- 
2.29.2.576.ga3fc446d84-goog




[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux