add tests for x224 layer

This commit is contained in:
speyrefitte
2014-11-04 16:32:40 +01:00
parent 71d5fc5344
commit be77dfb92f
5 changed files with 312 additions and 55 deletions

View File

@@ -756,13 +756,13 @@ class String(Type, CallableValue):
self._unicode = unicode
self._until = until
def __eq__(self, other):
def __cmp__(self, other):
"""
@summary: call raw compare value
@param other: other String parameter
@return: if two inner value are equals
"""
return self.value == other.value
return cmp(self.value, other.value)
def __hash__(self):
"""
@@ -804,7 +804,7 @@ class String(Type, CallableValue):
"""
if self._readLen is None:
if self._until is None:
self.value = s.getvalue()
self.value = s.getvalue()[s.pos:]
else:
self.value = ""
while self.value[-len(self._until):] != self._until or s.dataLen() == 0:

View File

@@ -30,7 +30,7 @@ from rdpy.base.error import InvalidExpectedDataException
class MessageType(object):
"""
Message type
@summary: Message type
"""
X224_TPDU_CONNECTION_REQUEST = 0xE0
X224_TPDU_CONNECTION_CONFIRM = 0xD0
@@ -40,7 +40,7 @@ class MessageType(object):
class NegociationType(object):
"""
Negotiation header
@summary: Negotiation header
"""
TYPE_RDP_NEG_REQ = 0x01
TYPE_RDP_NEG_RSP = 0x02
@@ -48,7 +48,7 @@ class NegociationType(object):
class Protocols(object):
"""
Protocols available for x224 layer
@summary: Protocols available for x224 layer
"""
PROTOCOL_RDP = 0x00000000
PROTOCOL_SSL = 0x00000001
@@ -57,7 +57,7 @@ class Protocols(object):
class NegotiationFailureCode(object):
"""
Protocol negotiation failure code
@summary: Protocol negotiation failure code
"""
SSL_REQUIRED_BY_SERVER = 0x00000001
SSL_NOT_ALLOWED_BY_SERVER = 0x00000002
@@ -68,8 +68,8 @@ class NegotiationFailureCode(object):
class ClientConnectionRequestPDU(CompositeType):
"""
Connection request
client -> server
@summary: Connection request
client -> server
@see: http://msdn.microsoft.com/en-us/library/cc240470.aspx
"""
def __init__(self):
@@ -83,7 +83,7 @@ class ClientConnectionRequestPDU(CompositeType):
class ServerConnectionConfirm(CompositeType):
"""
Server response
@summary: Server response
@see: http://msdn.microsoft.com/en-us/library/cc240506.aspx
"""
def __init__(self):
@@ -96,7 +96,7 @@ class ServerConnectionConfirm(CompositeType):
class X224DataHeader(CompositeType):
"""
Header send when x224 exchange application data
@summary: Header send when x224 exchange application data
"""
def __init__(self):
CompositeType.__init__(self)
@@ -106,7 +106,7 @@ class X224DataHeader(CompositeType):
class Negotiation(CompositeType):
"""
Negociate request message
@summary: Negociate request message
@see: request -> http://msdn.microsoft.com/en-us/library/cc240500.aspx
@see: response -> http://msdn.microsoft.com/en-us/library/cc240506.aspx
@see: failure ->http://msdn.microsoft.com/en-us/library/cc240507.aspx
@@ -122,8 +122,8 @@ class Negotiation(CompositeType):
class X224Layer(LayerAutomata, IStreamSender):
"""
x224 layer management
there is an connection automata
@summary: x224 layer management
there is an connection automata
"""
def __init__(self, presentation):
"""
@@ -139,8 +139,8 @@ class X224Layer(LayerAutomata, IStreamSender):
def recvData(self, data):
"""
Read data header from packet
And pass to presentation layer
@summary: Read data header from packet
And pass to presentation layer
@param data: Stream
"""
header = X224DataHeader()
@@ -149,15 +149,15 @@ class X224Layer(LayerAutomata, IStreamSender):
def send(self, message):
"""
Write message packet for TPDU layer
Add TPDU header
@summary: Write message packet for TPDU layer
Add TPDU header
@param message: network.Type message
"""
self._transport.send((X224DataHeader(), message))
class Client(X224Layer):
"""
Client automata of TPDU layer
@summary: Client automata of TPDU layer
"""
def __init__(self, presentation):
"""
@@ -167,14 +167,14 @@ class Client(X224Layer):
def connect(self):
"""
Connection request for client send a connection request packet
@summary: Connection request for client send a connection request packet
"""
self.sendConnectionRequest()
def sendConnectionRequest(self):
"""
Write connection request message
Next state is recvConnectionConfirm
@summary: Write connection request message
Next state is recvConnectionConfirm
@see: http://msdn.microsoft.com/en-us/library/cc240500.aspx
"""
message = ClientConnectionRequestPDU()
@@ -185,9 +185,9 @@ class Client(X224Layer):
def recvConnectionConfirm(self, data):
"""
Receive connection confirm message
Next state is recvData
Call connect on presentation layer if all is good
@summary: Receive connection confirm message
Next state is recvData
Call connect on presentation layer if all is good
@param data: Stream that contain connection confirm
@see: response -> http://msdn.microsoft.com/en-us/library/cc240506.aspx
@see: failure ->http://msdn.microsoft.com/en-us/library/cc240507.aspx
@@ -218,7 +218,7 @@ class Client(X224Layer):
class Server(X224Layer):
"""
Server automata of X224 layer
@summary: Server automata of X224 layer
"""
def __init__(self, presentation, privateKeyFileName, certificateFileName):
"""
@@ -233,14 +233,14 @@ class Server(X224Layer):
def connect(self):
"""
Connection request for server wait connection request packet from client
@summary: Connection request for server wait connection request packet from client
"""
self.setNextState(self.recvConnectionRequest)
def recvConnectionRequest(self, data):
"""
Read connection confirm packet
Next state is send connection confirm
@summary: Read connection confirm packet
Next state is send connection confirm
@param data: Stream
@see : http://msdn.microsoft.com/en-us/library/cc240470.aspx
"""
@@ -265,9 +265,9 @@ class Server(X224Layer):
def sendConnectionConfirm(self):
"""
Write connection confirm message
Start TLS connection
Next state is recvData
@summary: Write connection confirm message
Start TLS connection
Next state is recvData
@see : http://msdn.microsoft.com/en-us/library/cc240501.aspx
"""
message = ServerConnectionConfirm()
@@ -286,7 +286,7 @@ from OpenSSL import SSL
class ClientTLSContext(ssl.ClientContextFactory):
"""
client context factory for open ssl
@summary: client context factory for open ssl
"""
def getContext(self):
context = SSL.Context(SSL.TLSv1_METHOD)
@@ -296,7 +296,7 @@ class ClientTLSContext(ssl.ClientContextFactory):
class ServerTLSContext(ssl.DefaultOpenSSLContextFactory):
"""
Server context factory for open ssl
@summary: Server context factory for open ssl
@param privateKeyFileName: Name of a file containing a private key
@param certificateFileName: Name of a file containing a certificate
"""

View File

@@ -29,21 +29,21 @@ import unittest
import rdpy.network.layer
class LayerCase(unittest.TestCase):
'''
represent test case for all classes and function
present in rdpy.network.layer
'''
"""
@summary: represent test case for all classes and function
present in rdpy.network.layer
"""
class LayerCaseException(Exception):
'''
exception use for event base test
'''
"""
@summary: exception use for event base test
"""
pass
def test_layer_connect_event(self):
'''
test if connect event is send from transport to presentation
'''
"""
@summary: test if connect event is send from transport to presentation
"""
class TestConnect(rdpy.network.layer.Layer):
def connect(self):
raise LayerCase.LayerCaseException()
@@ -51,9 +51,9 @@ class LayerCase(unittest.TestCase):
self.assertRaises(LayerCase.LayerCaseException, rdpy.network.layer.Layer(presentation = TestConnect()).connect)
def test_layer_automata_more_than_expected(self):
'''
test layer automata mechanism if data received is more than expected
'''
"""
@summary: test layer automata mechanism if data received is more than expected
"""
class TestAutomata(rdpy.network.layer.RawLayer):
def expectedCallBack(self, data):
if data.dataLen() == 4:
@@ -64,9 +64,9 @@ class LayerCase(unittest.TestCase):
self.assertRaises(LayerCase.LayerCaseException, t.dataReceived, "\x00\x00\x00\x00\x00")
def test_layer_automata_less_than_expected(self):
'''
test layer automata mechanism
'''
"""
@summary: test layer automata mechanism
"""
class TestAutomata(rdpy.network.layer.RawLayer):
def expectedCallBack(self, data):
if data.dataLen() == 4:

View File

@@ -26,10 +26,267 @@ import os, sys
sys.path.insert(1, os.path.join(sys.path[0], '..'))
import unittest
import rdpy.protocol.rdp.x224 as x224
import rdpy.network.type as type
import rdpy.base.error as error
class X224Case(unittest.TestCase):
'''
test case for x224 layer (RDP)
'''
def test_x224_client(self):
"""
@summary: test case for x224 layer (RDP)
"""
class X224_PASS(Exception):
"""
@summary: for OK tests
"""
pass
class X224_FAIL(Exception):
"""
@summary: for KO tests
"""
pass
def test_x224_layer_recvData(self):
"""
@summary: unit test for X224Layer.recvData function
"""
class Presentation(object):
def recv(self, data):
data.readType(type.String('\x01\x02', constant = True))
raise X224Case.X224_PASS()
layer = x224.X224Layer(Presentation())
s = type.Stream()
s.writeType((x224.X224DataHeader(), type.String('\x01\x02')))
#reinit position
s.pos = 0
self.assertRaises(X224Case.X224_PASS, layer.recvData, s)
def test_x224_layer_send(self):
"""
@summary: unit test for X224Layer.send function
"""
class Transport(object):
def send(self, data):
s = type.Stream()
s.writeType(data)
s.pos = 0
s.readType(x224.X224DataHeader())
s.readType(type.String('\x01\x02', constant = True))
raise X224Case.X224_PASS()
layer = x224.X224Layer(None)
layer._transport = Transport()
self.assertRaises(X224Case.X224_PASS, layer.send, type.String('\x01\x02'))
def test_x224_client_connect(self):
"""
@summary: unit test for X224Client.connect and sendConnectionRequest function
"""
class Transport(object):
def send(self, data):
s = type.Stream()
s.writeType(data)
s.pos = 0
t = x224.ClientConnectionRequestPDU()
s.readType(t)
if t.protocolNeg.code != x224.NegociationType.TYPE_RDP_NEG_REQ:
raise X224Case.X224_FAIL()
if t.protocolNeg.selectedProtocol.value != x224.Protocols.PROTOCOL_SSL:
raise X224Case.X224_FAIL()
def nextAutomata(data):
raise X224Case.X224_PASS()
layer = x224.Client(None)
layer._transport = Transport()
layer.recvConnectionConfirm = nextAutomata
layer.connect()
self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02'))
def test_x224_client_recvConnectionConfirm_negotiation_old(self):
"""
@summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function
whithout protocol negotiation (doesn't support)
"""
message = x224.ServerConnectionConfirm()
del message._typeName[message._typeName.index("protocolNeg")]
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Client(None)
self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s)
def test_x224_client_recvConnectionConfirm_negotiation_failure(self):
"""
@summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function
check negotiation failure
"""
message = x224.ServerConnectionConfirm()
message.protocolNeg.code.value = x224.NegociationType.TYPE_RDP_NEG_FAILURE
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Client(None)
self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s)
def test_x224_client_recvConnectionConfirm_negotiation_bad_protocol(self):
"""
@summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function
Server ask another protocol than SSL
"""
message = x224.ServerConnectionConfirm()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_RDP
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Client(None)
self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s)
def test_x224_client_recvConnectionConfirm_ok(self):
"""
@summary: nominal case of protocol negotiation
"""
global tls_begin, presentation_connect
tls_begin = False
presentation_connect = False
class Transport(object):
def __init__(self):
class TLSTransport(object):
def startTLS(self, context):
global tls_begin
tls_begin = True
self.transport = TLSTransport()
class Presentation(object):
def connect(self):
global presentation_connect
presentation_connect = True
def recvData(data):
raise X224Case.X224_PASS()
message = x224.ServerConnectionConfirm()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_SSL
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Client(Presentation())
layer._transport = Transport()
layer.recvData = recvData
layer.recvConnectionConfirm(s)
self.assertTrue(tls_begin, "TLS is not started")
self.assertTrue(presentation_connect, "connect event is not forwarded")
self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02'))
def test_x224_server_recvConnectionRequest_invalid_old_client(self):
"""
@summary: unit test for X224Server.recvConnectionRequest function
old client with non protocol neg
"""
message = x224.ClientConnectionRequestPDU()
del message._typeName[message._typeName.index("protocolNeg")]
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Server(None, "key", "cert")
layer.connect()
self.assertRaises(error.InvalidExpectedDataException, layer.recv, s)
def test_x224_server_recvConnectionRequest_invalid_protocol_neg_failure(self):
"""
@summary: unit test for X224Server.recvConnectionRequest function
"""
message = x224.ClientConnectionRequestPDU()
message.protocolNeg.code.value = x224.NegociationType.TYPE_RDP_NEG_FAILURE
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Server(None, "key", "cert")
layer.connect()
self.assertRaises(error.InvalidExpectedDataException, layer.recv, s)
def test_x224_server_recvConnectionRequest_client_accept_ssl(self):
"""
@summary: unit test for X224Server.recvConnectionRequest function
test client doesn't support TLS case
"""
class Transport(object):
def send(self, data):
if not isinstance(data, x224.ServerConnectionConfirm):
raise X224Case.X224_FAIL()
if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_FAILURE or data.protocolNeg.failureCode.value != x224.NegotiationFailureCode.SSL_REQUIRED_BY_SERVER:
raise X224Case.X224_FAIL()
message = x224.ClientConnectionRequestPDU()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_HYBRID
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Server(None, "key", "cert")
layer._transport = Transport()
layer.connect()
self.assertRaises(error.InvalidExpectedDataException, layer.recv, s)
def test_x224_server_recvConnectionRequest_valid(self):
"""
@summary: unit test for X224Server.recvConnectionRequest function
"""
global tls, connect_event
tls = False
connect_event = False
class ServerTLSContext(object):
def __init__(self, key, cert):
pass
x224.ServerTLSContext = ServerTLSContext
class Transport(object):
def __init__(self):
class TLS(object):
def startTLS(self, context):
global tls
tls = True
self.transport = TLS()
def send(self, data):
if not isinstance(data, x224.ServerConnectionConfirm):
raise X224Case.X224_FAIL()
if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_RSP or data.protocolNeg.selectedProtocol.value != x224.Protocols.PROTOCOL_SSL:
raise X224Case.X224_FAIL()
class Presentation(object):
def connect(self):
global connect_event
connect_event = True
message = x224.ClientConnectionRequestPDU()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_SSL | x224.Protocols.PROTOCOL_RDP
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Server(Presentation(), "key", "cert")
layer._transport = Transport()
layer.connect()
layer.recvConnectionRequest(s)
self.assertTrue(tls, "TLS not started")
self.assertTrue(connect_event, "connect event not forwarded")

View File

@@ -29,7 +29,7 @@ import unittest
class RfbCase(unittest.TestCase):
'''
test casefor rfb layer (vnc)
test case for rfb layer (vnc)
'''
def testName(self):
pass