[PATCH] Sim Access Profile test scripts

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Add simple SAP client python implementation and a test script.

To run test-sap-server you need Python 2.6 or newer (tested with 2.6 only)
and PyBluez package installed.
---
 Makefile.tools       |   17 +-
 test/sap-client      |  944 ++++++++++++++++++++++++++++++++++++++++++++++++++
 test/test-sap-server |  138 ++++++++
 3 files changed, 1091 insertions(+), 8 deletions(-)
 create mode 100644 test/sap-client
 create mode 100755 test/test-sap-server

diff --git a/Makefile.tools b/Makefile.tools
index 1a1f5a1..6808132 100644
--- a/Makefile.tools
+++ b/Makefile.tools
@@ -188,14 +188,15 @@ else
 EXTRA_DIST += test/rctest.1 test/hciemu.1 test/bdaddr.8
 endif
 
-EXTRA_DIST += test/apitest test/hsplay test/hsmicro test/dbusdef.py \
-		test/monitor-bluetooth test/list-devices test/test-discovery \
-		test/test-manager test/test-adapter test/test-device \
-		test/test-service test/test-serial test/test-telephony \
-		test/test-network test/simple-agent test/simple-service \
-		test/simple-endpoint test/test-audio test/test-input \
-		test/test-attrib test/service-record.dtd test/service-did.xml \
-		test/service-spp.xml test/service-opp.xml test/service-ftp.xml
+EXTRA_DIST += test/apitest test/sap-client test/hsplay test/hsmicro \
+		test/dbusdef.py test/monitor-bluetooth test/list-devices \
+		test/test-discovery test/test-manager test/test-adapter \
+		test/test-device test/test-service test/test-serial \
+		test/test-telephony test/test-network test/simple-agent \
+		test/simple-service test/simple-endpoint test/test-audio \
+		test/test-input test/test-attrib test/test-sapserver \
+		test/service-record.dtd test/service-did.xml test/service-spp.xml \
+		test/service-opp.xml test/service-ftp.xml
 
 
 if HIDD
