From be77dfb92f21e16976e77bcf46de44afdd5047de Mon Sep 17 00:00:00 2001 From: speyrefitte Date: Tue, 4 Nov 2014 16:32:40 +0100 Subject: [PATCH] add tests for x224 layer --- rdpy/network/type.py | 6 +- rdpy/protocol/rdp/x224.py | 62 ++++---- test/test_network_layer.py | 32 ++-- test/test_protocol_rdp_x224.py | 265 ++++++++++++++++++++++++++++++++- test/test_protocol_rfb_rfb.py | 2 +- 5 files changed, 312 insertions(+), 55 deletions(-) diff --git a/rdpy/network/type.py b/rdpy/network/type.py index c6fa7bb..a187fbe 100644 --- a/rdpy/network/type.py +++ b/rdpy/network/type.py @@ -756,13 +756,13 @@ class String(Type, CallableValue): self._unicode = unicode self._until = until - def __eq__(self, other): + def __cmp__(self, other): """ @summary: call raw compare value @param other: other String parameter @return: if two inner value are equals """ - return self.value == other.value + return cmp(self.value, other.value) def __hash__(self): """ @@ -804,7 +804,7 @@ class String(Type, CallableValue): """ if self._readLen is None: if self._until is None: - self.value = s.getvalue() + self.value = s.getvalue()[s.pos:] else: self.value = "" while self.value[-len(self._until):] != self._until or s.dataLen() == 0: diff --git a/rdpy/protocol/rdp/x224.py b/rdpy/protocol/rdp/x224.py index 4d55630..ca773f3 100644 --- a/rdpy/protocol/rdp/x224.py +++ b/rdpy/protocol/rdp/x224.py @@ -30,7 +30,7 @@ from rdpy.base.error import InvalidExpectedDataException class MessageType(object): """ - Message type + @summary: Message type """ X224_TPDU_CONNECTION_REQUEST = 0xE0 X224_TPDU_CONNECTION_CONFIRM = 0xD0 @@ -40,7 +40,7 @@ class MessageType(object): class NegociationType(object): """ - Negotiation header + @summary: Negotiation header """ TYPE_RDP_NEG_REQ = 0x01 TYPE_RDP_NEG_RSP = 0x02 @@ -48,7 +48,7 @@ class NegociationType(object): class Protocols(object): """ - Protocols available for x224 layer + @summary: Protocols available for x224 layer """ PROTOCOL_RDP = 0x00000000 PROTOCOL_SSL = 0x00000001 @@ -57,7 +57,7 @@ class Protocols(object): class NegotiationFailureCode(object): """ - Protocol negotiation failure code + @summary: Protocol negotiation failure code """ SSL_REQUIRED_BY_SERVER = 0x00000001 SSL_NOT_ALLOWED_BY_SERVER = 0x00000002 @@ -68,8 +68,8 @@ class NegotiationFailureCode(object): class ClientConnectionRequestPDU(CompositeType): """ - Connection request - client -> server + @summary: Connection request + client -> server @see: http://msdn.microsoft.com/en-us/library/cc240470.aspx """ def __init__(self): @@ -83,7 +83,7 @@ class ClientConnectionRequestPDU(CompositeType): class ServerConnectionConfirm(CompositeType): """ - Server response + @summary: Server response @see: http://msdn.microsoft.com/en-us/library/cc240506.aspx """ def __init__(self): @@ -96,7 +96,7 @@ class ServerConnectionConfirm(CompositeType): class X224DataHeader(CompositeType): """ - Header send when x224 exchange application data + @summary: Header send when x224 exchange application data """ def __init__(self): CompositeType.__init__(self) @@ -106,7 +106,7 @@ class X224DataHeader(CompositeType): class Negotiation(CompositeType): """ - Negociate request message + @summary: Negociate request message @see: request -> http://msdn.microsoft.com/en-us/library/cc240500.aspx @see: response -> http://msdn.microsoft.com/en-us/library/cc240506.aspx @see: failure ->http://msdn.microsoft.com/en-us/library/cc240507.aspx @@ -122,8 +122,8 @@ class Negotiation(CompositeType): class X224Layer(LayerAutomata, IStreamSender): """ - x224 layer management - there is an connection automata + @summary: x224 layer management + there is an connection automata """ def __init__(self, presentation): """ @@ -139,8 +139,8 @@ class X224Layer(LayerAutomata, IStreamSender): def recvData(self, data): """ - Read data header from packet - And pass to presentation layer + @summary: Read data header from packet + And pass to presentation layer @param data: Stream """ header = X224DataHeader() @@ -149,15 +149,15 @@ class X224Layer(LayerAutomata, IStreamSender): def send(self, message): """ - Write message packet for TPDU layer - Add TPDU header + @summary: Write message packet for TPDU layer + Add TPDU header @param message: network.Type message """ self._transport.send((X224DataHeader(), message)) class Client(X224Layer): """ - Client automata of TPDU layer + @summary: Client automata of TPDU layer """ def __init__(self, presentation): """ @@ -167,14 +167,14 @@ class Client(X224Layer): def connect(self): """ - Connection request for client send a connection request packet + @summary: Connection request for client send a connection request packet """ self.sendConnectionRequest() def sendConnectionRequest(self): """ - Write connection request message - Next state is recvConnectionConfirm + @summary: Write connection request message + Next state is recvConnectionConfirm @see: http://msdn.microsoft.com/en-us/library/cc240500.aspx """ message = ClientConnectionRequestPDU() @@ -185,9 +185,9 @@ class Client(X224Layer): def recvConnectionConfirm(self, data): """ - Receive connection confirm message - Next state is recvData - Call connect on presentation layer if all is good + @summary: Receive connection confirm message + Next state is recvData + Call connect on presentation layer if all is good @param data: Stream that contain connection confirm @see: response -> http://msdn.microsoft.com/en-us/library/cc240506.aspx @see: failure ->http://msdn.microsoft.com/en-us/library/cc240507.aspx @@ -218,7 +218,7 @@ class Client(X224Layer): class Server(X224Layer): """ - Server automata of X224 layer + @summary: Server automata of X224 layer """ def __init__(self, presentation, privateKeyFileName, certificateFileName): """ @@ -233,14 +233,14 @@ class Server(X224Layer): def connect(self): """ - Connection request for server wait connection request packet from client + @summary: Connection request for server wait connection request packet from client """ self.setNextState(self.recvConnectionRequest) def recvConnectionRequest(self, data): """ - Read connection confirm packet - Next state is send connection confirm + @summary: Read connection confirm packet + Next state is send connection confirm @param data: Stream @see : http://msdn.microsoft.com/en-us/library/cc240470.aspx """ @@ -265,9 +265,9 @@ class Server(X224Layer): def sendConnectionConfirm(self): """ - Write connection confirm message - Start TLS connection - Next state is recvData + @summary: Write connection confirm message + Start TLS connection + Next state is recvData @see : http://msdn.microsoft.com/en-us/library/cc240501.aspx """ message = ServerConnectionConfirm() @@ -286,7 +286,7 @@ from OpenSSL import SSL class ClientTLSContext(ssl.ClientContextFactory): """ - client context factory for open ssl + @summary: client context factory for open ssl """ def getContext(self): context = SSL.Context(SSL.TLSv1_METHOD) @@ -296,7 +296,7 @@ class ClientTLSContext(ssl.ClientContextFactory): class ServerTLSContext(ssl.DefaultOpenSSLContextFactory): """ - Server context factory for open ssl + @summary: Server context factory for open ssl @param privateKeyFileName: Name of a file containing a private key @param certificateFileName: Name of a file containing a certificate """ diff --git a/test/test_network_layer.py b/test/test_network_layer.py index 4bf5e69..01b7834 100644 --- a/test/test_network_layer.py +++ b/test/test_network_layer.py @@ -29,21 +29,21 @@ import unittest import rdpy.network.layer class LayerCase(unittest.TestCase): - ''' - represent test case for all classes and function - present in rdpy.network.layer - ''' + """ + @summary: represent test case for all classes and function + present in rdpy.network.layer + """ class LayerCaseException(Exception): - ''' - exception use for event base test - ''' + """ + @summary: exception use for event base test + """ pass def test_layer_connect_event(self): - ''' - test if connect event is send from transport to presentation - ''' + """ + @summary: test if connect event is send from transport to presentation + """ class TestConnect(rdpy.network.layer.Layer): def connect(self): raise LayerCase.LayerCaseException() @@ -51,9 +51,9 @@ class LayerCase(unittest.TestCase): self.assertRaises(LayerCase.LayerCaseException, rdpy.network.layer.Layer(presentation = TestConnect()).connect) def test_layer_automata_more_than_expected(self): - ''' - test layer automata mechanism if data received is more than expected - ''' + """ + @summary: test layer automata mechanism if data received is more than expected + """ class TestAutomata(rdpy.network.layer.RawLayer): def expectedCallBack(self, data): if data.dataLen() == 4: @@ -64,9 +64,9 @@ class LayerCase(unittest.TestCase): self.assertRaises(LayerCase.LayerCaseException, t.dataReceived, "\x00\x00\x00\x00\x00") def test_layer_automata_less_than_expected(self): - ''' - test layer automata mechanism - ''' + """ + @summary: test layer automata mechanism + """ class TestAutomata(rdpy.network.layer.RawLayer): def expectedCallBack(self, data): if data.dataLen() == 4: diff --git a/test/test_protocol_rdp_x224.py b/test/test_protocol_rdp_x224.py index fa7d3fc..4c27a19 100644 --- a/test/test_protocol_rdp_x224.py +++ b/test/test_protocol_rdp_x224.py @@ -26,10 +26,267 @@ import os, sys sys.path.insert(1, os.path.join(sys.path[0], '..')) import unittest +import rdpy.protocol.rdp.x224 as x224 +import rdpy.network.type as type +import rdpy.base.error as error class X224Case(unittest.TestCase): - ''' - test case for x224 layer (RDP) - ''' - def test_x224_client(self): + """ + @summary: test case for x224 layer (RDP) + """ + + class X224_PASS(Exception): + """ + @summary: for OK tests + """ pass + + class X224_FAIL(Exception): + """ + @summary: for KO tests + """ + pass + + def test_x224_layer_recvData(self): + """ + @summary: unit test for X224Layer.recvData function + """ + class Presentation(object): + def recv(self, data): + data.readType(type.String('\x01\x02', constant = True)) + raise X224Case.X224_PASS() + + layer = x224.X224Layer(Presentation()) + s = type.Stream() + s.writeType((x224.X224DataHeader(), type.String('\x01\x02'))) + #reinit position + s.pos = 0 + + self.assertRaises(X224Case.X224_PASS, layer.recvData, s) + + def test_x224_layer_send(self): + """ + @summary: unit test for X224Layer.send function + """ + class Transport(object): + def send(self, data): + s = type.Stream() + s.writeType(data) + s.pos = 0 + s.readType(x224.X224DataHeader()) + s.readType(type.String('\x01\x02', constant = True)) + raise X224Case.X224_PASS() + + layer = x224.X224Layer(None) + layer._transport = Transport() + + self.assertRaises(X224Case.X224_PASS, layer.send, type.String('\x01\x02')) + + def test_x224_client_connect(self): + """ + @summary: unit test for X224Client.connect and sendConnectionRequest function + """ + class Transport(object): + def send(self, data): + s = type.Stream() + s.writeType(data) + s.pos = 0 + t = x224.ClientConnectionRequestPDU() + s.readType(t) + + if t.protocolNeg.code != x224.NegociationType.TYPE_RDP_NEG_REQ: + raise X224Case.X224_FAIL() + + if t.protocolNeg.selectedProtocol.value != x224.Protocols.PROTOCOL_SSL: + raise X224Case.X224_FAIL() + + def nextAutomata(data): + raise X224Case.X224_PASS() + + layer = x224.Client(None) + layer._transport = Transport() + layer.recvConnectionConfirm = nextAutomata + layer.connect() + + self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02')) + + def test_x224_client_recvConnectionConfirm_negotiation_old(self): + """ + @summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function + whithout protocol negotiation (doesn't support) + """ + message = x224.ServerConnectionConfirm() + del message._typeName[message._typeName.index("protocolNeg")] + s = type.Stream() + s.writeType(message) + s.pos = 0 + layer = x224.Client(None) + self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s) + + def test_x224_client_recvConnectionConfirm_negotiation_failure(self): + """ + @summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function + check negotiation failure + """ + message = x224.ServerConnectionConfirm() + message.protocolNeg.code.value = x224.NegociationType.TYPE_RDP_NEG_FAILURE + s = type.Stream() + s.writeType(message) + s.pos = 0 + layer = x224.Client(None) + self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s) + + def test_x224_client_recvConnectionConfirm_negotiation_bad_protocol(self): + """ + @summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function + Server ask another protocol than SSL + """ + message = x224.ServerConnectionConfirm() + message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_RDP + s = type.Stream() + s.writeType(message) + s.pos = 0 + layer = x224.Client(None) + self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s) + + def test_x224_client_recvConnectionConfirm_ok(self): + """ + @summary: nominal case of protocol negotiation + """ + global tls_begin, presentation_connect + tls_begin = False + presentation_connect = False + class Transport(object): + def __init__(self): + class TLSTransport(object): + def startTLS(self, context): + global tls_begin + tls_begin = True + self.transport = TLSTransport() + + class Presentation(object): + def connect(self): + global presentation_connect + presentation_connect = True + + def recvData(data): + raise X224Case.X224_PASS() + + message = x224.ServerConnectionConfirm() + message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_SSL + + s = type.Stream() + s.writeType(message) + s.pos = 0 + layer = x224.Client(Presentation()) + layer._transport = Transport() + layer.recvData = recvData + + layer.recvConnectionConfirm(s) + + self.assertTrue(tls_begin, "TLS is not started") + self.assertTrue(presentation_connect, "connect event is not forwarded") + self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02')) + + def test_x224_server_recvConnectionRequest_invalid_old_client(self): + """ + @summary: unit test for X224Server.recvConnectionRequest function + old client with non protocol neg + """ + message = x224.ClientConnectionRequestPDU() + del message._typeName[message._typeName.index("protocolNeg")] + s = type.Stream() + s.writeType(message) + s.pos = 0 + + layer = x224.Server(None, "key", "cert") + layer.connect() + + self.assertRaises(error.InvalidExpectedDataException, layer.recv, s) + + def test_x224_server_recvConnectionRequest_invalid_protocol_neg_failure(self): + """ + @summary: unit test for X224Server.recvConnectionRequest function + """ + message = x224.ClientConnectionRequestPDU() + message.protocolNeg.code.value = x224.NegociationType.TYPE_RDP_NEG_FAILURE + s = type.Stream() + s.writeType(message) + s.pos = 0 + + layer = x224.Server(None, "key", "cert") + layer.connect() + + self.assertRaises(error.InvalidExpectedDataException, layer.recv, s) + + def test_x224_server_recvConnectionRequest_client_accept_ssl(self): + """ + @summary: unit test for X224Server.recvConnectionRequest function + test client doesn't support TLS case + """ + + class Transport(object): + def send(self, data): + if not isinstance(data, x224.ServerConnectionConfirm): + raise X224Case.X224_FAIL() + if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_FAILURE or data.protocolNeg.failureCode.value != x224.NegotiationFailureCode.SSL_REQUIRED_BY_SERVER: + raise X224Case.X224_FAIL() + + message = x224.ClientConnectionRequestPDU() + message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_HYBRID + s = type.Stream() + s.writeType(message) + s.pos = 0 + + layer = x224.Server(None, "key", "cert") + layer._transport = Transport() + layer.connect() + + self.assertRaises(error.InvalidExpectedDataException, layer.recv, s) + + def test_x224_server_recvConnectionRequest_valid(self): + """ + @summary: unit test for X224Server.recvConnectionRequest function + """ + global tls, connect_event + tls = False + connect_event = False + + class ServerTLSContext(object): + def __init__(self, key, cert): + pass + + x224.ServerTLSContext = ServerTLSContext + + class Transport(object): + def __init__(self): + class TLS(object): + def startTLS(self, context): + global tls + tls = True + self.transport = TLS() + + def send(self, data): + if not isinstance(data, x224.ServerConnectionConfirm): + raise X224Case.X224_FAIL() + if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_RSP or data.protocolNeg.selectedProtocol.value != x224.Protocols.PROTOCOL_SSL: + raise X224Case.X224_FAIL() + + class Presentation(object): + def connect(self): + global connect_event + connect_event = True + + message = x224.ClientConnectionRequestPDU() + message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_SSL | x224.Protocols.PROTOCOL_RDP + s = type.Stream() + s.writeType(message) + s.pos = 0 + + layer = x224.Server(Presentation(), "key", "cert") + layer._transport = Transport() + layer.connect() + layer.recvConnectionRequest(s) + + self.assertTrue(tls, "TLS not started") + self.assertTrue(connect_event, "connect event not forwarded") \ No newline at end of file diff --git a/test/test_protocol_rfb_rfb.py b/test/test_protocol_rfb_rfb.py index d0b8478..e61816b 100644 --- a/test/test_protocol_rfb_rfb.py +++ b/test/test_protocol_rfb_rfb.py @@ -29,7 +29,7 @@ import unittest class RfbCase(unittest.TestCase): ''' - test casefor rfb layer (vnc) + test case for rfb layer (vnc) ''' def testName(self): pass