support 40bits and 56bits key on client side, bug on update keys

This commit is contained in:
speyrefitte
2014-12-11 18:34:51 +01:00
parent f3a3ad8ac3
commit d4d98471eb
14 changed files with 352 additions and 89 deletions

View File

@@ -30,7 +30,7 @@ import rdpy.protocol.rdp.x224 as x224
import rdpy.core.type as type
import rdpy.core.error as error
class X224Case(unittest.TestCase):
class X224Test(unittest.TestCase):
"""
@summary: test case for x224 layer (RDP)
"""
@@ -54,7 +54,7 @@ class X224Case(unittest.TestCase):
class Presentation(object):
def recv(self, data):
data.readType(type.String('test_x224_layer_recvData', constant = True))
raise X224Case.X224_PASS()
raise X224Test.X224_PASS()
layer = x224.X224Layer(Presentation())
s = type.Stream()
@@ -62,7 +62,7 @@ class X224Case(unittest.TestCase):
#reinit position
s.pos = 0
self.assertRaises(X224Case.X224_PASS, layer.recvData, s)
self.assertRaises(X224Test.X224_PASS, layer.recvData, s)
def test_x224_layer_send(self):
"""
@@ -75,12 +75,12 @@ class X224Case(unittest.TestCase):
s.pos = 0
s.readType(x224.X224DataHeader())
s.readType(type.String('test_x224_layer_send', constant = True))
raise X224Case.X224_PASS()
raise X224Test.X224_PASS()
layer = x224.X224Layer(None)
layer._transport = Transport()
self.assertRaises(X224Case.X224_PASS, layer.send, type.String('test_x224_layer_send'))
self.assertRaises(X224Test.X224_PASS, layer.send, type.String('test_x224_layer_send'))
def test_x224_client_connect(self):
"""
@@ -95,22 +95,22 @@ class X224Case(unittest.TestCase):
s.readType(t)
if t.protocolNeg.code != x224.NegociationType.TYPE_RDP_NEG_REQ:
raise X224Case.X224_FAIL()
raise X224Test.X224_FAIL()
def nextAutomata(data):
raise X224Case.X224_PASS()
raise X224Test.X224_PASS()
layer = x224.Client(None)
layer._transport = Transport()
layer.recvConnectionConfirm = nextAutomata
layer.connect()
self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02'))
self.assertRaises(X224Test.X224_PASS, layer.recv, type.String('\x01\x02'))
def test_x224_client_recvConnectionConfirm_negotiation_bad_protocol(self):
"""
@summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function
Server ask another protocol than SSL
Server ask another protocol than SSL or RDP
"""
message = x224.ServerConnectionConfirm()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_HYBRID
@@ -119,6 +119,19 @@ class X224Case(unittest.TestCase):
s.pos = 0
layer = x224.Client(None)
self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s)
def test_x224_client_recvConnectionConfirm_negotiation_failure(self):
"""
@summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function
check negotiation failure
"""
message = x224.ServerConnectionConfirm()
message.protocolNeg.code.value = x224.NegociationType.TYPE_RDP_NEG_FAILURE
s = type.Stream()
s.writeType(message)
s.pos = 0
layer = x224.Client(None)
self.assertRaises(error.RDPSecurityNegoFail, layer.recvConnectionConfirm, s)
def test_x224_client_recvConnectionConfirm_ok(self):
"""
@@ -141,7 +154,7 @@ class X224Case(unittest.TestCase):
presentation_connect = True
def recvData(data):
raise X224Case.X224_PASS()
raise X224Test.X224_PASS()
message = x224.ServerConnectionConfirm()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_SSL
@@ -157,7 +170,7 @@ class X224Case(unittest.TestCase):
self.assertTrue(tls_begin, "TLS is not started")
self.assertTrue(presentation_connect, "connect event is not forwarded")
self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02'))
self.assertRaises(X224Test.X224_PASS, layer.recv, type.String('\x01\x02'))
def test_x224_server_recvConnectionRequest_invalid_old_client(self):
"""
@@ -199,9 +212,9 @@ class X224Case(unittest.TestCase):
class Transport(object):
def send(self, data):
if not isinstance(data, x224.ServerConnectionConfirm):
raise X224Case.X224_FAIL()
raise X224Test.X224_FAIL()
if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_FAILURE or data.protocolNeg.failureCode.value != x224.NegotiationFailureCode.SSL_REQUIRED_BY_SERVER:
raise X224Case.X224_FAIL()
raise X224Test.X224_FAIL()
message = x224.ClientConnectionRequestPDU()
message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_HYBRID
@@ -239,9 +252,9 @@ class X224Case(unittest.TestCase):
def send(self, data):
if not isinstance(data, x224.ServerConnectionConfirm):
raise X224Case.X224_FAIL()
raise X224Test.X224_FAIL()
if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_RSP or data.protocolNeg.selectedProtocol.value != x224.Protocols.PROTOCOL_SSL:
raise X224Case.X224_FAIL()
raise X224Test.X224_FAIL()
class Presentation(object):
def connect(self):