From: Inga Stotland <inga.stotland@xxxxxxxxx> --- test/test-join | 408 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 408 insertions(+) create mode 100644 test/test-join diff --git a/test/test-join b/test/test-join new file mode 100644 index 000000000..cdf92a2f1 --- /dev/null +++ b/test/test-join @@ -0,0 +1,408 @@ +#!/usr/bin/env python3 + +import sys +import struct +import numpy +import dbus +import dbus.service +import dbus.exceptions + +from threading import Timer +import time + +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject +from dbus.mainloop.glib import DBusGMainLoop + +import agent + +MESH_SERVICE_NAME = 'org.bluez.mesh' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' + +MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1' +MESH_NODE_IFACE = 'org.bluez.mesh.Node1' +MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1' +MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1' + +APP_COMPANY_ID = 0x05f1 +APP_PRODUCT_ID = 0x0001 +APP_VERSION_ID = 0x0001 + +VENDOR_ID_NONE = 0xffff + +mesh_net = None +app = None +bus = None +mainloop = None +node = None + +token = None + +def generic_error_cb(error): + print('D-Bus call failed: ' + str(error)) + +def generic_reply_cb(): + print('D-Bus call done') + +def unwrap(item): + if isinstance(item, dbus.Boolean): + return bool(item) + if isinstance(item, (dbus.UInt16, dbus.Int16, + dbus.UInt32, dbus.Int32, + dbus.UInt64, dbus.Int64)): + return int(item) + if isinstance(item, dbus.Byte): + return bytes([int(item)]) + if isinstance(item, dbus.String): + return item + if isinstance(item, (dbus.Array, list, tuple)): + return [unwrap(x) for x in item] + if isinstance(item, (dbus.Dictionary, dict)): + return dict([(unwrap(x), unwrap(y)) for x, y in item.items()]) + + print('Dictionary item not handled') + print(type(item)) + return item + +def join_cb(): + print('Join procedure started') + +def join_error_cb(reason): + print('Join procedure failed: ', reason) + +def attach_app_cb(node_path, dict_array): + print('Mesh application registered ', node_path) + + obj = bus.get_object(MESH_SERVICE_NAME, node_path) + + global node + node = dbus.Interface(obj, MESH_NODE_IFACE) + + els = unwrap(dict_array) + print("Get Elements") + + for el in els: + idx = struct.unpack('b', el[0])[0] + print('Configuration for Element ', end='') + print(idx) + + models = el[1] + element = app.get_element(idx) + element.set_model_config(models) + +def attach_app_error_cb(error): + print('Failed to register application: ' + str(error)) + mainloop.quit() + +def attach(token): + print('Attach') + mesh_net.Attach(app.get_path(), token, + reply_handler=attach_app_cb, + error_handler=attach_app_error_cb) + +def interfaces_removed_cb(object_path, interfaces): + if not mesh_net: + return + + if object_path == mesh_net[2]: + print('Service was removed') + mainloop.quit() + +def send_response(path, dest, key, data): + print('send response ', end='') + print(data) + node.Send(path, dest, key, data, reply_handler=generic_reply_cb, + error_handler=generic_error_cb) + +def send_publication(path, model_id, data): + print('send publication ', end='') + print(data) + node.Publish(path, model_id, data, reply_handler=generic_reply_cb, + error_handler=generic_error_cb) + +class PubTimer(): + def __init__(self): + self.seconds = None + self.func = None + self.thread = None + self.busy = False + + def _timeout_cb(self): + self.func() + self.busy = True + self._schedule_timer() + self.busy =False + + def _schedule_timer(self): + self.thread = Timer(self.seconds, self._timeout_cb) + self.thread.start() + + def start(self, seconds, func): + self.func = func + self.seconds = seconds + if not self.busy: + self._schedule_timer() + + def cancel(self): + print('Cancel timer') + if self.thread is not None: + print('Cancel thread') + self.thread.cancel() + self.thread = None + +class Application(dbus.service.Object): + + def __init__(self, bus): + self.path = '/example' + self.agent = None + self.elements = [] + dbus.service.Object.__init__(self, bus, self.path) + + def set_agent(self, agent): + self.agent = agent + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_element(self, element): + self.elements.append(element) + + def get_element(self, idx): + for ele in self.elements: + if ele.get_index() == idx: + return ele + + def get_properties(self): + return { + MESH_APPLICATION_IFACE: { + 'CompanyID': dbus.UInt16(APP_COMPANY_ID), + 'ProductID': dbus.UInt16(APP_PRODUCT_ID), + 'VersionID': dbus.UInt16(APP_VERSION_ID) + } + } + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + + def GetManagedObjects(self): + response = {} + print('GetManagedObjects') + response[self.path] = self.get_properties() + response[self.agent.get_path()] = self.agent.get_properties() + for element in self.elements: + response[element.get_path()] = element.get_properties() + return response + + @dbus.service.method(MESH_APPLICATION_IFACE, + in_signature="t", out_signature="") + + def JoinComplete(self, value): + global token + print('JoinComplete ', value) + + token = value + attach(token) + + @dbus.service.method(MESH_APPLICATION_IFACE, + in_signature="s", out_signature="") + + def JoinFailed(self, value): + print('JoinFailed ', value) + token = value + +class Element(dbus.service.Object): + PATH_BASE = '/example/ele' + + def __init__(self, bus, index): + self.path = self.PATH_BASE + format(index, '02x') + print(self.path) + self.models = [] + self.bus = bus + self.index = index + dbus.service.Object.__init__(self, bus, self.path) + + def _get_sig_models(self): + ids = [] + for model in self.models: + id = model.get_id() + vendor = model.get_vendor() + if vendor == VENDOR_ID_NONE: + ids.append(id) + return ids + + def get_properties(self): + return { + MESH_ELEMENT_IFACE: { + 'Index': dbus.Byte(self.index), + 'Models': dbus.Array(self._get_sig_models(), 'q') + } + } + + def add_model(self, model): + model.set_path(self.path) + self.models.append(model) + + def get_index(self): + return self.index + + def set_model_config(self, configs): + print('Set element models config') + for config in configs: + mod_id = config[0] + self.UpdateModelConfiguration(mod_id, config[1]) + + @dbus.service.method(MESH_ELEMENT_IFACE, + in_signature="qqbay", out_signature="") + def MessageReceived(self, source, key, is_sub, data): + print('Message Received on Element ', end='') + print(self.index) + for model in self.models: + model.process_message(source, key, data) + + @dbus.service.method(MESH_ELEMENT_IFACE, + in_signature="qa{sv}", out_signature="") + + def UpdateModelConfiguration(self, model_id, config): + print('UpdateModelConfig ', end='') + print(hex(model_id)) + for model in self.models: + if model_id == model.get_id(): + model.set_config(config) + return + + @dbus.service.method(MESH_ELEMENT_IFACE, + in_signature="", out_signature="") + + def get_path(self): + return dbus.ObjectPath(self.path) + +class Model(): + def __init__(self, model_id): + self.cmd_ops = [] + self.model_id = model_id + self.vendor = VENDOR_ID_NONE + self.bindings = [] + self.pub_period = 0 + self.pub_id = 0 + self.path = None + + def set_path(self, path): + self.path = path + + def get_id(self): + return self.model_id + + def get_vendor(self): + return self.vendor + + def process_message(self, source, key, data): + print('Model process message') + + def set_publication(self, period): + self.pub_period = period + + def set_config(self, config): + if 'Bindings' in config: + self.bindings = config.get('Bindings') + print('Bindings: ', end='') + print(self.bindings) + if 'PublicationPeriod' in config: + self.set_publication(config.get('PublicationPeriod')) + print('Model publication period ', end='') + print(self.pub_period, end='') + print(' ms') + +class OnOffServer(Model): + def __init__(self, model_id): + Model.__init__(self, model_id) + self.cmd_ops = { 0x8201, # get + 0x8202, # set + 0x8203 } # set unacknowledged + + print("OnOff Server ", end="") + self.state = 0 + print('State ', end='') + self.timer = PubTimer() + + def process_message(self, source, key, data): + datalen = len(data) + print('OnOff Server process message len ', datalen) + + if datalen!=2 and datalen!=3: + return + + if datalen==2: + op_tuple=struct.unpack('<H',bytes(data)) + opcode = op_tuple[0] + if opcode != 0x8201: + print(hex(opcode)) + return + print('Get state') + elif datalen==3: + opcode,self.state=struct.unpack('<HB',bytes(data)) + if opcode != 0x8202 and opcode != 0x8203: + print(hex(opcode)) + return + print('Set state: ', end='') + print(self.state) + + rsp_data = struct.pack('<HB', 0x8204, self.state) + send_response(self.path, source, key, rsp_data) + + def publish(self): + print('Publish') + data = struct.pack('B', self.state) + send_publication(self.path, self.model_id, data) + + def set_publication(self, period): + if period == 0: + self.pub_period = 0 + self.timer.cancel() + return + + # We do not handle ms in this example + if period < 1000: + return + + self.pub_period = period + self.timer.start(period/1000, self.publish) + +def main(): + + DBusGMainLoop(set_as_default=True) + + global bus + bus = dbus.SystemBus() + global mainloop + global app + global mesh_net + + mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME, + "/org/bluez/mesh"), + MESH_NETWORK_IFACE) + mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb) + + app = Application(bus) + prov_agent = agent.Agent(bus) + app.set_agent(prov_agent) + first_ele = Element(bus, 0x00) + first_ele.add_model(OnOffServer(0x1000)) + app.add_element(first_ele) + + mainloop = GObject.MainLoop() + + print('Join') + caps = ["out-numeric"] + oob = ["other"] + uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F") + print(uuid) + mesh_net.Join(app.get_path(), uuid, + reply_handler=join_cb, + error_handler=join_error_cb) + + mainloop.run() + +if __name__ == '__main__': + main() -- 2.14.5