diff --git a/test/sap-client b/test/sap-client
new file mode 100644
index 0000000..5a0c711
--- /dev/null
+++ b/test/sap-client
@@ -0,0 +1,944 @@
+""" Copyright (C) 2011 Tieto """
+
+""" Szymon Janc <szymon.janc@xxxxxxxxx> """
+""" Waldemar Rymarkiewicz <waldemar.rymarkiewicz@xxxxxxxxx> """
+
+""" This program is free software; you can redistribute it and/or modify """
+""" it under the terms of the GNU General Public License as published by """
+""" the Free Software Foundation; either version 2 of the License, or """
+""" (at your option) any later version. """
+
+""" This program is distributed in the hope that it will be useful, """
+""" but WITHOUT ANY WARRANTY; without even the implied warranty of """
+""" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the """
+""" GNU General Public License for more details. """
+
+""" You should have received a copy of the GNU General Public License """
+""" along with this program; if not, write to the Free Software """
+""" Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """
+
+from array import array
+from bluetooth import *
+import time
+import re
+
+class SAPParam:
+    """ SAP Parameter Class """
+
+    MaxMsgSize = 0x00
+    ConnectionStatus = 0x01
+    ResultCode = 0x02
+    DisconnectionType = 0x03
+    CommandAPDU = 0x04
+    ResponseAPDU = 0x05
+    ATR = 0x06
+    CardReaderStatus = 0x07
+    StatusChange = 0x08
+    TransportProtocol = 0x09
+    CommandAPDU7816 = 0x10
+
+    def __init__(self, name, id, value = None):
+        self.name = name
+        self.id = id
+        self.value = value
+
+    def _padding(self,  buf):
+        pad = array('B')
+        while ( (len(buf) + len(pad)) % 4 ) != 0:
+            pad.append(0)
+        return pad
+
+    def _basicCheck(self,  buf):
+        if len(buf) < 4 or (len(buf) % 4) != 0 or buf[1] != 0:
+                return (-1,  -1)
+        if buf[0] != self.id:
+            return (-1,  -1)
+        plen = buf[2] * 256 + buf[3] + 4
+        if plen > len(buf):
+            return (-1,  -1)
+        pad = plen
+        while (pad % 4) != 0:
+            if buf[pad] != 0:
+                return (-1,  -1)
+            pad+=1
+        return (plen,  pad)
+
+    def getID(self):
+        return self.id
+
+    def getValue(self):
+        return self.value
+
+    def getContent(self):
+        return "%s(id=0x%.2X), value=%s \n" %  (self.name,  self.id, self.value)
+
+    def serialize(self):
+        a = array('B', '\00\00\00\00')
+        a[0] = self.id
+        a[1] = 0	# reserved
+        a[2] = 0	# length
+        a[3] = 1	# length
+        a.append(self.value)
+        a.extend(self._padding(a))
+        return a
+
+    def deserialize(self,  buf):
+        p = self._basicCheck(buf)
+        if p[0] == -1:
+            return -1
+        self.id = buf[0]
+        self.value = buf[4]
+        return p[1]
+
+
+class SAPParam_MaxMsgSize(SAPParam):
+    """MaxMsgSize Param """
+
+    def __init__(self,  value = None):
+        SAPParam.__init__(self,"MaxMsgSize",  SAPParam.MaxMsgSize, value)
+        self.__validate()
+
+    def __validate(self):
+        if self.value > 0xFFFF:
+             self.value = 0xFFFF
+
+    def serialize(self):
+        a = array('B', '\00\00\00\00')
+        a[0] = self.id
+        a[3] = 2
+        a.append(self.value / 256)
+        a.append(self.value % 256)
+        a.extend(self._padding(a))
+        return a
+
+    def deserialize(self,  buf):
+        p = self._basicCheck(buf)
+        if p[0] == -1 :
+            return -1
+        self.value = buf[4] * 256 + buf[5]
+        return p[1]
+
+class SAPParam_CommandAPDU(SAPParam):
+    def __init__(self,  value = None):
+        if value is None:
+            SAPParam.__init__(self, "CommandAPDU",  SAPParam.CommandAPDU, array('B'))
+        else:
+            SAPParam.__init__(self, "CommandAPDU",  SAPParam.CommandAPDU, array('B', value))
+
+    def serialize(self):
+        a = array('B', '\00\00\00\00')
+        a[0] = self.id
+        plen = len(self.value)
+        a[2] = plen / 256
+        a[3] = plen % 256
+        a.extend(self.value)
+        a.extend(self._padding(a))
+        return a
+
+    def deserialize(self,  buf):
+        p = self._basicCheck(buf)
+        if p[0] == -1:
+            return -1
+        self.value = buf[4:p[0]]
+        return p[1]
+
+class SAPParam_ResponseAPDU(SAPParam_CommandAPDU):
+    """ResponseAPDU Param """
+
+    def __init__(self,  value = None):
+        if value is None:
+            SAPParam.__init__(self, "ResponseAPDU",  SAPParam.ResponseAPDU, array('B'))
+        else:
+            SAPParam.__init__(self, "ResponseAPDU",  SAPParam.ResponseAPDU, array('B', value))
+
+class SAPParam_ATR(SAPParam_CommandAPDU):
+    """ATR Param """
+
+    def __init__(self,  value = None):
+        if value is None:
+            SAPParam.__init__(self, "ATR",  SAPParam.ATR, array('B'))
+        else:
+            SAPParam.__init__(self, "ATR",  SAPParam.ATR, array('B', value))
+
+class SAPParam_CommandAPDU7816(SAPParam_CommandAPDU):
+    """Command APDU7816 Param."""
+
+    def __init__(self,  value = None):
+        if value is None:
+            SAPParam.__init__(self, "CommandAPDU7816",  SAPParam.CommandAPDU7816, array('B'))
+        else:
+            SAPParam.__init__(self, "CommandAPDU7816",  SAPParam.CommandAPDU7816, array('B', value))
+
+
+class SAPParam_ConnectionStatus(SAPParam):
+    """Connection status Param."""
+
+    def __init__(self,  value = None):
+        SAPParam.__init__(self,"ConnectionStatus",  SAPParam.ConnectionStatus, value)
+        self.__validate()
+
+    def __validate(self):
+        if self.value is not None and self.value not in (0x00,  0x01,  0x02,  0x03,  0x04):
+            print "Warning. ConnectionStatus value in reserved range (0x%x)" % self.value
+
+    def deserialize(self,  buf):
+        ret = SAPParam.deserialize(self, buf)
+        if ret == -1:
+            return -1
+        self.__validate()
+        return ret
+
+class SAPParam_ResultCode(SAPParam):
+    """ Result Code Param """
+
+    def __init__(self,  value = None):
+        SAPParam.__init__(self,"ResultCode",  SAPParam.ResultCode, value)
+        self.__validate()
+
+    def __validate(self):
+        if self.value is not None and self.value not in (0x00,  0x01,  0x02,  0x03,  0x04,  0x05,  0x06,  0x07):
+            print "Warning. ResultCode value in reserved range (0x%x)" % self.value
+
+    def deserialize(self,  buf):
+        ret = SAPParam.deserialize(self, buf)
+        if ret == -1:
+            return -1
+        self.__validate()
+        return ret
+
+class SAPParam_DisconnectionType(SAPParam):
+    """Disconnection Type Param."""
+
+    def __init__(self,  value = None):
+        SAPParam.__init__(self,"DisconnectionType",  SAPParam.DisconnectionType, value)
+        self.__validate()
+
+    def __validate(self):
+        if self.value is not None and self.value not in (0x00,  0x01):
+            print "Warning. DisconnectionType value in reserved range (0x%x)" % self.value
+
+    def deserialize(self,  buf):
+        ret = SAPParam.deserialize(self, buf)
+        if ret == -1:
+            return -1
+        self.__validate()
+        return ret
+
+class SAPParam_CardReaderStatus(SAPParam_CommandAPDU):
+    """Card reader Status Param."""
+
+    def __init__(self,  value = None):
+        if value is None:
+            SAPParam.__init__(self, "CardReaderStatus",  SAPParam.CardReaderStatus, array('B'))
+        else:
+            SAPParam.__init__(self, "CardReaderStatus",  SAPParam.CardReaderStatus, array('B', value))
+
+class SAPParam_StatusChange(SAPParam):
+    """Status Change Param """
+
+    def __init__(self,  value = None):
+        SAPParam.__init__(self,"StatusChange",  SAPParam.StatusChange, value)
+
+    def __validate(self):
+        if self.value is not None and self.value not in (0x00,  0x01,  0x02,  0x03,  0x04,  0x05):
+            print "Warning. StatusChange value in reserved range (0x%x)" % self.value
+
+    def deserialize(self,  buf):
+        ret = SAPParam.deserialize(self, buf)
+        if ret == -1:
+            return -1
+        self.__validate()
+        return ret
+
+class SAPParam_TransportProtocol(SAPParam):
+    """Transport Protocol Param """
+
+    def __init__(self,  value = None):
+        SAPParam.__init__(self,"TransportProtocol",  SAPParam.TransportProtocol, value)
+        self.__validate()
+
+    def __validate(self):
+        if self.value is not None and self.value not in (0x00,  0x01):
+            print "Warning. TransportProtoco value in reserved range (0x%x)" % self.value
+
+    def deserialize(self,  buf):
+        ret = SAPParam.deserialize(self, buf)
+        if ret == -1:
+            return -1
+        self.__validate()
+        return ret
+
+class SAPMessage:
+
+    CONNECT_REQ = 0x00
+    CONNECT_RESP = 0x01
+    DISCONNECT_REQ = 0x02
+    DISCONNECT_RESP =0x03
+    DISCONNECT_IND = 0x04
+    TRANSFER_APDU_REQ = 0x05
+    TRANSFER_APDU_RESP = 0x06
+    TRANSFER_ATR_REQ = 0x07
+    TRANSFER_ATR_RESP = 0x08
+    POWER_SIM_OFF_REQ = 0x09
+    POWER_SIM_OFF_RESP = 0x0A
+    POWER_SIM_ON_REQ = 0x0B
+    POWER_SIM_ON_RESP = 0x0C
+    RESET_SIM_REQ = 0x0D
+    RESET_SIM_RESP = 0x0E
+    TRANSFER_CARD_READER_STATUS_REQ = 0x0F
+    TRANSFER_CARD_READER_STATUS_RESP = 0x10
+    STATUS_IND = 0x11
+    ERROR_RESP = 0x12
+    SET_TRANSPORT_PROTOCOL_REQ = 0x13
+    SET_TRANSPORT_PROTOCOL_RESP = 0x14
+
+    def __init__(self,  name,  id):
+        self.name = name
+        self.id = id
+        self.params = []
+        self.buf = array('B')
+
+    def _basicCheck(self,  buf):
+        if len(buf) < 4 or (len(buf) % 4) != 0 :
+            return False
+
+        if buf[0] != self.id:
+            return False
+
+        return True
+
+    def getID(self):
+        return self.id
+
+    def getContent(self):
+        s = "%s(id=0x%.2X) " % (self.name,  self.id)
+        if len( self.buf): s = s + "[%s]" % re.sub("(.{2})", "0x\\1 " , self.buf.tostring().encode("hex").upper(), re.DOTALL)
+        s = s + "\n\t"
+        for p in self.params:
+            s = s + "\t" + p.getContent()
+        return s
+
+    def getParams(self):
+        return self.params
+
+    def addParam(self,  param):
+        self.params.append(param)
+
+    def serialize(self):
+        ret = array('B', '\00\00\00\00')
+        ret[0] = self.id
+        ret[1] = len(self.params)
+        ret[2] = 0	# reserved
+        ret[3] = 0	# reserved
+        for p in self.params:
+            ret.extend(p.serialize())
+
+        self.buf = ret
+        return ret
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        return len(buf) == 4 and buf[1] == 0 and self._basicCheck(buf)
+
+
+class SAPMessage_CONNECT_REQ(SAPMessage):
+    def __init__(self,  MaxMsgSize = None):
+        SAPMessage.__init__(self,"CONNECT_REQ",  SAPMessage.CONNECT_REQ)
+        if MaxMsgSize is not None:
+            self.addParam(SAPParam_MaxMsgSize(MaxMsgSize))
+
+    def _validate(self):
+        if len(self.params) == 1:
+            if self.params[0].getID() == SAPParam.MaxMsgSize:
+                return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_MaxMsgSize()
+            if p.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p)
+                return self._validate()
+
+        return False
+
+class SAPMessage_CONNECT_RESP(SAPMessage):
+    def __init__(self,  ConnectionStatus = None,  MaxMsgSize = None):
+        SAPMessage.__init__(self,"CONNECT_RESP",  SAPMessage.CONNECT_RESP)
+        if ConnectionStatus is not None:
+            self.addParam(SAPParam_ConnectionStatus(ConnectionStatus))
+            if MaxMsgSize is not None:
+                self.addParam(SAPParam_MaxMsgSize(MaxMsgSize))
+
+    def _validate(self):
+        if len(self.params) > 0:
+            if self.params[0] .getID() == SAPParam.ConnectionStatus:
+                if self.params[0].getValue() ==  0x02:
+                    if len(self.params) == 2:
+                        return True
+                else:
+                    if len(self.params) == 1:
+                        return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_ConnectionStatus()
+            r = p.deserialize(buf[4:])
+            if  r != -1:
+                self.addParam(p)
+                if buf[1] == 2:
+                    p = SAPParam_MaxMsgSize()
+                    r = p.deserialize(buf[4+r:])
+                    if r != -1:
+                        self.addParam(p)
+
+                return self._validate()
+
+        return False
+
+class SAPMessage_DISCONNECT_REQ(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"DISCONNECT_REQ",  SAPMessage.DISCONNECT_REQ)
+
+class SAPMessage_DISCONNECT_RESP(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"DISCONNECT_RESP",  SAPMessage.DISCONNECT_RESP)
+
+class SAPMessage_DISCONNECT_IND(SAPMessage):
+    def __init__(self,  Type = None):
+        SAPMessage.__init__(self,"DISCONNECT_IND",  SAPMessage.DISCONNECT_IND)
+        if Type is not None:
+            self.addParam(SAPParam_DisconnectionType(Type))
+
+    def _validate(self):
+        if len(self.params) == 1:
+            if self.params[0].getID() == SAPParam.DisconnectionType:
+                return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_DisconnectionType()
+            if p.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p)
+                return self._validate()
+
+        return False
+
+
+class SAPMessage_TRANSFER_APDU_REQ(SAPMessage):
+    def __init__(self,  APDU = None,  T = False):
+        SAPMessage.__init__(self,"TRANSFER_APDU_REQ",  SAPMessage.TRANSFER_APDU_REQ)
+        if APDU is not None:
+            if T :
+                self.addParam(SAPParam_CommandAPDU(APDU))
+            else:
+                self.addParam(SAPParam_CommandAPDU7816(APDU))
+
+    def _validate(self):
+        if len(self.params) == 1:
+            if self.params[0].getID() == SAPParam.CommandAPDU or self.params[0].getID() == SAPParam.CommandAPDU7816:
+                return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+        if SAPMessage._basicCheck(self,  buf):
+
+            p = SAPParam_CommandAPDU()
+            p2 = SAPParam_CommandAPDU7816()
+            if p.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p)
+                return self._validate()
+            elif p2.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p2)
+                return self._validate()
+
+        return False
+
+class SAPMessage_TRANSFER_APDU_RESP(SAPMessage):
+    def __init__(self,  ResultCode = None,  Response = None):
+        SAPMessage.__init__(self,"TRANSFER_APDU_RESP",  SAPMessage.TRANSFER_APDU_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+            if Response is not None:
+                self.addParam(SAPParam_ResponseAPDU(Response))
+
+    def _validate(self):
+        if len(self.params) > 0:
+            if self.params[0] .getID() == SAPParam.ResultCode:
+                if self.params[0].getValue() == 0x00:
+                    if len(self.params) == 2:
+                        return True
+                else:
+                    if len(self.params) == 1:
+                        return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_ResultCode()
+            r = p.deserialize(buf[4:])
+            if  r != -1:
+                self.addParam(p)
+                if buf[1] == 2:
+                    p = SAPParam_ResponseAPDU()
+                    r = p.deserialize(buf[4+r:])
+                    if r != -1:
+                        self.addParam(p)
+
+                return self._validate()
+
+        return False
+
+class SAPMessage_TRANSFER_ATR_REQ(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"TRANSFER_ATR_REQ",  SAPMessage.TRANSFER_ATR_REQ)
+
+class SAPMessage_TRANSFER_ATR_RESP(SAPMessage):
+    def __init__(self,  ResultCode = None,  ATR = None):
+        SAPMessage.__init__(self,"TRANSFER_ATR_RESP",  SAPMessage.TRANSFER_ATR_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+            if ATR is not None:
+                self.addParam(SAPParam_ATR(ATR))
+
+    def _validate(self):
+        if len(self.params) > 0:
+            if self.params[0] .getID() == SAPParam.ResultCode:
+                if self.params[0].getValue() == 0x00:
+                    if len(self.params) == 2:
+                        return True
+                else:
+                    if len(self.params) == 1:
+                        return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+
+        if SAPMessage._basicCheck(self,  buf):
+
+            p = SAPParam_ResultCode()
+            r = p.deserialize(buf[4:])
+
+            if  r != -1:
+
+                self.addParam(p)
+                if buf[1] == 2:
+
+                    p = SAPParam_ATR()
+                    r = p.deserialize(buf[4+r:])
+                    if r != -1:
+                        self.addParam(p)
+
+                return self._validate()
+
+        return False
+
+class SAPMessage_POWER_SIM_OFF_REQ(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"POWER_SIM_OFF_REQ",  SAPMessage.POWER_SIM_OFF_REQ)
+
+class SAPMessage_POWER_SIM_OFF_RESP(SAPMessage):
+    def __init__(self,  ResultCode = None):
+        SAPMessage.__init__(self,"POWER_SIM_OFF_RESP",  SAPMessage.POWER_SIM_OFF_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+
+    def _validate(self):
+        if len(self.params) == 1:
+            if self.params[0].getID() == SAPParam.ResultCode:
+                return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_ResultCode()
+            if p.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p)
+                return self._validate()
+
+        return False
+
+class SAPMessage_POWER_SIM_ON_REQ(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"POWER_SIM_ON_REQ",  SAPMessage.POWER_SIM_ON_REQ)
+
+class SAPMessage_POWER_SIM_ON_RESP(SAPMessage_POWER_SIM_OFF_RESP):
+    def __init__(self,  ResultCode = None):
+        SAPMessage.__init__(self,"POWER_SIM_ON_RESP",  SAPMessage.POWER_SIM_ON_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+
+class SAPMessage_RESET_SIM_REQ(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"RESET_SIM_REQ",  SAPMessage.RESET_SIM_REQ)
+
+class SAPMessage_RESET_SIM_RESP(SAPMessage_POWER_SIM_OFF_RESP):
+    def __init__(self,  ResultCode = None):
+        SAPMessage.__init__(self,"RESET_SIM_RESP",  SAPMessage.RESET_SIM_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+
+class SAPMessage_STATUS_IND(SAPMessage):
+    def __init__(self,  StatusChange = None):
+        SAPMessage.__init__(self,"STATUS_IND",  SAPMessage.STATUS_IND)
+        if StatusChange is not None:
+            self.addParam(SAPParam_StatusChange(StatusChange))
+
+    def _validate(self):
+        if len(self.params) == 1:
+            if self.params[0].getID() == SAPParam.StatusChange:
+                return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_StatusChange()
+            if p.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p)
+                return self._validate()
+
+        return False
+
+class SAPMessage_TRANSFER_CARD_READER_STATUS_REQ(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"TRANSFER_CARD_READER_STATUS_REQ",  SAPMessage.TRANSFER_CARD_READER_STATUS_REQ)
+
+class SAPMessage_TRANSFER_CARD_READER_STATUS_RESP(SAPMessage):
+    def __init__(self,  ResultCode = None,  Status = None):
+        SAPMessage.__init__(self,"TRANSFER_CARD_READER_STATUS_RESP",  SAPMessage.TRANSFER_CARD_READER_STATUS_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+            if Status is not None:
+                self.addParam(SAPParam_CardReaderStatus(Status))
+
+    def _validate(self):
+        if len(self.params) > 0:
+            if self.params[0] .getID() == SAPParam.ResultCode:
+                if self.params[0].getValue() == 0x00:
+                    if len(self.params) == 2:
+                        return True
+                else:
+                    if len(self.params) == 1:
+                        return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_ResultCode()
+            r = p.deserialize(buf[4:])
+            if  r != -1:
+                self.addParam(p)
+                if buf[1] == 2:
+                    p = SAPParam_CardReaderStatus()
+                    r = p.deserialize(buf[4+r:])
+                    if r != -1:
+                        self.addParam(p)
+
+                return self._validate()
+
+        return False
+
+class SAPMessage_ERROR_RESP(SAPMessage):
+    def __init__(self):
+        SAPMessage.__init__(self,"ERROR_RESP",  SAPMessage.ERROR_RESP)
+
+
+class SAPMessage_SET_TRANSPORT_PROTOCOL_REQ(SAPMessage):
+    def __init__(self,  protocol = None):
+        SAPMessage.__init__(self,"SET_TRANSPORT_PROTOCOL_REQ",  SAPMessage.SET_TRANSPORT_PROTOCOL_REQ)
+        if protocol is not None:
+            self.addParam(SAPParam_TransportProtocol(protocol))
+
+    def _validate(self):
+        if len(self.params) == 1:
+            if self.params[0].getID() == SAPParam.TransportProtocol:
+                return True
+        return False
+
+    def deserialize(self,  buf):
+        self.buf = buf
+        self.params[:] = []
+        if SAPMessage._basicCheck(self,  buf):
+            p = SAPParam_TransportProtocol()
+            if p.deserialize(buf[4:]) == len(buf[4:]):
+                self.addParam(p)
+                return self._validate()
+
+        return False
+
+class SAPMessage_SET_TRANSPORT_PROTOCOL_RESP(SAPMessage_POWER_SIM_OFF_RESP):
+    def __init__(self,  ResultCode = None):
+        SAPMessage.__init__(self,"SET_TRANSPORT_PROTOCOL_RESP",  SAPMessage.SET_TRANSPORT_PROTOCOL_RESP)
+        if ResultCode is not None:
+            self.addParam(SAPParam_ResultCode(ResultCode))
+
+
+class SAPClient:
+
+    CONNECTED = 1
+    DISCONNECTED = 0
+
+    uuid = "0000112D-0000-1000-8000-00805F9B34FB"
+    bufsize = 1024
+    timeout = 20
+    state = DISCONNECTED
+
+    def __init__(self,  host = None,  port = None):
+        self.sock = None
+
+        if host is None or is_valid_address(host):
+            self.host = host
+        else:
+            raise BluetoothError ("%s is not a valid BT address." % host)
+            self.host = None
+            return
+
+        if port is None:
+            self.__discover()
+        else:
+            self.port = port
+
+        self.__connectRFCOMM()
+
+    def __del__(self):
+        self.__disconnectRFCOMM()
+
+    def __disconnectRFCOMM(self):
+        if self.sock is not None:
+            self.sock.close()
+            self.state = self.DISCONNECTED
+
+    def __discover(self):
+        service_matches = find_service(self.uuid, self.host)
+
+        if len(service_matches) == 0:
+            raise BluetoothError ("No SAP service found")
+            return
+
+        first_match = service_matches[0]
+        self.port = first_match["port"]
+        self.host = first_match["host"]
+
+        print "SAP Service found on %s(%s)" % first_match["name"] % self.host
+
+    def __connectRFCOMM(self):
+        self.sock=BluetoothSocket( RFCOMM )
+        self.sock.connect((self.host, self.port))
+        self.sock.settimeout(self.timeout)
+        self.state = self.CONNECTED
+
+    def __sendMsg(self, msg):
+        if isinstance(msg,  SAPMessage):
+            s = msg.serialize()
+            print "\tTX: " + msg.getContent()
+            return self.sock.send(s.tostring())
+
+    def __rcvMsg(self,  msg):
+        if isinstance(msg,  SAPMessage):
+            print "\tRX Wait: %s(id = 0x%.2x)" % (msg.name, msg.id)
+            data = self.sock.recv(self.bufsize)
+            if data:
+                if msg.deserialize(array('B',data)):
+                    print "\tRX: len(%d) %s" % (len(data), msg.getContent())
+                    return msg
+                else:
+                    print "msg: %s" % array('B',data)
+                    raise BluetoothError ("Message deserialization failed.")
+            else:
+                raise BluetoothError ("Timeout. No data received.")
+
+    def connect(self):
+        self.__connectRFCOMM()
+
+    def disconnect(self):
+        self.__disconnectRFCOMM()
+
+    def isConnected(self):
+        return self.state
+
+    def proc_connect(self):
+        try:
+            self.__sendMsg(SAPMessage_CONNECT_REQ(self.bufsize))
+            params = self.__rcvMsg(SAPMessage_CONNECT_RESP()).getParams()
+
+            if params[0].getValue() in (0x00,  0x04):
+                pass
+            elif params[0].getValue() == 0x02:
+                self.bufsize = params[1].getValue()
+
+                self.__sendMsg(SAPMessage_CONNECT_REQ(self.bufsize))
+                params = self.__rcvMsg(SAPMessage_CONNECT_RESP()).getParams()
+
+                if params[0].getValue() not in (0x00,  0x04):
+                    return False
+            else:
+                return False
+
+            params = self.__rcvMsg(SAPMessage_STATUS_IND()).getParams()
+            if params[0].getValue() == 0x00:
+                return False
+            elif params[0].getValue() == 0x01:
+                """OK, Card reset"""
+                return self.proc_transferATR()
+            elif params[0].getValue() == 0x02:
+                """T0 not supported"""
+                if self.proc_transferATR():
+                    return self.proc_setTransportProtocol(1)
+                else:
+                    return False
+            else:
+                return False
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_disconnectByClient(self, timeout=0):
+        try:
+            self.__sendMsg(SAPMessage_DISCONNECT_REQ())
+            self.__rcvMsg(SAPMessage_DISCONNECT_RESP())
+            time.sleep(timeout) # let srv to close rfcomm
+            self.__disconnectRFCOMM()
+            return True
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_disconnectByServer(self, timeout=0):
+        try:
+            params = self.__rcvMsg(SAPMessage_DISCONNECT_IND()).getParams()
+
+            """gracefull"""
+            if params[0].getValue() == 0x00:
+                if not self.proc_transferAPDU():
+                    return False
+
+            return self.proc_disconnectByClient(timeout)
+
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_transferAPDU(self,  apdu = "Sample APDU command"):
+        try:
+            self.__sendMsg(SAPMessage_TRANSFER_APDU_REQ(apdu))
+            params = self.__rcvMsg(SAPMessage_TRANSFER_APDU_RESP()).getParams()
+            return True
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_transferATR(self):
+        try:
+            self.__sendMsg(SAPMessage_TRANSFER_ATR_REQ())
+            params = self.__rcvMsg(SAPMessage_TRANSFER_ATR_RESP()).getParams()
+            return True
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_powerSimOff(self):
+        try:
+            self.__sendMsg(SAPMessage_POWER_SIM_OFF_REQ())
+            params = self.__rcvMsg(SAPMessage_POWER_SIM_OFF_RESP()).getParams()
+            return True
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_powerSimOn(self):
+        try:
+            self.__sendMsg(SAPMessage_POWER_SIM_ON_REQ())
+            params = self.__rcvMsg(SAPMessage_POWER_SIM_ON_RESP()).getParams()
+            if params[0].getValue() == 0x00:
+                return self.proc_transferATR()
+
+            return True
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_resetSim(self):
+        try:
+            self.__sendMsg(SAPMessage_RESET_SIM_REQ())
+            params = self.__rcvMsg(SAPMessage_RESET_SIM_RESP()).getParams()
+            if params[0].getValue() == 0x00:
+                return self.proc_transferATR()
+
+            return True
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_reportStatus(self):
+        try:
+            params = self.__rcvMsg(SAPMessage_STATUS_IND()).getParams()
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_transferCardReaderStatus(self):
+        try:
+            self.__sendMsg(SAPMessage_TRANSFER_CARD_READER_STATUS_REQ())
+            params = self.__rcvMsg(SAPMessage_TRANSFER_CARD_READER_STATUS_RESP()).getParams()
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_errorResponse(self):
+        try:
+            """ send malformed message, no mandatory maxmsgsize parameter"""
+            self.__sendMsg(SAPMessage_CONNECT_REQ())
+
+            params = self.__rcvMsg(SAPMessage_ERROR_RESP()).getParams()
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+    def proc_setTransportProtocol(self,  protocol = 0):
+        try:
+            self.__sendMsg(SAPMessage_SET_TRANSPORT_PROTOCOL_REQ(protocol))
+            params = self.__rcvMsg(SAPMessage_SET_TRANSPORT_PROTOCOL_RESP()).getParams()
+
+            if params[0].getValue() == 0x00:
+                params = self.__rcvMsg(SAPMessage_STATUS_IND()).getParams()
+                if params[0].getValue() in (0x01,  0x02):
+                    return self.proc_transferATR()
+                else:
+                    return True
+                    """return False ???"""
+            elif params[0].getValue == 0x07:
+                """not supported"""
+                return True
+                """return False ???"""
+            else:
+                return False
+
+        except BluetoothError , e:
+            print "Error. " +str(e)
+            return False
+
+if __name__ == "__main__":
+    pass
diff --git a/test/test-sap-server b/test/test-sap-server
new file mode 100755
index 0000000..bea6ca9
--- /dev/null
+++ b/test/test-sap-server
@@ -0,0 +1,138 @@
+#!/usr/bin/python
+
+from sap import *
+import time
+
+def connect_disconnect_by_client(sap):
+
+    print "[Test] Connect - Disconnect by client \n"
+
+    try:
+        if not sap.isConnected():
+           sap.connect()
+
+        if sap.proc_connect():
+            if sap.proc_disconnectByClient():
+                print "OK"
+                return 0
+
+        print "NOT OK"
+        return 1
+
+    except BluetoothError , e:
+        print "Error " + str(e)
+
+
+def connect_disconnect_by_server_gracefully(sap, timeout=0):
+
+    print "[Test] Connect - Disconnect by server with timer \n"
+
+    try:
+        if not sap.isConnected():
+           sap.connect()
+
+        if sap.proc_connect():
+            if sap.proc_disconnectByServer(timeout):
+                print "OK"
+                return 0
+
+        print "NOT OK"
+        return 1
+
+    except BluetoothError , e:
+        print "Error " + str(e)
+
+
+def connect_txAPDU_disconnect_by_client(sap):
+
+    print "[Test] Connect - TX APDU - Disconnect by client \n"
+
+    try:
+        if not sap.isConnected():
+           sap.connect()
+
+        if sap.proc_connect():
+            if not sap.proc_transferAPDU():
+                print "NOT OK 1"
+                return 1
+
+            if not sap.proc_transferAPDU():
+                print "NOT OK 2"
+                return 1
+
+            if not sap.proc_transferAPDU():
+                print "NOT OK 3"
+                return 1
+
+            if not sap.proc_transferAPDU():
+                print "NOT OK 4"
+                return 1
+
+            if sap.proc_disconnectByClient():
+                print "OK"
+                return 0
+
+        print "NOT OK"
+        return 1
+
+    except BluetoothError , e:
+        print "Error " + str(e)
+
+def connect_rfcomm_only_and_wait_for_close_by_server(sap):
+
+    print "[Test] Connect rfcomm only  - Disconnect by server timeout \n"
+
+    if not sap.isConnected():
+       sap.connect()
+
+    time.sleep(40)
+    print "OK"
+
+def power_sim_off_on(sap):
+
+    print "[Test] Powe sim off \n"
+
+    try:
+        if not sap.isConnected():
+           sap.connect()
+
+        if sap.proc_connect():
+            if not sap.proc_resetSim():
+                print "NOT OK"
+                return 1
+
+            if not sap.proc_powerSimOff():
+                print "NOT OK"
+                return 1
+
+            if not sap.proc_powerSimOn():
+                print "NOT OK"
+                return 1
+
+            if sap.proc_disconnectByClient():
+                print "OK"
+                return 0
+
+        print "NOT OK"
+        return 1
+
+    except BluetoothError , e:
+        print "Error " + str(e)
+
+
+if __name__ == "__main__":
+
+    host = "00:00:00:00:00:0"  # server bd_addr
+    port = 8  # sap server port
+
+    try:
+        s = SAPClient(host, port)
+    except BluetoothError , e:
+        print "Error " + str(e)
+
+    connect_disconnect_by_client(s)
+    connect_disconnect_by_server_gracefully(s)
+    connect_disconnect_by_server_gracefully(s, 40)  #  wait 40 sec for srv to close rfcomm sock
+    connect_rfcomm_only_and_wait_for_close_by_server(s)
+    connect_txAPDU_disconnect_by_client(s)
+    power_sim_off_on(s)
-- 
1.7.1

--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux