Signed-off-by: Christopher Friedt chrisfriedt@xxxxxxxxx commit c8536b2e007a84aff09e800ab59d8f0387a74098 Author: Christopher Friedt <chrisfriedt@xxxxxxxxx> Date: Sat Jul 31 23:21:55 2010 +0200 fixed uwb pretty-printing. added lots of new stuff: cbaf, *wa events, added event dispatcher. diff --git a/VUsbTools/Decode.py b/VUsbTools/Decode.py index 1ac5de7..6004e31 100644 --- a/VUsbTools/Decode.py +++ b/VUsbTools/Decode.py @@ -369,7 +369,8 @@ class ControlDecoder: while buffer: desc = self.descriptorClass() buffer = desc.decode(buffer) - setup.event.appendDecoded("\n%s descriptor:\n%s" % (desc.type, desc)) + setup.event.appendDecoded("\n%s descriptor:\n%s" % + (desc.type.replace("_", " "), desc)) descriptors.append(desc) self.device.storeDescriptors(descriptors) diff --git a/VUsbTools/Decoders/WirelessUsb.py b/VUsbTools/Decoders/WirelessUsb.py index 7d2c562..457dea6 100644 --- a/VUsbTools/Decoders/WirelessUsb.py +++ b/VUsbTools/Decoders/WirelessUsb.py @@ -7,7 +7,6 @@ # Copyright (C) 2005-2010 Christopher Friedt. Licensed under the MIT # License, please see the README.txt. All rights reserved. # - #################### NOTE: THIS IS A WORK-IN-PROGRESS #################### from VUsbTools import Decode, Struct @@ -20,27 +19,25 @@ class BCD16(Struct.Item): _format = 'H' def __str__(self): return "%d.%02d" % ( (self._value & 0xff00) >> 8, (self._value & 0xff) ) - class UInt8Exp2(Struct.UInt8): def decode(self,buffer): self._value = 2**buffer[0] return buffer[1:] - class ByteArray(Struct.UInt8): def __init__(self,l,name): - if l <= 0: + if l < 0: raise ValueError('arrays must be greater than or equal to zero') x = 'B' * l - y = '%02X ' * (l-1) + '%02X' + y = None if l == 0 else '%02X ' * (l-1) + '%02X' self._length = l self._name = name self._format = x self._strFormat = y self._value = None def decode(self,buffer): - self._value = struct.unpack(self._format,buffer[:self._length]) + if self._length: + self._value = struct.unpack(self._format,buffer[:self._length]) return buffer[self._length:] - # FIXME: this is an ugly hack class KeyDescriptor(Struct.Item): def __init__(self,l,name): @@ -115,53 +112,55 @@ class RPipeDescriptor(Struct.Item): ####################### UWB RADIO ######################## -# Command Results -rcResultCodes = Struct.EnumDict({ - 0x00: 'SUCCESS', - 0x01: 'FAILURE', - 0x02: 'FAILURE_HARDWARE', - 0x03: 'FAILURE_NO_SLOTS', - 0x04: 'FAILURE_BEACON_TOO_LARGE', - 0x05: 'FAILURE_INVALID_PARAMETER', - 0x06: 'FAILURE_UNSUPPORTED_PWR_LEVEL', - 0x07: 'TIME_OUT', - }) - -rcCommandOrEventTypes = Struct.EnumDict({ - 0x00: 'GENERAL', - # the rest are vendor / reserved - }) - -rcCommandsOrEvents = Struct.EnumDict({ - # NOTIFICATIONS - 0x00: 'AS_PROBE_IE_RECEIVED', - 0x01: 'BEACON_RECEIVED', - 0x02: 'BEACON_SIZE_CHANGE', - 0x03: 'BPOIE_CHANGE', - 0x04: 'BP_SLOT_CHANGE', - 0x05: 'BP_SWITCH_IE_RECEIVED', - 0x06: 'DEV_ADDR_CONFLICT', - 0x07: 'DRP_AVAILABILITY_CHANGE', - 0x08: 'DRP', - # 0x09-0x0f reserved - - # COMMANDS - 0x10: 'CHANNEL_CHANGE', - 0x11: 'DEV_ADDR', - 0x12: 'GET_IE', - 0x13: 'RESET', - 0x14: 'SCAN', - 0x15: 'SET_BEACON_FILTER', - 0x16: 'SET_DRP_IE', - 0x17: 'SET_IE', - 0x18: 'SET_NOTIFICATION_FILTER', - 0x19: 'SET_TX_POWER', - 0x1a: 'SLEEP', - 0x1b: 'START_BEACONING', - 0x1c: 'STOP_BEACONING', -# XXX: This is not part of the WUSB spec, but exists with Wisair hardware - 0xfffe: 'HEARTBEAT', - }) +class RcTypes(): + + # Command Results + results = Struct.EnumDict({ + 0x00: 'SUCCESS', + 0x01: 'FAILURE', + 0x02: 'FAILURE_HARDWARE', + 0x03: 'FAILURE_NO_SLOTS', + 0x04: 'FAILURE_BEACON_TOO_LARGE', + 0x05: 'FAILURE_INVALID_PARAMETER', + 0x06: 'FAILURE_UNSUPPORTED_PWR_LEVEL', + 0x07: 'TIME_OUT', + }) + + cmdOrEvtTypes = Struct.EnumDict({ + 0x00: 'GENERAL', + # the rest are vendor / reserved + }) + + cmdOrEvts = Struct.EnumDict({ + # NOTIFICATIONS + 0x00: 'AS_PROBE_IE_RECEIVED', + 0x01: 'BEACON_RECEIVED', + 0x02: 'BEACON_SIZE_CHANGE', + 0x03: 'BPOIE_CHANGE', + 0x04: 'BP_SLOT_CHANGE', + 0x05: 'BP_SWITCH_IE_RECEIVED', + 0x06: 'DEV_ADDR_CONFLICT', + 0x07: 'DRP_AVAILABILITY_CHANGE', + 0x08: 'DRP', + # 0x09-0x0f reserved + + # COMMANDS + 0x10: 'CHANNEL_CHANGE', + 0x11: 'DEV_ADDR', + 0x12: 'GET_IE', + 0x13: 'RESET', + 0x14: 'SCAN', + 0x15: 'SET_BEACON_FILTER', + 0x16: 'SET_DRP_IE', + 0x17: 'SET_IE', + 0x18: 'SET_NOTIFICATION_FILTER', + 0x19: 'SET_TX_POWER', + 0x1a: 'SLEEP', + 0x1b: 'START_BEACONING', + 0x1c: 'STOP_BEACONING', + # XXX: This is not part of the WUSB spec, but exists with Wisair hardware + 0xfffe: 'HEARTBEAT', + }) class UwbControlDescriptorGroup(Struct.Group): @@ -169,7 +168,7 @@ class UwbControlDescriptorGroup(Struct.Group): (0x21,0x28):"exec_rc_cmd" }) - headerStruct = lambda self: ( + structHeader = lambda self: ( Struct.UInt8("bCommandType"), Struct.UInt16("wCommand"), Struct.UInt8("bCommandContext"), @@ -185,13 +184,8 @@ class UwbControlDescriptorGroup(Struct.Group): # XXX: WUSB Spec says this is a 64-bit address, but they are only 48-bit ByteArray(6,"baAddr"), ) - - struct_get_ie = lambda self: ( - ) - - struct_reset = lambda self: ( - ) - + struct_get_ie = lambda self: None + struct_reset = lambda self: None struct_scan = lambda self: ( Struct.UInt8("bChannelNumber"), Struct.UInt8("bScanState"), @@ -207,33 +201,26 @@ class UwbControlDescriptorGroup(Struct.Group): Struct.UInt16("wIELength"), ByteArray(n,"IEData"), ) - struct_set_ie = lambda self, n: ( Struct.UInt16("wIELength"), ByteArray(n,"IEData"), ) - struct_set_notification_filter = lambda self: ( Struct.UInt16("wNotification"), Struct.UInt8("bEnableState"), ) - struct_set_tx_power = lambda self: ( Struct.UInt8("bPowerLevel"), ) - struct_sleep = lambda self: ( Struct.UInt8("bHibernationCount"), Struct.UInt8("bHibernateDuration"), ) - struct_start_beaconing = lambda self: ( Struct.UInt16("wBPSTOffset"), Struct.UInt8("bChannelNumber"), ) - - struct_stop_beaconing = lambda self: ( - ) + struct_stop_beaconing = lambda self: None def __init__(self): Struct.Group.__init__(self, "descriptors") @@ -259,14 +246,14 @@ class UwbControlDescriptorGroup(Struct.Group): } # Common descriptor header - buffer = Struct.Group.decode(self, buffer, self.headerStruct()) + buffer = Struct.Group.decode(self, buffer, self.structHeader()) - cmdt = rcCommandOrEventTypes[self.bCommandType] + cmdt = RcTypes.cmdOrEvtTypes[self.bCommandType] if cmdt is not 'GENERAL': raise ValueError("rcCommandType %s is not supported" % cmdt ) # Decode command type - cmd = str(rcCommandsOrEvents[self.wCommand]).lower() + cmd = str(RcTypes.cmdOrEvts[self.wCommand]).lower() if str(cmd).startswith('0x'): raise ValueError("rcCommand %s is not supported" % cmd ) @@ -295,37 +282,28 @@ class UwbControlDescriptorGroup(Struct.Group): Struct.Group.decode(self, descriptor, dr) return buffer - class UwbControlDecoder(Decode.ControlDecoder): - classRequests = Struct.EnumDict({ - 0x28: "EXEC_RC_CMD", - }) + + requests = Struct.EnumDict({ + (0x21,0x28): "EXEC_RC_CMD", + }) def handleEvent(self, event): if not event.isDataTransaction(): return setup = Decode.SetupPacket(event) - - # Look up the request name - setup.requestName = getattr(self, "%sRequests" % setup.type, - Struct.EnumDict())[setup.request] - - # Look up a corresponding decoder - d = getattr(self, "decode_%s" % setup.requestName, self.decodeGeneric) - d(setup) - - def decode_EXEC_RC_CMD(self,setup): + setup.event.decoded = None setup.event.decodedSummary = None x = UwbControlDescriptorGroup() buffer = setup.event.data[8:] - setup.event.data = x.decode(buffer) + buffer = x.decode(buffer) setup.event.pushDecoded(str(x)) - setup.event.pushDecoded('UWB Command: %s' % x.name ) - + setup.event.pushDecoded('UWB Command (%s)' % x.name ) + return buffer class UwbEventDescriptorGroup(Struct.Group): - headerStruct = lambda self: ( + structHeader = lambda self: ( Struct.UInt8("bEventType"), Struct.UInt16("wEvent"), Struct.UInt8("bEventContext"), @@ -391,9 +369,7 @@ class UwbEventDescriptorGroup(Struct.Group): Struct.UInt8("bSlotNumber"), ) struct_bp_switch_ie_received = struct_as_probe_ie_received - struct_dev_addr_conflict = lambda self: ( - ) - + struct_dev_addr_conflict = lambda self: None struct_drp_availability_change = lambda self: ( # XXX: WUSB Spec claims wIELength field. However, IEData is a fixed 32-bytes ByteArray(32,"IEData"), @@ -418,6 +394,20 @@ class UwbEventDescriptorGroup(Struct.Group): hdr_sz = 4 + dlens_before = { "get_ie":6, "as_probe_ie_received":8, + "bp_switch_ie_received":8, +# XXX: WUSB Spec is missing bBeaconType field at offset 5 + "beacon_received": 12, + "bpoie_change":6, +# XXX: WUSB Spec claims wIELength field. However, IEData is a fixed 32-bytes + "drp_availability_change":4, + "drp":9, + } + doffs = { + "get_ie":0, "as_probe_ie_received":2, "bp_switch_ie_received":2, + "beacon_received":6, "bpoie_change":0, "drp_availability_change":0, + "drp":3, + } dlens = { "channel_change":5, # XXX: WUSB Spec says this is a 64-bit address, but they are only 48-bit "dev_addr":11, @@ -431,40 +421,21 @@ class UwbEventDescriptorGroup(Struct.Group): "drp_availability_change":36, # XXX: This is not part of the WUSB spec, but exists with Wisair hardware "drp":-1, "heartbeat":12, - } - - dnames = { "channel_change":"Channel Change", "dev_addr":"Dev Addr", - "get_ie":"Get IE", "reset":"Reset", "scan":"Scan", - "set_beacon_filter":"Set Beacon Filter", "set_drp_ie":"Set DRP IE", - "set_ie":"Set IE", "set_notification_filter":"Set Notification Filter", - "set_tx_power":"Set TX Power", "sleep":"Sleep", - "start_beaconing":"Start Beaconing", "stop_beaconing":"Stop Beaconing", - "as_probe_ie_received":"AS Probe IE Received", "beacon_received":"Beacon Received", - "beacon_size_change":"Beacon Size Change", "bpoie_change":"BPOIE Change", - "bp_slot_change":"BP Slot Change", "bp_switch_ie_received":"BP Switch IE Received", - "dev_addr_conflict":"Dev Addr Conflict", "drp_availability_change":"DRP Availability Change", - "drp":"DRP", -# XXX: This is not part of the WUSB spec, but exists with Wisair hardware - "heartbeat":"HeartBeat" } - - if not buffer: - print 'WirelessUSB Radio Event: buffer is empty' - return - + } + # Common descriptor header - buffer = Struct.Group.decode(self, buffer, self.headerStruct()) + buffer = Struct.Group.decode(self, buffer, self.structHeader()) - evt = rcCommandOrEventTypes[self.bEventType] + evt = RcTypes.cmdOrEvtTypes[self.bEventType] if evt is not 'GENERAL': raise ValueError("rcEventType %s is not supported" % evt ) # Decode command type - ev = rcCommandsOrEvents[self.wEvent].lower() + ev = RcTypes.cmdOrEvts[self.wEvent].lower() + self.ev = ev if ev.startswith('0x'): raise ValueError("rcEvent %s is not supported" % ev ) - self.ev_name = dnames[ev] - d = getattr(self, "struct_%s" % ev, lambda: None) up = lambda buf, ofs: struct.unpack_from('<H',buf,ofs)[0] @@ -479,45 +450,44 @@ class UwbEventDescriptorGroup(Struct.Group): raise ValueError( "expected packet length >= %d but have %d" % (dlen-hdr_sz,len(buffer)) ) if dlen == -1: - if ev == "get_ie": - nb = 6 - n = up(buffer,0) - elif ev == "as_probe_ie_received" or ev == "bp_switch_ie_received": - nb = 8 - n = up(buffer,2) - elif ev == "beacon_received": -# XXX: WUSB Spec is missing bBeaconType field at offset 5 - nb = 12 - n = up(buffer,6) - elif ev == "bpoie_change": - nb = 6 - n = up(buffer,0) -# XXX: WUSB Spec claims wIELength field. However, IEData is a fixed 32-bytes - elif ev == "drp_availability_change": - nb = 4 - n = up(buffer,0) - elif ev == "drp": - nb = 9 - n = up(buffer,3) - dlen = nb + n + nb = dlens_before[ev] + offs = doffs[ev] + n = up(buffer,offs) + dlen = nb + n + dr = d(n) else: dr = d() buffer = Struct.Group.decode(self, buffer, dr) return buffer - class UwbEventDecoder(): + + dnames = { "channel_change":"Channel Change", "dev_addr":"Dev Addr", + "get_ie":"Get IE", "reset":"Reset", "scan":"Scan", + "set_beacon_filter":"Set Beacon Filter", "set_drp_ie":"Set DRP IE", + "set_ie":"Set IE", "set_notification_filter":"Set Notification Filter", + "set_tx_power":"Set TX Power", "sleep":"Sleep", + "start_beaconing":"Start Beaconing", "stop_beaconing":"Stop Beaconing", + "as_probe_ie_received":"AS Probe IE Received", "beacon_received":"Beacon Received", + "beacon_size_change":"Beacon Size Change", "bpoie_change":"BPOIE Change", + "bp_slot_change":"BP Slot Change", "bp_switch_ie_received":"BP Switch IE Received", + "dev_addr_conflict":"Dev Addr Conflict", "drp_availability_change":"DRP Availability Change", + "drp":"DRP", +# XXX: This is not part of the WUSB spec, but exists with Wisair hardware + "heartbeat":"HeartBeat" } + descriptorClass = UwbEventDescriptorGroup def handleEvent(self,event): if not event.data: return x = self.descriptorClass() - event.data = x.decode(event.data) + buffer = event.data + buffer = x.decode(buffer) event.pushDecoded( str(x) ) label = "Response" if x.wEvent in range(0x10,0x1c + 1) else "Notification" - event.pushDecoded( "UWB %s: %s" % (label,x.ev_name) ) + event.pushDecoded( "UWB %s (%s)" % (label,self.dnames[x.ev]) ) ####################### HOST WIRE ADAPTER #################### @@ -641,6 +611,8 @@ class HwaControlDescriptorGroup(Struct.Group): event.decoded = None event.decodedSummary = None + buffer = event.data + setup = Decode.SetupPacket(event) self.setup = setup request = self.requests.get((setup.bitmap,setup.request)) @@ -672,8 +644,7 @@ class HwaControlDescriptorGroup(Struct.Group): dr = d() offs = self.doffs.get(request) - Struct.Group.decode(self, event.data[offs:], dr) - + return Struct.Group.decode(self, buffer[offs:], dr) class HwaControlDecoder(Decode.ControlDecoder): dnames = { @@ -695,17 +666,60 @@ class HwaControlDecoder(Decode.ControlDecoder): cmd.decode(event) event.pushDecoded( "%s" % cmd ) event.pushDecoded("HWA Command (%s)" % self.dnames.get(cmd.request) ) +class HwaEventDescriptorGroup(Struct.Group): -class HwaEventDecoder(): + requests = Struct.EnumDict({ + 0x94: "bpst_adj", + 0x95: "dn_received", + }) + + structHeader = lambda self: ( + Struct.UInt8("bLength"), + Struct.UInt8("bNotifyType"), + ) + struct_bpst_adj = lambda self:( + Struct.UInt8("bAdjustment"), + ) + struct_dn_received = lambda self, n: ( + Struct.UInt8("bmAttributes"), + ByteArray(n,"NotificationSpecific"), + ) + + def __init__(self): + Struct.Group.__init__(self,"descriptors") - def handleEvent(self,event): + def decode(self,event): + # erase the default decoding + event.decoded = None + event.decodedSummary = None + buffer = event.data - if buffer: - print 'HwaEvent Dump:' - for i in range(len(buffer)): - print '0x%02x' % ord(buffer[i]) -# TODO: finish this off - z = 0 + 0 + buffer = Struct.Group.decode(self, buffer, self.structHeader()) + + ev = self.requests.get(self.bNotifyType) + self.ev = ev + + n = 0 + if ev == "dn_received": + n = self.bLength - 3 + + d = getattr(self, "struct_%s" % self.ev, lambda: None) + try: + if n: + dr = d(n) + else: + dr = d() + except Exception as inst: + print type(inst) + + return Struct.Group.decode(self, buffer, dr) +class HwaEventDecoder(): + dnames = { "bpst_adj":"BPST Adjustment", "dn_received":"DN Received" } + def handleEvent(self,event): + evt = HwaEventDescriptorGroup() + evt.decode(event) + event.pushDecoded( "%s" % evt ) + event.pushDecoded("HWA Event (%s)" % self.dnames.get(evt.ev) ) ####################### DEVICE WIRE ADAPTER #################### @@ -768,7 +782,6 @@ class DwaControlDescriptorGroup(Struct.Group): offs = self.doffs.get(request) Struct.Group.decode(self, event.data[offs:], dr) - class DwaControlDecoder(Decode.ControlDecoder): dnames = { "clear_port_feature":"Clear Port Feature", @@ -781,28 +794,47 @@ class DwaControlDecoder(Decode.ControlDecoder): cmd.decode(event) event.pushDecoded( "%s" % cmd ) event.pushDecoded("DWA Command (%s)" % self.dnames.get(cmd.request) ) +class DwaEventDescriptorGroup(Struct.Group): + + requests = Struct.EnumDict({ + 0x91: "remote_wake", + 0x92: "port_status", + }) + structHeader = lambda self: ( + Struct.UInt8("bLength"), + Struct.UInt8("bNotifyType"), + ) + struct_remote_wake = lambda self: None + struct_port_status = lambda self: ( + Struct.UInt8("bPortIndex"), + ) + + def __init__(self): + Struct.Group.__init__(self,"descriptors") + def decode(self,event): + # erase the default decoding + event.decoded = None + event.decodedSummary = None + + buffer = event.data + buffer = Struct.Group.decode(self, buffer, self.structHeader()) + + self.ev = self.requests.get(self.bNotifyType) + + return Struct.Group.decode(self, buffer, + getattr(self, "struct_%s" % self.ev, lambda: None)()) class DwaEventDecoder(): + dnames = { "remote_wake":"Remote Wake", "port_status":"Port Status" } def handleEvent(self,event): -# TODO: finish this off - z = 0 + 0 + evt = DwaEventDescriptorGroup() + evt.decode(event) + event.pushDecoded( "%s" % evt ) + event.pushDecoded("DWA Event (%s)" % self.dnames.get(evt.ev) ) ####################### WIRE ADAPTER (COMMON) #################### class WaControlDescriptorGroup(Struct.Group): - - doffs = { "abort_rpipe":4, "clear_rpipe_feature":2, - "clear_wire_adapter_feature":2, "get_rpipe_descriptor":2, - "get_rpipe_status":4, "get_wire_adapter_status":4, - "set_rpipe_descriptor":2, "set_rpipe_feature":2, - "set_wire_adapter_feature":2, "reset_rpipe":4, } - - dlens = { "abort_rpipe":8, "clear_rpipe_feature":8, - "clear_wire_adapter_feature":8, "get_rpipe_descriptor":36, - "get_rpipe_status":9, "get_wire_adapter_status":12, - "set_rpipe_descriptor":36, "set_rpipe_feature":8, - "set_wire_adapter_feature":8, "reset_rpipe":8, } - requests = Struct.EnumDict({ (0x25,0x0e): "abort_rpipe", (0x25,0x01): "clear_rpipe_feature", @@ -816,6 +848,18 @@ class WaControlDescriptorGroup(Struct.Group): (0x25,0x0f): "reset_rpipe", }) + doffs = { "abort_rpipe":4, "clear_rpipe_feature":2, + "clear_wire_adapter_feature":2, "get_rpipe_descriptor":2, + "get_rpipe_status":4, "get_wire_adapter_status":4, + "set_rpipe_descriptor":2, "set_rpipe_feature":2, + "set_wire_adapter_feature":2, "reset_rpipe":4, } + + dlens = { "abort_rpipe":8, "clear_rpipe_feature":8, + "clear_wire_adapter_feature":8, "get_rpipe_descriptor":36, + "get_rpipe_status":9, "get_wire_adapter_status":12, + "set_rpipe_descriptor":36, "set_rpipe_feature":8, + "set_wire_adapter_feature":8, "reset_rpipe":8, } + struct_abort_rpipe = lambda self: ( Struct.UInt16("RPipeIndex"), ) @@ -862,6 +906,8 @@ class WaControlDescriptorGroup(Struct.Group): event.decoded = None event.decodedSummary = None + buffer = event.data + setup = Decode.SetupPacket(event) self.setup = setup request = self.requests.get((setup.bitmap,setup.request)) @@ -874,30 +920,60 @@ class WaControlDescriptorGroup(Struct.Group): dr = d() offs = self.doffs.get(request) - Struct.Group.decode(self, event.data[offs:], dr) - + return Struct.Group.decode(self, buffer[offs:], dr) class WaControlDecoder(Decode.ControlDecoder): dnames = { "abort_rpipe":"Abort RPipe", - "clear_rpipe_feature":"Clear RPipe Feature", + "clear_rpipe_feature":"Clear RPipe Feature", "clear_wire_adapter_feature":"Clear Wire Adapter Feature", - "get_rpipe_descriptor":"Get RPipe Descriptor", + "get_rpipe_descriptor":"Get RPipe Descriptor", "get_rpipe_status":"Get RPipe Status", - "get_wire_adapter_status":"Get Wire Adapter Status", + "get_wire_adapter_status":"Get Wire Adapter Status", "set_rpipe_descriptor":"Set RPipe Descriptor", - "set_rpipe_feature":"Set RPipe Feature", + "set_rpipe_feature":"Set RPipe Feature", "set_wire_adapter_feature":"Set Wire Adapter Feature", - "reset_rpipe":"Reset RPipe", } + "reset_rpipe":"Reset RPipe" } def handleEvent(self,event): cmd = WaControlDescriptorGroup() cmd.decode(event) event.pushDecoded( "%s" % cmd ) event.pushDecoded("WA Command (%s)" % self.dnames.get(cmd.request) ) +class WaEventDescriptorGroup(Struct.Group): + + requests = Struct.EnumDict({ + 0x91: "transfer", + }) + structHeader = lambda self: ( + Struct.UInt8("bLength"), + Struct.UInt8("bNotifyType"), + ) + struct_transfer = lambda self:( + Struct.UInt8("bEndpoint"), + Struct.UInt8("bReserved"), + ) + + def __init__(self): + Struct.Group.__init__(self,"descriptors") + def decode(self,event): + # erase the default decoding + event.decoded = None + event.decodedSummary = None + + buffer = event.data + buffer = Struct.Group.decode(self, buffer, self.structHeader()) + + self.ev = self.requests.get(self.bNotifyType) + + return Struct.Group.decode(self, buffer, + getattr(self, "struct_%s" % self.ev, lambda: None)()) class WaEventDecoder(): + dnames = { "transfer":"Transfer Completion", } def handleEvent(self,event): -# TODO: finish this off - z = 0 + 0 + evt = WaEventDescriptorGroup() + evt.decode(event) + event.pushDecoded( "%s" % evt ) + event.pushDecoded("WA Event (%s)" % self.dnames.get(evt.ev) ) ####################### SECURITY ####################### @@ -973,11 +1049,10 @@ class SecurityControlDescriptorGroup(Struct.Group): # erase the default decoding event.decoded = None event.decodedSummary = None - - buffer2 = buffer + + buffer = event.data setup = Decode.SetupPacket(event) - self.setup = setup request = self.requests.get((setup.bitmap,setup.request)) assert(request) @@ -1004,10 +1079,8 @@ class SecurityControlDescriptorGroup(Struct.Group): else: dr = d() - self.data = buffer2[:8+n] offs = self.doffs.get(request) - Struct.Group.decode(self, event.data[offs:], dr) - + return Struct.Group.decode(self, buffer[offs:], dr) class SecurityControlDecoder(Decode.ControlDecoder): dnames = { "get_key":"Get Key", "set_key":"Set Key", @@ -1026,22 +1099,177 @@ class SecurityControlDecoder(Decode.ControlDecoder): cmd.decode(event) event.pushDecoded( "%s" % cmd ) event.pushDecoded("Security Command (%s)" % self.dnames.get(cmd.request) ) - class SecurityEventDecoder(): def handleEvent(self,event): z = 0 + 0 ####################### WIRELESS USB ####################### +class WusbControlDescriptorGroup(Decode.DescriptorGroup): + requests = Struct.EnumDict({ + (0x00,0x01):"clear_feature", + (0x80,0x00):"get_status", + (0x00,0x05):"set_address", + (0x00,0x03):"set_feature", + (0x01,0x17):"set_interface_ds", + (0x00,0x14):"set_wusb_data", + (0x00,0x13):"loopback_data_write", + (0x80,0x15):"loopback_data_read", + }) + dlens = { "clear_feature":8, "get_status":-1, "set_address":8, + "set_feature":8, "set_interface_ds":10, "set_wusb_data":-1, + "loopback_data_write":-1, "loopback_data_read":-1, + } + doffs = { "clear_feature":4, "get_status":4, "set_address":2, + "set_feature":4, "set_interface_ds":2, "set_wusb_data":2, + "loopback_data_write":8, "loopback_data_read":8, + } + struct_clear_feature = lambda self: ( + Struct.UInt8("Feature"), + Struct.UInt8("Value"), + ) + struct_get_status = lambda self, n: ( + Struct.UInt8("Selector"), + Struct.UInt16("wLength"), + ByteArray(n,"SelectorData"), + ) + struct_set_address = lambda self: ( + Struct.UInt16("DeviceAddress"), + ) + struct_set_feature = struct_clear_feature + struct_set_interface_ds = lambda self: ( + Struct.UInt16("AlternateSetting"), + Struct.UInt16("Interface"), + Struct.UInt16("wLength"), + Struct.UInt16("SwitchTime"), + ) + def __init__(self): + Struct.Group.__init__(self,"descriptors") + + def decode(self,event): + # erase the default decoding + event.decoded = None + event.decodedSummary = None + + buffer = event.data + + setup = Decode.SetupPacket(event) + request = self.requests.get((setup.bitmap,setup.request)) + assert(request) + self.request = request + + # process data, if it exists + d = getattr(self, "struct_%s" % request, None) + + n = 0 + if self.dlens[request] == -1: + n = setup.wLength + dr = d(n) + else: + dr = d() + + offs = self.doffs.get(request) + return Struct.Group.decode(self, buffer[offs:], dr) class WusbControlDecoder(Decode.ControlDecoder): + dnames = { "clear_feature":"Clear Feature", "get_status":"Get Status", + "set_address":"Set Address", "set_feature":"Set Feature", + "set_interface_ds":"Set Interface DS", + "set_wusb_data":"Set WUSB Data", + "loopback_data_write":"Loopback Data Write", + "loopback_data_read":"Loopback Data Read", + } + def handlEvent(self,event): + cmd = WusbControlDescriptorGroup() + cmd.decode(event) + event.pushDecoded( "%s" % cmd ) + event.pushDecoded("WUSB Command (%s)" % self.dnames.get(cmd.request) ) +class WusbEventDecoder(): def handleEvent(self,event): z = 0 + 0 -class WusbEventDecoder(): +####################### CBAF ####################### + +class CbafControlDescriptorGroup(Decode.DescriptorGroup): + requests = Struct.EnumDict({ + (0xa1,0x01,0x0000):"get_association_info", + (0x21,0x03,0x0101):"set_host_info", + (0xa1,0x02,0x0200):"get_association_request", + (0x21,0x03,0x0201):"set_cc_data", + }) + struct_get_association_info = lambda self, n: ( + Struct.UInt16("Interface"), + Struct.UInt16("DataLength"), + ByteArray(n,"Data"), + ) + dlens = { "get_association_info":-1, "set_host_info":-1, + "get_association_request":-1, "set_cc_data":-1, } + doffs = { "get_association_info":4, "set_host_info":4, + "get_association_request":4, "set_cc_data":4, } + + struct_set_host_info = struct_get_association_info + struct_set_cc_data = struct_get_association_info + struct_get_association_request = struct_get_association_info + def __init__(self): + Struct.Group.__init__(self,"descriptors") + def decode(self,event): +# if not event.isDataTransaction(): +# return + # erase the default decoding + event.decoded = None + event.decodedSummary = None + + buffer = event.data + + setup = Decode.SetupPacket(event) + request = self.requests.get((setup.bitmap,setup.request,setup.wValue)) + assert(request) + self.request = request + + d = getattr(self, "struct_%s" % request, None) + + # process data, if it exists + n = 0 + if self.dlens[request] == -1: + n = min(setup.wLength,len(event.data)-8) + dr = d(n) + else: + dr = d() + + offs = self.doffs.get(request) + return Struct.Group.decode(self, buffer[offs:], dr) +class CbafControlDecoder(Decode.ControlDecoder): + classRequests = Struct.EnumDict({ + 0x01: "Get Association Information", + 0x01: "Get Association Request", + 0x01: "Set Association Response", + }) + dnames = { "get_association_information":"Get Association Info", + "set_host_info":"Set Host Info", + "get_association_request":"Get Association Request", + "set_cc_data":"Set Connection Context", } + def handleEvent(self,event): - z = 0 + 0 + buffer = event.data + setup = Decode.SetupPacket(event) + + if not event.isDataTransaction(): + event.decoded = None + event.decodedSummary = None + return + + if not CbafControlDescriptorGroup.requests.__contains__( + (setup.bitmap,setup.request,setup.wValue)): + # fall back to the default controller + x = Decode.ControlDecoder(self.device) + x.handleEvent(event) + return + + cmd = CbafControlDescriptorGroup() + cmd.decode(event) + event.pushDecoded( "%s" % str(cmd) ) + event.pushDecoded("CBAF Command (%s)" % self.dnames.get(cmd.request) ) -####################### CONTROL DISPATCHER ####################### +####################### DISPATCHERS ####################### class ControlDispatcher(Decode.ControlDecoder): @@ -1051,46 +1279,28 @@ class ControlDispatcher(Decode.ControlDecoder): def isUwbControlPacket(self,event): setup = Decode.SetupPacket(event) - if UwbControlDescriptorGroup.requests.keys().__contains__( (setup.bitmap,setup.request) ): - return 1 - return 0 - + return UwbControlDescriptorGroup.requests.keys().__contains__( + (setup.bitmap,setup.request) ) def isHwaControlPacket(self,event): setup = Decode.SetupPacket(event) - if HwaControlDescriptorGroup.requests.keys().__contains__( (setup.bitmap,setup.request) ): - return 1 - return 0 + return HwaControlDescriptorGroup.requests.keys().__contains__( + (setup.bitmap,setup.request) ) def isDwaControlPacket(self,event): setup = Decode.SetupPacket(event) - if DwaControlDescriptorGroup.requests.keys().__contains__( (setup.bitmap,setup.request) ): - return 1 - return 0 + return DwaControlDescriptorGroup.requests.keys().__contains__( + (setup.bitmap,setup.request) ) def isWaControlPacket(self,event): setup = Decode.SetupPacket(event) - if WaControlDescriptorGroup.requests.keys().__contains__( (setup.bitmap,setup.request) ): - return 1 - return 0 + return WaControlDescriptorGroup.requests.keys().__contains__( + (setup.bitmap,setup.request) ) def isSecurityControlPacket(self,event): setup = Decode.SetupPacket(event) - if SecurityControlDescriptorGroup.requests.keys().__contains__( (setup.bitmap,setup.request) ): - return 1 - return 0 + return SecurityControlDescriptorGroup.requests.keys().__contains__( + (setup.bitmap,setup.request) ) def isWusbControlPacket(self,event): - rqs = ( - (0x00,0x01), - (0x80,0x00), - (0x00,0x05), - (0x00,0x03), - (0x01,0x17), - (0x00,0x14), - (0x00,0x13), - (0x80,0x15), - ) setup = Decode.SetupPacket(event) - if rqs.__contains__( (setup.bitmap,setup.request) ): - return 1 - return 0 - + return WusbControlDescriptorGroup.requests.keys().__contains__( + (setup.bitmap,setup.request) ) def handleEvent(self, event): if not event.isDataTransaction(): @@ -1104,7 +1314,6 @@ class ControlDispatcher(Decode.ControlDecoder): assert(callable(q1)) if q1(event): q2 = getattr(sys.modules[globals()['__name__']],"%sControlDecoder" % i, None) - assert(callable(q2)) x = q2(self.device) if x: break @@ -1125,10 +1334,36 @@ class ControlDispatcher(Decode.ControlDecoder): # Look up a corresponding decoder d = getattr(self, "decode_%s" % setup.requestName, self.decodeGeneric) - try: - d(setup) - except Exception as inst: - print type(inst) + d(setup) + +class EventDispatcher(): + def __init__(self,device): + self.device = device + def isHwaEvent(self,event): + k = ord(event.data[1]) + return HwaEventDescriptorGroup.requests.keys().__contains__(k) + def isDwaEvent(self,event): + k = ord(event.data[1]) + return DwaEventDescriptorGroup.requests.keys().__contains__(k) + def isWaEvent(self,event): + k = ord(event.data[1]) + return WaEventDescriptorGroup.requests.keys().__contains__(k) + def handleEvent(self, event): + if not len(event.data): + return + + x = None + for i in ( "Hwa", "Dwa", "Wa", ): + q1 = getattr(self, "is%sEvent" % i, None ) + assert(callable(q1)) + if q1(event): + q2 = getattr(sys.modules[globals()['__name__']],"%sEventDecoder" % i, None) + x = q2() + if x: + break + + if x is not None: + x.handleEvent(event) ####################### WUSB DEVICE DESCRIPTORS ################## @@ -1139,7 +1374,17 @@ class DescriptorGroup(Decode.DescriptorGroup): 0x0e: "encryption", 0x21: "wire_adapter_class", 0x23: "radio_control_interface_class", + 0x06: "device_qualifier", }) + struct_device_qualifier = lambda self: ( + BCD16("bcdUSB"), + Struct.UInt8("bDeviceClass"), + Struct.UInt8("bDeviceSubClass"), + Struct.UInt8("bDeviceProtocol"), + Struct.UInt8("bMaxPacketSize0"), + Struct.UInt8("bNumConfigurations"), + Struct.UInt8("bReserved"), + ) struct_security = lambda self: ( Struct.UInt16("wTotalLength"), Struct.UInt8("bNumEncryptionTypes"), @@ -1219,31 +1464,45 @@ def detector(context): dev.bDeviceClass, dev.bDeviceSubClass, dev.bDeviceProtocol, - ) + ) + if not (clazz and subclazz and proto): +# FIXME: this is a dirty hack, see below... once again hijacking ep0.ctl + if devi.controlDecoder.__class__ != CbafControlDecoder: + devi.controlDecoder = CbafControlDecoder(devi) + devi.endpointDecoders[0] = devi.controlDecoder + return else: clazz = subclazz = proto = None + +# FIXME: There is a problem with the context +# when ep = 0, no context.interface information is provided, +# and that's the only way to identify a wisair cbaf interface, since +# the manufacturers identified it with bInterface = 0 !!!! (very naughty) +# Since other hardware often occasionally does this as well, vusb-analyzer +# should probably fixed to provide pertinent interface information, even +# when ep=0, but especially when bDevice{Class/SubClass/Interface} = 0 +# (i.e. when hardware is defined at the interface level) + + # Cable-Based Association interfaces get their own special treatment +# if ifc: +# (clazz,subclazz,proto) = ( +# ifc.bInterfaceClass, +# ifc.bInterfaceSubClass, +# ifc.bInterfaceProtocol, +# ) +# if (clazz,subclazz,proto) == (239,3,1): +# return CbafControlDecoder(devi) # We hijack the default control decoder for wire adapter peripherals # so that we can do the control routing ourselves (better) - if (clazz,subclazz,proto) == (239,2,2) and not ep: + if ( not ep ) and (clazz,subclazz,proto) == (239,2,2): if devi.controlDecoder.__class__ != ControlDispatcher: devi.controlDecoder = ControlDispatcher(devi) devi.endpointDecoders[0] = devi.controlDecoder return - # Cable-Based Association interfaces get their own special treatment - if ifc: - (clazz,proto,subclazz) = ( - ifc.bInterfaceClass, - ifc.bInterfaceSubClass, - ifc.bInterfaceProtocol, - ) - if (clazz,proto,subclazz) == (239,3,1): -# TODO: finish this off - print 'CableBasedAssociation' - # We respond with event decoders for interrupt endpoints - if ifc and ep: + if ep and ifc: (clazz,subclazz,proto,attr) = ( ifc.bInterfaceClass, ifc.bInterfaceSubClass, @@ -1253,5 +1512,4 @@ def detector(context): if (clazz,subclazz,proto,attr) == (224,1,2,3): return UwbEventDecoder() elif (clazz,subclazz,proto,attr) == (224,2,1,3): -# TODO: finish this off - print 'HwaEventDecoder' + return EventDispatcher(devi) commit 9f6b311d125056ff3e666a31ef7573bfa5668042 Author: Christopher Friedt <chrisfriedt@xxxxxxxxx> Date: Fri Jul 30 23:43:10 2010 +0200 pre-uwb-renovation diff --git a/VUsbTools/Decoders/WirelessUsb.py b/VUsbTools/Decoders/WirelessUsb.py index 8d829ca..7d2c562 100644 --- a/VUsbTools/Decoders/WirelessUsb.py +++ b/VUsbTools/Decoders/WirelessUsb.py @@ -970,12 +970,12 @@ class SecurityControlDescriptorGroup(Struct.Group): Struct.Group.__init__(self,"descriptors") def decode(self,event): - - # erase the default decoding event.decoded = None event.decodedSummary = None + buffer2 = buffer + setup = Decode.SetupPacket(event) self.setup = setup request = self.requests.get((setup.bitmap,setup.request)) @@ -996,6 +996,7 @@ class SecurityControlDescriptorGroup(Struct.Group): self.request = request # process data, if it exists + n = 0 d = getattr(self, "struct_%s" % request, None) if self.dlens[request] == -1: n = setup.wLength @@ -1003,6 +1004,7 @@ class SecurityControlDescriptorGroup(Struct.Group): else: dr = d() + self.data = buffer2[:8+n] offs = self.doffs.get(request) Struct.Group.decode(self, event.data[offs:], dr) @@ -1168,6 +1170,7 @@ class DescriptorGroup(Decode.DescriptorGroup): self.descriptorTypes.update(self.wusbDescriptorTypes) def decode(self, buffer): + # Common descriptor header buffer = Struct.Group.decode(self, buffer, self.headerStruct()) commit f2c8fafc25cdbd7f43537cb3b3168e612d72ebab Author: Christopher Friedt <chrisfriedt@xxxxxxxxx> Date: Fri Jul 30 17:17:11 2010 +0200 Prevented unconditional commandeering of the control endpoint in Decode.py. Added extra descriptor decoding / printing for WirelessUSB.py diff --git a/VUsbTools/Decode.py b/VUsbTools/Decode.py index 6637622..1ac5de7 100644 --- a/VUsbTools/Decode.py +++ b/VUsbTools/Decode.py @@ -70,7 +70,7 @@ class DecoderFactory: decoder.__module__, decoder.__class__.__name__) return decoder - if not context.endpoint: + if not ( decoder or context.endpoint): return ControlDecoder(context.devInstance) diff --git a/VUsbTools/Decoders/WirelessUsb.py b/VUsbTools/Decoders/WirelessUsb.py index 3148b63..8d829ca 100644 --- a/VUsbTools/Decoders/WirelessUsb.py +++ b/VUsbTools/Decoders/WirelessUsb.py @@ -16,6 +16,11 @@ import sys ####################### PACKED BINARY TYPES #################### +class BCD16(Struct.Item): + _format = 'H' + def __str__(self): + return "%d.%02d" % ( (self._value & 0xff00) >> 8, (self._value & 0xff) ) + class UInt8Exp2(Struct.UInt8): def decode(self,buffer): self._value = 2**buffer[0] @@ -1037,6 +1042,11 @@ class WusbEventDecoder(): ####################### CONTROL DISPATCHER ####################### class ControlDispatcher(Decode.ControlDecoder): + + def __init__(self,device): + self.descriptorClass = DescriptorGroup + self.device = device + def isUwbControlPacket(self,event): setup = Decode.SetupPacket(event) if UwbControlDescriptorGroup.requests.keys().__contains__( (setup.bitmap,setup.request) ): @@ -1112,8 +1122,85 @@ class ControlDispatcher(Decode.ControlDecoder): Struct.EnumDict())[setup.request] # Look up a corresponding decoder - d = getattr(self, "decode_%s" % setup.requestName, self.decodeGeneric) - d(setup) + d = getattr(self, "decode_%s" % setup.requestName, self.decodeGeneric) + try: + d(setup) + except Exception as inst: + print type(inst) + +####################### WUSB DEVICE DESCRIPTORS ################## + +class DescriptorGroup(Decode.DescriptorGroup): + + wusbDescriptorTypes = Struct.EnumDict({ + 0x0c: "security", + 0x0e: "encryption", + 0x21: "wire_adapter_class", + 0x23: "radio_control_interface_class", + }) + struct_security = lambda self: ( + Struct.UInt16("wTotalLength"), + Struct.UInt8("bNumEncryptionTypes"), + ) + struct_encryption = lambda self: ( + Struct.UInt8("bEncryptionType"), + Struct.UInt8("bEncryptionValue"), + Struct.UInt8("bAuthKeyIndex"), + ) +# struct_wire_adapter_class = lambda self, n: ( + struct_wire_adapter_class = lambda self: ( + BCD16("bcdWAVersion"), + Struct.UInt8("bNumPorts"), + Struct.UInt8("bmAttributes"), + Struct.UInt16("wNumRPipes"), + Struct.UInt16("wRPipeMaxBlock"), + Struct.UInt8("bRPipeBlockSize"), + Struct.UInt8("bPwrOn2PwrGood"), + Struct.UInt8("bNumMMCIEs"), +# ByteArray(n,"DeviceRemovable"), + Struct.UInt8("DeviceRemovable"), + ) + struct_radio_control_interface_class = lambda self: ( + BCD16("bcdRCIVersion"), + ) + def __init__(self): + Struct.Group.__init__(self, "descriptors") + self.descriptorTypes.update(self.wusbDescriptorTypes) + + def decode(self, buffer): + # Common descriptor header + buffer = Struct.Group.decode(self, buffer, self.headerStruct()) + + # Decode descriptor type + self.type = self.descriptorTypes[self.bDescriptorType] + +# FIXME: For some reason, I was unable to reliably decode wire_adapter_class +# descriptors as a variable-length descriptor... + + n = 0 +# if self.type == "wire_adapter_class": +# nmprts = min( ord(buffer[2]), 127 ) +# nmbytes = (nmprts + 1) / 8 +# remainder = (nmprts + 1) % 8 +# nmbytes = nmbytes + 1 if remainder else nmbytes +# n = nmbytes + + # Make sure that we eat exactly the right number of bytes, + # according to the descriptor header + descriptor = buffer[:self.bLength + n - 2] + buffer = buffer[self.bLength + n - 2:] + + # The rest of the decoding is done by a handler, in the form of a child item list. + d = getattr(self, "struct_%s" % self.type, lambda: None) + if n: + dr = d(n) + else: + dr = d() + + Struct.Group.decode(self, descriptor, dr) +# if self.type == "wire_adapter_class": +# z = 0 + 0 + return buffer ####################### DETECTOR FUNCTION ######################## @@ -1124,8 +1211,6 @@ def detector(context): ifc = context.interface devi = context.devInstance -# TODO: config decoder needed for pretty-printing WUSB descriptors - if dev: (clazz,subclazz,proto) = ( dev.bDeviceClass, @@ -1138,8 +1223,9 @@ def detector(context): # We hijack the default control decoder for wire adapter peripherals # so that we can do the control routing ourselves (better) if (clazz,subclazz,proto) == (239,2,2) and not ep: - devi.controlDecoder = ControlDispatcher(devi) - devi.endpointDecoders[0] = devi.controlDecoder + if devi.controlDecoder.__class__ != ControlDispatcher: + devi.controlDecoder = ControlDispatcher(devi) + devi.endpointDecoders[0] = devi.controlDecoder return # Cable-Based Association interfaces get their own special treatment -- To unsubscribe from this list: send the line "unsubscribe linux-usb" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html