The python test app to test working of the Advertisement Monitor API. This app: - registers itself with bluez by invoking the RegisterMonitor with the app root path - exposes multiple monitor objects with both valid and invalid monitor parameters - implements Activate/Release/DeviceFound/DeviceLost methods on the monitor object Signed-off-by: Manish Mandlik <mmandlik@xxxxxxxxxx> Reviewed-by: apusaka@xxxxxxxxxxxx Reviewed-by: howardchung@xxxxxxxxxx Reviewed-by: mcchou@xxxxxxxxxxxx --- test/example-adv-monitor | 402 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 402 insertions(+) create mode 100644 test/example-adv-monitor diff --git a/test/example-adv-monitor b/test/example-adv-monitor new file mode 100644 index 000000000..6fe8a3058 --- /dev/null +++ b/test/example-adv-monitor @@ -0,0 +1,402 @@ +#!/usr/bin/python +# SPDX-License-Identifier: LGPL-2.1-or-later + +import argparse +import dbus +import dbus.mainloop.glib +import dbus.service +import json +import time + +from threading import Thread + +try: + from gi.repository import GObject # python3 +except ImportError: + import gobject as GObject # python2 + +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +BLUEZ_SERVICE_NAME = 'org.bluez' + +ADV_MONITOR_MANAGER_IFACE = 'org.bluez.AdvertisementMonitorManager1' +ADV_MONITOR_IFACE = 'org.bluez.AdvertisementMonitor1' +ADV_MONITOR_APP_BASE_PATH = '/org/bluez/example/adv_monitor_app' + + +class AdvMonitor(dbus.service.Object): + + # Indexes of the Monitor object parameters in a monitor data list. + MONITOR_TYPE = 0 + RSSI_FILTER = 1 + PATTERNS = 2 + + # Indexes of the RSSI filter parameters in a monitor data list. + RSSI_H_THRESH = 0 + RSSI_H_TIMEOUT = 1 + RSSI_L_THRESH = 2 + RSSI_L_TIMEOUT = 3 + + # Indexes of the Patterns filter parameters in a monitor data list. + PATTERN_START_POS = 0 + PATTERN_AD_TYPE = 1 + PATTERN_DATA = 2 + + def __init__(self, bus, app_path, monitor_id, monitor_data): + self.path = app_path + '/monitor' + str(monitor_id) + self.bus = bus + + self._set_type(monitor_data[self.MONITOR_TYPE]) + self._set_rssi(monitor_data[self.RSSI_FILTER]) + self._set_patterns(monitor_data[self.PATTERNS]) + + super(AdvMonitor, self).__init__(self.bus, self.path) + + + def get_path(self): + return dbus.ObjectPath(self.path) + + + def get_properties(self): + properties = dict() + properties['Type'] = dbus.String(self.monitor_type) + properties['RSSIThresholdsAndTimers'] = dbus.Struct(self.rssi, + signature='nqnq') + properties['Patterns'] = dbus.Array(self.patterns, signature='(yyay)') + return {ADV_MONITOR_IFACE: properties} + + + def _set_type(self, monitor_type): + self.monitor_type = monitor_type + + + def _set_rssi(self, rssi): + h_thresh = dbus.Int16(rssi[self.RSSI_H_THRESH]) + h_timeout = dbus.UInt16(rssi[self.RSSI_H_TIMEOUT]) + l_thresh = dbus.Int16(rssi[self.RSSI_L_THRESH]) + l_timeout = dbus.UInt16(rssi[self.RSSI_L_TIMEOUT]) + self.rssi = (h_thresh, h_timeout, l_thresh, l_timeout) + + + def _set_patterns(self, patterns): + self.patterns = [] + for pattern in patterns: + start_pos = dbus.Byte(pattern[self.PATTERN_START_POS]) + ad_type = dbus.Byte(pattern[self.PATTERN_AD_TYPE]) + ad_data = [] + for byte in pattern[self.PATTERN_DATA]: + ad_data.append(dbus.Byte(byte)) + adv_pattern = dbus.Struct((start_pos, ad_type, ad_data), + signature='yyay') + self.patterns.append(adv_pattern) + + + def remove_monitor(self): + self.remove_from_connection() + + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + print('{}: {} GetAll'.format(self.path, interface)) + if interface != ADV_MONITOR_IFACE: + print('{}: GetAll: Invalid arg {}'.format(self.path, interface)) + return {} + + return self.get_properties()[ADV_MONITOR_IFACE] + + + @dbus.service.method(ADV_MONITOR_IFACE, + in_signature='', + out_signature='') + def Activate(self): + print('{}: Monitor Activated'.format(self.path)) + + + @dbus.service.method(ADV_MONITOR_IFACE, + in_signature='', + out_signature='') + def Release(self): + print('{}: Monitor Released'.format(self.path)) + + + @dbus.service.method(ADV_MONITOR_IFACE, + in_signature='o', + out_signature='') + def DeviceFound(self, device): + print('{}: {} Device Found'.format(self.path, device)) + + + @dbus.service.method(ADV_MONITOR_IFACE, + in_signature='o', + out_signature='') + def DeviceLost(self, device): + print('{}: {} Device Lost'.format(self.path, device)) + + +class AdvMonitorApp(dbus.service.Object): + + def __init__(self, bus, advmon_manager, app_id): + self.bus = bus + self.advmon_mgr = advmon_manager + self.app_path = ADV_MONITOR_APP_BASE_PATH + str(app_id) + + self.monitors = dict() + + super(AdvMonitorApp, self).__init__(self.bus, self.app_path) + + + def get_app_path(self): + return dbus.ObjectPath(self.app_path) + + + def add_monitor(self, monitor_data): + monitor_id = 0 + while monitor_id in self.monitors: + monitor_id += 1 + + monitor = AdvMonitor(self.bus, self.app_path, monitor_id, monitor_data) + + # Emit the InterfacesAdded signal once the Monitor object is created. + self.InterfacesAdded(monitor.get_path(), monitor.get_properties()) + + self.monitors[monitor_id] = monitor + + return monitor_id + + + def remove_monitor(self, monitor_id): + monitor = self.monitors.pop(monitor_id, None) + if not monitor: + return False + + # Emit the InterfacesRemoved signal before removing the Monitor object. + self.InterfacesRemoved(monitor.get_path(), + monitor.get_properties().keys()) + + monitor.remove_monitor() + + return True + + + def register_app(self): + self.register_successful = None + + def register_cb(): + print('{}: RegisterMonitor successful'.format(self.app_path)) + self.register_successful = True + + def register_error_cb(error): + print('{}: RegisterMonitor failed: {}'.format(self.app_path, + str(error))) + self.register_successful = False + + self.advmon_mgr.RegisterMonitor(self.get_app_path(), + reply_handler=register_cb, + error_handler=register_error_cb) + + # Wait for the reply. + while self.register_successful is None: + pass + + return self.register_successful + + + def unregister_app(self): + self.unregister_successful = None + + def unregister_cb(): + print('{}: UnregisterMonitor successful'.format(self.app_path)) + self.unregister_successful = True + + def unregister_error_cb(error): + print('{}: UnregisterMonitor failed: {}'.format(self.app_path, + str(error))) + self.unregister_successful = False + + self.advmon_mgr.UnregisterMonitor(self.get_app_path(), + reply_handler=unregister_cb, + error_handler=unregister_error_cb) + + # Wait for the reply. + while self.unregister_successful is None: + pass + + return self.unregister_successful + + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + def GetManagedObjects(self): + print('{}: GetManagedObjects'.format(self.app_path)) + objects = dict() + for monitor_id in self.monitors: + monitor = self.monitors[monitor_id] + objects[monitor.get_path()] = monitor.get_properties() + + return objects + + + @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}') + def InterfacesAdded(self, object_path, interfaces_and_properties): + # Invoking this method emits the InterfacesAdded signal, + # nothing needs to be done here. + return + + + @dbus.service.signal(DBUS_OM_IFACE, signature='oas') + def InterfacesRemoved(self, object_path, interfaces): + # Invoking this method emits the InterfacesRemoved signal, + # nothing needs to be done here. + return + + +def read_adapter_supported_monitor_types(adapter_props): + types = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE, + 'SupportedMonitorTypes', + dbus_interface=DBUS_PROP_IFACE)) + return json.loads(types) + + +def read_adapter_supported_monitor_features(adapter_props): + features = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE, + 'SupportedFeatures', + dbus_interface=DBUS_PROP_IFACE)) + return json.loads(features) + + +def print_supported_types_and_features(adapter_props): + supported_types = read_adapter_supported_monitor_types(adapter_props) + for supported_type in supported_types: + print(supported_type) + + supported_features = read_adapter_supported_monitor_features(adapter_props) + for supported_feature in supported_features: + print(supported_feature) + + +def find_advmon_mgr(bus, adapter): + return dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + ADV_MONITOR_MANAGER_IFACE) + + +def find_adapter(bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + + adapter = None + adapter_props = None + + for o, props in objects.items(): + if ADV_MONITOR_MANAGER_IFACE in props: + adapter = o + break + + if adapter: + # Turn on the bluetooth adapter. + adapter_props = dbus.Interface( + bus.get_object(BLUEZ_SERVICE_NAME, adapter), + DBUS_PROP_IFACE) + adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1)) + + return adapter, adapter_props + + +def test(bus, mainloop, advmon_mgr, app_id): + # Create an App instance. + app = AdvMonitorApp(bus, advmon_mgr, app_id) + + # Create two monitor objects before registering the app. No Activate() or + # Release() should get called yet as the app is not registered. + data0 = [ + 'invalid_patterns', + [-50, 1, -70, 1], + [[0, 0x03, [0x12, 0x18]]] # Service Class UUID is 0x1812 (HOG) + ] + data1 = [ + 'or_patterns', + [127, 0, 127, 0], + [[5, 0x09, [ord('_')]]] # 5th character of the Local Name is '_' + ] + monitor0 = app.add_monitor(data0) + monitor1 = app.add_monitor(data1) + + # Register the app root path to expose advertisement monitors. + # Release() should get called on monitor0 - incorrect monitor type. + # Activate() should get called on monitor1. + ret = app.register_app() + if not ret: + print('RegisterMonitor failed.') + mainloop.quit() + exit(-1) + + # Create two more monitor objects. + # Release() should get called on monitor2 - incorrect RSSI Filter values. + # Activate() should get called on monitor3. + data2 = [ + 'or_patterns', + [-50, 1, -30, 1], + [[0, 0x19, [0xC2, 0x03]]] # Appearance is 0xC203 (Mouse) + ] + data3 = [ + 'or_patterns', + [-50, 1, -70, 1], + [[0, 0x03, [0x12, 0x18]], [0, 0x19, [0xC2, 0x03]]] + ] + monitor2 = app.add_monitor(data2) + monitor3 = app.add_monitor(data3) + + # Run until user hits the 'Enter' key. If any peer device is advertising + # during this time, DeviceFound() should get triggered for monitors + # matching the advertisements. + raw_input('Press "Enter" key to quit...\n') + + # Remove a monitor. DeviceFound() for this monitor should not get + # triggered any more. + app.remove_monitor(monitor1) + + # Unregister the app. Release() should get invoked on active monitors, + # monitor3 in this case. + app.unregister_app() + + mainloop.quit() + + +def main(app_id): + # Initialize threads in gobject/dbus-glib before creating local threads. + GObject.threads_init() + dbus.mainloop.glib.threads_init() + + # Arrange for the GLib main loop to be the default. + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + mainloop = GObject.MainLoop() + + # Find bluetooth adapter and power it on. + adapter, adapter_props = find_adapter(bus) + if not adapter or not adapter_props: + print('Bluetooth adapter not found.') + exit(-1) + + # Read supported types and find AdvertisementMonitorManager1 interface. + print_supported_types_and_features(adapter_props) + advmon_mgr = find_advmon_mgr(bus, adapter) + if not advmon_mgr : + print('AdvertisementMonitorManager1 interface not found.') + exit(-1) + + Thread(target=test, args=(bus, mainloop, advmon_mgr, app_id)).start() + + mainloop.run() # blocks until mainloop.quit() is called + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--app_id', default=0, type=int, help='use this App-ID ' + 'for creating dbus objects (default: 0)') + args = parser.parse_args() + + main(args.app_id) -- 2.29.2.576.ga3fc446d84-goog