From d4d98471eb68f8fb49d525e1405bf16f479c3a6e Mon Sep 17 00:00:00 2001 From: speyrefitte Date: Thu, 11 Dec 2014 18:34:51 +0100 Subject: [PATCH] support 40bits and 56bits key on client side, bug on update keys --- rdpy/protocol/rdp/gcc.py | 9 ++ rdpy/protocol/rdp/lic.py | 19 +-- rdpy/protocol/rdp/pdu/layer.py | 2 +- rdpy/protocol/rdp/sec.py | 153 +++++++++++++----- ...{test_base_const.py => test_core_const.py} | 2 +- ...st_network_layer.py => test_core_layer.py} | 12 +- ...test_network_type.py => test_core_type.py} | 2 +- test/test_protocol_rdp_ber.py | 2 +- test/test_protocol_rdp_lic.py | 79 +++++++++ test/test_protocol_rdp_mcs.py | 36 +++++ test/test_protocol_rdp_per.py | 2 +- test/test_protocol_rdp_rc4.py | 48 ++++++ test/test_protocol_rdp_tpkt.py | 32 ++-- test/test_protocol_rdp_x224.py | 43 +++-- 14 files changed, 352 insertions(+), 89 deletions(-) rename test/{test_base_const.py => test_core_const.py} (98%) rename test/{test_network_layer.py => test_core_layer.py} (87%) rename test/{test_network_type.py => test_core_type.py} (99%) create mode 100644 test/test_protocol_rdp_lic.py create mode 100644 test/test_protocol_rdp_mcs.py create mode 100644 test/test_protocol_rdp_rc4.py diff --git a/rdpy/protocol/rdp/gcc.py b/rdpy/protocol/rdp/gcc.py index 1479b13..81aa74b 100644 --- a/rdpy/protocol/rdp/gcc.py +++ b/rdpy/protocol/rdp/gcc.py @@ -403,6 +403,15 @@ class Settings(CompositeType): if i.type.value == messageType: return i.dataBlock return None + + def __getattr__(self, name): + """ + @summary: Magic function for better access + @return: _value parameter + """ + if not name in MessageType.__dict__: + return None + return self.getBlock(MessageType.__dict__[name]) def clientSettings(): """ diff --git a/rdpy/protocol/rdp/lic.py b/rdpy/protocol/rdp/lic.py index cac38fc..20cd47c 100644 --- a/rdpy/protocol/rdp/lic.py +++ b/rdpy/protocol/rdp/lic.py @@ -124,10 +124,10 @@ class ProductInformation(CompositeType): self.dwVersion = UInt32Le() self.cbCompanyName = UInt32Le(lambda:sizeof(self.pbCompanyName)) #may contain "Microsoft Corporation" from server microsoft - self.pbCompanyName = String(readLen = self.cbCompanyName, unicode = True) + self.pbCompanyName = String("Microsoft Corporation", readLen = self.cbCompanyName, unicode = True) self.cbProductId = UInt32Le(lambda:sizeof(self.pbProductId)) #may contain "A02" from microsoft license server - self.pbProductId = String(readLen = self.cbProductId, unicode = True) + self.pbProductId = String("A02", readLen = self.cbProductId, unicode = True) class Scope(CompositeType): @@ -159,7 +159,7 @@ class ServerLicenseRequest(CompositeType): def __init__(self): CompositeType.__init__(self) - self.serverRandom = String(readLen = UInt8(32)) + self.serverRandom = String("\x00" * 32, readLen = UInt8(32)) self.productInfo = ProductInformation() self.keyExchangeList = LicenseBinaryBlob(BinaryBlobType.BB_KEY_EXCHG_ALG_BLOB) self.serverCertificate = LicenseBinaryBlob(BinaryBlobType.BB_CERTIFICATE_BLOB) @@ -180,7 +180,7 @@ class ClientNewLicenseRequest(CompositeType): #pure microsoft client ;-) #http://msdn.microsoft.com/en-us/library/1040af38-c733-4fb3-acd1-8db8cc979eda#id10 self.platformId = UInt32Le(0x04000000 | 0x00010000) - self.clientRandom = String("\x00" * 32) + self.clientRandom = String("\x00" * 32, readLen = UInt8(32)) self.encryptedPreMasterSecret = LicenseBinaryBlob(BinaryBlobType.BB_RANDOM_BLOB) self.ClientUserName = LicenseBinaryBlob(BinaryBlobType.BB_CLIENT_USER_NAME_BLOB) self.ClientMachineName = LicenseBinaryBlob(BinaryBlobType.BB_CLIENT_MACHINE_NAME_BLOB) @@ -271,11 +271,10 @@ class LicenseManager(object): """ @summary: generate key for license session """ - masterSecret = sec.generateMicrosoftKeyABBCCC(self._preMasterSecret, self._clientRandom, self._serverRandom) - sessionKeyBlob = sec.generateMicrosoftKeyABBCCC(masterSecret, self._serverRandom, self._clientRandom) + masterSecret = sec.masterSecret(self._preMasterSecret, self._clientRandom, self._serverRandom) + sessionKeyBlob = sec.masterSecret(masterSecret, self._serverRandom, self._clientRandom) self._macSalt = sessionKeyBlob[:16] self._licenseKey = sec.finalHash(sessionKeyBlob[16:32], self._clientRandom, self._serverRandom) - self._rc4LicenseKey = rc4.RC4Key(self._licenseKey) def recv(self, s): """ @@ -293,10 +292,12 @@ class LicenseManager(object): self._serverRandom = licPacket.licensingMessage.serverRandom.value self.generateKeys() self.sendClientNewLicenseRequest() + return False elif licPacket.bMsgtype.value == MessageType.PLATFORM_CHALLENGE: self._serverEncryptedChallenge = licPacket.licensingMessage.encryptedPlatformChallenge.blobData.value self.sendClientChallengeResponse() + return False #yes get a new license elif licPacket.bMsgtype.value == MessageType.NEW_LICENSE: @@ -325,7 +326,7 @@ class LicenseManager(object): """ #decrypt server challenge #it should be TEST word in unicode format - serverChallenge = rc4.crypt(self._rc4LicenseKey, self._serverEncryptedChallenge) + serverChallenge = rc4.crypt(rc4.RC4Key(self._licenseKey), self._serverEncryptedChallenge) #generate hwid s = Stream() @@ -334,7 +335,7 @@ class LicenseManager(object): message = ClientPLatformChallengeResponse() message.encryptedPlatformChallengeResponse.blobData.value = self._serverEncryptedChallenge - message.encryptedHWID.blobData.value = rc4.crypt(self._rc4LicenseKey, hwid) + message.encryptedHWID.blobData.value = rc4.crypt(rc4.RC4Key(self._licenseKey), hwid) message.MACData.value = sec.macData(self._macSalt, serverChallenge + hwid) self._transport.sendFlagged(sec.SecurityFlag.SEC_LICENSE_PKT, LicPacket(message)) \ No newline at end of file diff --git a/rdpy/protocol/rdp/pdu/layer.py b/rdpy/protocol/rdp/pdu/layer.py index 3bee01a..4e63fc0 100644 --- a/rdpy/protocol/rdp/pdu/layer.py +++ b/rdpy/protocol/rdp/pdu/layer.py @@ -144,7 +144,7 @@ class Client(PDULayer): """ @summary: Connect message in client automata """ - self._gccCore = self._transport._transport.getGCCClientSettings().getBlock(gcc.MessageType.CS_CORE) + self._gccCore = self._transport._transport.getGCCClientSettings().CS_CORE self.setNextState(self.recvDemandActivePDU) #check if client support fast path message self._clientFastPathSupported = False diff --git a/rdpy/protocol/rdp/sec.py b/rdpy/protocol/rdp/sec.py index 17de239..3b93189 100644 --- a/rdpy/protocol/rdp/sec.py +++ b/rdpy/protocol/rdp/sec.py @@ -26,6 +26,7 @@ import gcc, lic, tpkt from rdpy.core.type import CompositeType, Stream, UInt32Le, UInt16Le, String, sizeof, UInt8 from rdpy.core.layer import LayerAutomata, IStreamSender from rdpy.core.error import InvalidExpectedDataException +from rdpy.core import log class SecurityFlag(object): """ @@ -131,16 +132,17 @@ def finalHash(key, random1, random2): md5Digest.update(random2) return md5Digest.digest() -def generateMicrosoftKeyABBCCC(secret, random1, random2): +def masterSecret(secret, random1, random2): """ @summary: Generate master secret - @param secret: secret - @param clientRandom : client random - @param serverRandom : server random + @param secret: {str} secret + @param clientRandom : {str} client random + @param serverRandom : {str} server random + @see: http://msdn.microsoft.com/en-us/library/cc241992.aspx """ return saltedHash("A", secret, random1, random2) + saltedHash("BB", secret, random1, random2) + saltedHash("CCC", secret, random1, random2) -def generateMicrosoftKeyXYYZZZ(secret, random1, random2): +def sessionKeyBlob(secret, random1, random2): """ @summary: Generate master secret @param secret: secret @@ -174,11 +176,67 @@ def macData(macSaltKey, data): return md5Digest.digest() +def gen40bits(data): + """ + @summary: generate 40 bits data from 128 bits data + @param data: {str} 128 bits data + @return: {str} 40 bits data + @see: http://msdn.microsoft.com/en-us/library/cc240785.aspx + """ + return "\xd1\x26\x9e" + data[:8][-5:] + +def gen56bits(data): + """ + @summary: generate 56 bits data from 128 bits data + @param data: {str} 128 bits data + @return: {str} 56 bits data + @see: http://msdn.microsoft.com/en-us/library/cc240785.aspx + """ + return "\xd1" + data[:8][-7:] + +def generateKeys(clientRandom, serverRandom, method): + """ + @param method: {gcc.Encryption} + @param clientRandom: {str[32]} client random + @param serverRandom: {str[32]} server random + @see: http://msdn.microsoft.com/en-us/library/cc240785.aspx + @return: MACKey, initialFirstKey128(ClientdecryptKey, serverEncryptKey), initialSecondKey128(ServerDecryptKey, ClientEncryptKey) + """ + preMasterHash = clientRandom[:24] + serverRandom[:24] + masterHash = masterSecret(preMasterHash, clientRandom, serverRandom) + sessionKey = sessionKeyBlob(masterHash, clientRandom, serverRandom) + macKey128 = sessionKey[:16] + initialFirstKey128 = finalHash(sessionKey[16:32], clientRandom, serverRandom) + initialSecondKey128 = finalHash(sessionKey[32:48], clientRandom, serverRandom) + + #generate valid key + if method == gcc.Encryption.ENCRYPTION_FLAG_40BIT: + return gen40bits(macKey128), gen40bits(initialFirstKey128), gen40bits(initialSecondKey128) + + elif method == gcc.Encryption.ENCRYPTION_FLAG_56BIT: + return gen56bits(macKey128), gen56bits(initialFirstKey128), gen56bits(initialSecondKey128) + + elif method == gcc.Encryption.ENCRYPTION_FLAG_128BIT: + return macKey128, initialFirstKey128, initialSecondKey128 + + raise InvalidExpectedDataException("Bad encryption method") + +def updateKeys(initialKey, currentKey, method): + """ + @summary: update session key + @param initialKey: {str} Initial key + @param currentKey: {str} Current key + @return newKey: {str} key to use + @see: http://msdn.microsoft.com/en-us/library/cc240792.aspx + """ + tempKey128 = macData(initialKey, currentKey) + return rc4.crypt(rc4.RC4Key(tempKey128), tempKey128) + def bin2bn(b): """ @summary: convert binary string to bignum - @param b: binary string - @return bignum + @param b: {str} binary string + @return: {long} bignum """ l = 0L for ch in b: @@ -207,7 +265,7 @@ class RDPInfo(CompositeType): #code page self.codePage = UInt32Le() #support flag - self.flag = UInt32Le(InfoFlag.INFO_MOUSE | InfoFlag.INFO_UNICODE | InfoFlag.INFO_LOGONNOTIFY | InfoFlag.INFO_LOGONERRORS) + self.flag = UInt32Le(InfoFlag.INFO_MOUSE | InfoFlag.INFO_UNICODE | InfoFlag.INFO_LOGONNOTIFY | InfoFlag.INFO_LOGONERRORS | InfoFlag.INFO_DISABLECTRLALTDEL) self.cbDomain = UInt16Le(lambda:sizeof(self.domain) - 2) self.cbUserName = UInt16Le(lambda:sizeof(self.userName) - 2) self.cbPassword = UInt16Le(lambda:sizeof(self.password) - 2) @@ -241,7 +299,7 @@ class RDPExtendedInfo(CompositeType): class SecLayer(LayerAutomata, IStreamSender, tpkt.IFastPathListener, tpkt.IFastPathSender): """ - @summary: Basic RDP security manager + @summary: Standard RDP security layer This layer is Transparent as possible for upper layer """ def __init__(self, presentation): @@ -254,31 +312,26 @@ class SecLayer(LayerAutomata, IStreamSender, tpkt.IFastPathListener, tpkt.IFastP self._fastPathPresentation = None #credentials - self._info = RDPInfo(extendedInfoConditional = lambda:(self._transport.getGCCServerSettings().getBlock(gcc.MessageType.SC_CORE).rdpVersion.value == gcc.Version.RDP_VERSION_5_PLUS)) + self._info = RDPInfo(extendedInfoConditional = lambda:(self._transport.getGCCServerSettings().SC_CORE.rdpVersion.value == gcc.Version.RDP_VERSION_5_PLUS)) #True if classic encryption is enable self._enableEncryption = False - #crypto random - self._clientRandom = None - self._serverRandom = None #initialise decrypt and encrypt keys - self._decryt = None - self._encrypt = None - #current rc4 map key + self._macKey = None + self._initialDecrytKey = None + self._initialEncryptKey = None + self._currentDecrytKey = None + self._currentEncryptKey = None + + #counter before update + self._nbEncryptedPacket = 0 + self._nbDecryptedPacket = 0 + + #current rc4 tab self._decryptRc4 = None self._encryptRc4 = None - def generateKeys(self): - """ - @see: http://msdn.microsoft.com/en-us/library/cc240785.aspx - @return: finalHash(second128bit(sessionkey), sthird128bit(sessionkey)) - """ - preMasterSecret = self._clientRandom[:24] + self._serverRandom[:24] - masterSecret = generateMicrosoftKeyABBCCC(preMasterSecret, self._clientRandom, self._serverRandom) - self._sessionKey = generateMicrosoftKeyXYYZZZ(masterSecret, self._clientRandom, self._serverRandom) - self._macKey128 = self._sessionKey[:16] - return (finalHash(self._sessionKey[16:32], self._clientRandom, self._serverRandom), finalHash(self._sessionKey[32:48], self._clientRandom, self._serverRandom)) def readEncryptedPayload(self, s): """ @@ -286,13 +339,24 @@ class SecLayer(LayerAutomata, IStreamSender, tpkt.IFastPathListener, tpkt.IFastP @param s: {Stream} encrypted stream @return: {Stream} decrypted """ + #if update is needed + if self._nbDecryptedPacket == 4096: + log.debug("Update decrypt key") + self._currentDecrytKey = updateKeys(self._initialDecrytKey, self._currentDecrytKey, None) + self._decryptRc4 = rc4.RC4Key(self._currentDecrytKey) + self._nbDecryptedPacket = 0 + signature = String(readLen = UInt8(8)) encryptedPayload = String() s.readType((signature, encryptedPayload)) decrypted = rc4.crypt(self._decryptRc4, encryptedPayload.value) + #ckeck signature - if macData(self._macKey128, decrypted)[:8] != signature.value: + if macData(self._macKey, decrypted)[:8] != signature.value: raise InvalidExpectedDataException("Bad packet signature") + + #count + self._nbDecryptedPacket += 1 return Stream(decrypted) @@ -302,9 +366,16 @@ class SecLayer(LayerAutomata, IStreamSender, tpkt.IFastPathListener, tpkt.IFastP @param s: {Stream} raw stream @return: {Tuple} (signature, encryptedData) """ + if self._nbEncryptedPacket == 4096: + log.debug("Update encrypt key") + self._currentEncryptKey = updateKeys(self._initialEncryptKey, self._currentEncryptKey, None) + self._encryptRc4 = rc4.RC4Key(self._currentEncryptKey) + self._nbEncryptedPacket = 0 + + self._nbEncryptedPacket += 1 s = Stream() s.writeType(data) - return (String(macData(self._macKey128, s.getvalue())[:8]), String(rc4.crypt(self._encryptRc4, s.getvalue()))) + return (String(macData(self._macKey, s.getvalue())[:8]), String(rc4.crypt(self._encryptRc4, s.getvalue()))) def recv(self, data): """ @@ -396,26 +467,30 @@ class Client(SecLayer): """ @summary: send client random """ - if self._transport.getGCCClientSettings().getBlock(gcc.MessageType.CS_CORE).serverSelectedProtocol == 0: + + self._enableEncryption = self._transport.getGCCClientSettings().CS_CORE.serverSelectedProtocol == 0 + + if self._enableEncryption: #generate client random - self._clientRandom = rsa.randnum.read_random_bits(256) - self._serverRandom = self._transport.getGCCServerSettings().getBlock(gcc.MessageType.SC_SECURITY).serverRandom.value - self._decrypt, self._encrypt = self.generateKeys() - self._decryptRc4 = rc4.RC4Key(self._decrypt) - self._encryptRc4 = rc4.RC4Key(self._encrypt) + clientRandom = rsa.randnum.read_random_bits(256) + self._macKey, self._initialDecrytKey, self._initialEncryptKey = generateKeys( clientRandom, + self._transport.getGCCServerSettings().SC_SECURITY.serverRandom.value, + self._transport.getGCCServerSettings().SC_SECURITY.encryptionMethod.value) + #initialize keys + self._currentDecrytKey = self._initialDecrytKey + self._currentEncryptKey = self._initialEncryptKey + self._decryptRc4 = rc4.RC4Key(self._currentDecrytKey) + self._encryptRc4 = rc4.RC4Key(self._currentEncryptKey) #send client random encrypted with - certificate = self._transport.getGCCServerSettings().getBlock(gcc.MessageType.SC_SECURITY).serverCertificate.certData + certificate = self._transport.getGCCServerSettings().SC_SECURITY.serverCertificate.certData #reverse because bignum in little endian serverPublicKey = rsa.PublicKey(bin2bn(certificate.PublicKeyBlob.modulus.value[::-1]), certificate.PublicKeyBlob.pubExp.value) message = ClientSecurityExchangePDU() #reverse because bignum in little endian - message.encryptedClientRandom.value = rsa.encrypt(self._clientRandom[::-1], serverPublicKey)[::-1] + message.encryptedClientRandom.value = rsa.encrypt(clientRandom[::-1], serverPublicKey)[::-1] self.sendFlagged(SecurityFlag.SEC_EXCHANGE_PKT, message) - - #now all messages must be encrypted - self._enableEncryption = True secFlag = SecurityFlag.SEC_INFO_PKT if self._enableEncryption: diff --git a/test/test_base_const.py b/test/test_core_const.py similarity index 98% rename from test/test_base_const.py rename to test/test_core_const.py index d7fe6d3..0920bbf 100644 --- a/test/test_base_const.py +++ b/test/test_core_const.py @@ -29,7 +29,7 @@ import unittest import rdpy.core.const import rdpy.core.type -class ConstCase(unittest.TestCase): +class ConstTest(unittest.TestCase): ''' represent test case for all classes and function present in rdpy.base.const diff --git a/test/test_network_layer.py b/test/test_core_layer.py similarity index 87% rename from test/test_network_layer.py rename to test/test_core_layer.py index 1ed5119..b24d5ea 100644 --- a/test/test_network_layer.py +++ b/test/test_core_layer.py @@ -28,7 +28,7 @@ sys.path.insert(1, os.path.join(sys.path[0], '..')) import unittest import rdpy.core.layer -class LayerCase(unittest.TestCase): +class LayerTest(unittest.TestCase): """ @summary: represent test case for all classes and function present in rdpy.core.layer @@ -46,9 +46,9 @@ class LayerCase(unittest.TestCase): """ class TestConnect(rdpy.core.layer.Layer): def connect(self): - raise LayerCase.LayerCaseException() + raise LayerTest.LayerCaseException() - self.assertRaises(LayerCase.LayerCaseException, rdpy.core.layer.Layer(presentation = TestConnect()).connect) + self.assertRaises(LayerTest.LayerCaseException, rdpy.core.layer.Layer(presentation = TestConnect()).connect) def test_layer_automata_more_than_expected(self): """ @@ -57,11 +57,11 @@ class LayerCase(unittest.TestCase): class TestAutomata(rdpy.core.layer.RawLayer): def expectedCallBack(self, data): if data.dataLen() == 4: - raise LayerCase.LayerCaseException() + raise LayerTest.LayerCaseException() t = TestAutomata() t.expect(4, t.expectedCallBack) - self.assertRaises(LayerCase.LayerCaseException, t.dataReceived, "\x00\x00\x00\x00\x00") + self.assertRaises(LayerTest.LayerCaseException, t.dataReceived, "\x00\x00\x00\x00\x00") def test_layer_automata_less_than_expected(self): """ @@ -70,7 +70,7 @@ class LayerCase(unittest.TestCase): class TestAutomata(rdpy.core.layer.RawLayer): def expectedCallBack(self, data): if data.dataLen() == 4: - raise LayerCase.LayerCaseException() + raise LayerTest.LayerCaseException() t = TestAutomata() t.expect(4, t.expectedCallBack) diff --git a/test/test_network_type.py b/test/test_core_type.py similarity index 99% rename from test/test_network_type.py rename to test/test_core_type.py index 582482e..1ca9064 100644 --- a/test/test_network_type.py +++ b/test/test_core_type.py @@ -29,7 +29,7 @@ import unittest import rdpy.core.type from rdpy.core.error import InvalidSize -class TypeCase(unittest.TestCase): +class TypeTest(unittest.TestCase): """ @summary: represent test case for all classes and function present in rdpy.network.type diff --git a/test/test_protocol_rdp_ber.py b/test/test_protocol_rdp_ber.py index 6085123..73ea751 100644 --- a/test/test_protocol_rdp_ber.py +++ b/test/test_protocol_rdp_ber.py @@ -30,7 +30,7 @@ import rdpy.protocol.rdp.ber as ber import rdpy.core.type as type import rdpy.core.error as error -class BERCase(unittest.TestCase): +class BERTest(unittest.TestCase): """ @summary: test case for ber layer (RDP) """ diff --git a/test/test_protocol_rdp_lic.py b/test/test_protocol_rdp_lic.py new file mode 100644 index 0000000..589105b --- /dev/null +++ b/test/test_protocol_rdp_lic.py @@ -0,0 +1,79 @@ +# +# Copyright (c) 2014 Sylvain Peyrefitte +# +# This file is part of rdpy. +# +# rdpy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +unit test for rdpy.protocol.rdp.lic automata +""" + +import os, sys +# Change path so we find rdpy +sys.path.insert(1, os.path.join(sys.path[0], '..')) + +import unittest +from rdpy.protocol.rdp import lic, sec +import rdpy.core.type as type + +class TestLic(unittest.TestCase): + """ + @summary: Test case for MCS automata + """ + + class LIC_PASS(Exception): + """ + @summary: for OK tests + """ + pass + + class LIC_FAIL(Exception): + """ + @summary: for KO tests + """ + pass + + def test_valid_client_licensing_error_message(self): + l = lic.LicenseManager(None) + s = type.Stream() + s.writeType(lic.createValidClientLicensingErrorMessage()) + #reinit position + s.pos = 0 + + self.assertTrue(l.recv(s), "Manager can retrieve valid case") + + def test_new_license(self): + class Transport(object): + def __init__(self): + self._state = False + def sendFlagged(self, flag, message): + if flag != sec.SecurityFlag.SEC_LICENSE_PKT: + return + s = type.Stream() + s.writeType(message) + s.pos = 0 + s.readType(lic.LicPacket(lic.ClientNewLicenseRequest())) + self._state = True + + t = Transport() + l = lic.LicenseManager(t) + + s = type.Stream() + s.writeType(lic.LicPacket(lic.ServerLicenseRequest())) + #reinit position + s.pos = 0 + + self.assertFalse(l.recv(s) and t._state, "Bad message after license request") \ No newline at end of file diff --git a/test/test_protocol_rdp_mcs.py b/test/test_protocol_rdp_mcs.py new file mode 100644 index 0000000..dbaef23 --- /dev/null +++ b/test/test_protocol_rdp_mcs.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2014 Sylvain Peyrefitte +# +# This file is part of rdpy. +# +# rdpy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +unit test for rdpy.protocol.rdp.mcs automata +""" + +import os, sys +# Change path so we find rdpy +sys.path.insert(1, os.path.join(sys.path[0], '..')) + +import unittest + +class MCSTest(unittest.TestCase): + """ + @summary: test case for per layer (RDP) + """ + + def test_per_readLength(self): + pass \ No newline at end of file diff --git a/test/test_protocol_rdp_per.py b/test/test_protocol_rdp_per.py index 57ba349..c1c2513 100644 --- a/test/test_protocol_rdp_per.py +++ b/test/test_protocol_rdp_per.py @@ -30,7 +30,7 @@ import rdpy.protocol.rdp.per as per import rdpy.core.type as type import rdpy.core.error as error -class PERCase(unittest.TestCase): +class PERTest(unittest.TestCase): """ @summary: test case for per layer (RDP) """ diff --git a/test/test_protocol_rdp_rc4.py b/test/test_protocol_rdp_rc4.py new file mode 100644 index 0000000..4f6cf7d --- /dev/null +++ b/test/test_protocol_rdp_rc4.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2014 Sylvain Peyrefitte +# +# This file is part of rdpy. +# +# rdpy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +unit test for rdpy.protocol.rdp.rc4 module +""" + +import os, sys +# Change path so we find rdpy +sys.path.insert(1, os.path.join(sys.path[0], '..')) + +import unittest +import rdpy.protocol.rdp.rc4 as rc4 + + +class RC4Test(unittest.TestCase): + """ + @summary: unit tests for rc4 + @see: http://fr.wikipedia.org/wiki/RC4 + """ + + def test_rc4_key_plaintext(self): + self.assertEqual("\xBB\xF3\x16\xE8\xD9\x40\xAF\x0A\xD3", rc4.crypt(rc4.RC4Key("Key"), "Plaintext"), "RC4 bad crypt") + self.assertEqual("Plaintext", rc4.crypt(rc4.RC4Key("Key"), "\xBB\xF3\x16\xE8\xD9\x40\xAF\x0A\xD3"), "RC4 bad crypt") + + def test_rc4_wiki_pedia(self): + self.assertEqual("\x10\x21\xBF\x04\x20", rc4.crypt(rc4.RC4Key("Wiki"), "pedia"), "RC4 bad crypt") + self.assertEqual("pedia", rc4.crypt(rc4.RC4Key("Wiki"), "\x10\x21\xBF\x04\x20"), "RC4 bad crypt") + + def test_rc4_secret_attack_at_down(self): + self.assertEqual("\x45\xA0\x1F\x64\x5F\xC3\x5B\x38\x35\x52\x54\x4B\x9B\xF5", rc4.crypt(rc4.RC4Key("Secret"), "Attack at dawn"), "RC4 bad crypt") + self.assertEqual("Attack at dawn", rc4.crypt(rc4.RC4Key("Secret"), "\x45\xA0\x1F\x64\x5F\xC3\x5B\x38\x35\x52\x54\x4B\x9B\xF5"), "RC4 bad crypt") diff --git a/test/test_protocol_rdp_tpkt.py b/test/test_protocol_rdp_tpkt.py index c5c1bee..9e27792 100644 --- a/test/test_protocol_rdp_tpkt.py +++ b/test/test_protocol_rdp_tpkt.py @@ -30,7 +30,7 @@ import rdpy.protocol.rdp.tpkt as tpkt import rdpy.core.type as type import rdpy.core.error as error -class TPKTCase(unittest.TestCase): +class TPKTTest(unittest.TestCase): """ @summary: test case for tpkt layer (RDP) """ @@ -47,10 +47,10 @@ class TPKTCase(unittest.TestCase): """ class Presentation(object): def connect(self): - raise TPKTCase.TPKT_PASS() + raise TPKTTest.TPKT_PASS() - layer = tpkt.TPKT(Presentation(), None) - self.assertRaises(TPKTCase.TPKT_PASS, layer.connect) + layer = tpkt.TPKT(Presentation()) + self.assertRaises(TPKTTest.TPKT_PASS, layer.connect) def test_tpkt_layer_recv(self): """ @@ -61,16 +61,16 @@ class TPKTCase(unittest.TestCase): pass def recv(self, data): data.readType(type.String("test_tpkt_layer_recv", constant = True)) - raise TPKTCase.TPKT_PASS() + raise TPKTTest.TPKT_PASS() message = type.String("test_tpkt_layer_recv") s = type.Stream() s.writeType((type.UInt8(tpkt.Action.FASTPATH_ACTION_X224), type.UInt8(), type.UInt16Be(type.sizeof(message) + 4), message)) - layer = tpkt.TPKT(Presentation(), None) + layer = tpkt.TPKT(Presentation()) layer.connect() - self.assertRaises(TPKTCase.TPKT_PASS, layer.dataReceived, s.getvalue()) + self.assertRaises(TPKTTest.TPKT_PASS, layer.dataReceived, s.getvalue()) def test_tpkt_layer_recv_fastpath(self): """ @@ -79,18 +79,19 @@ class TPKTCase(unittest.TestCase): class FastPathLayer(tpkt.IFastPathListener): def setFastPathSender(self, fastPathSender): pass - def recvFastPath(self, fastPathS): + def recvFastPath(self, secFlag, fastPathS): fastPathS.readType(type.String("test_tpkt_layer_recv_fastpath", constant = True)) - raise TPKTCase.TPKT_PASS() + raise TPKTTest.TPKT_PASS() message = type.String("test_tpkt_layer_recv_fastpath") s = type.Stream() s.writeType((type.UInt8(tpkt.Action.FASTPATH_ACTION_FASTPATH), type.UInt8(type.sizeof(message) + 2), message)) - layer = tpkt.TPKT(None, FastPathLayer()) + layer = tpkt.TPKT(None) + layer.initFastPath(FastPathLayer()) layer.connect() - self.assertRaises(TPKTCase.TPKT_PASS, layer.dataReceived, s.getvalue()) + self.assertRaises(TPKTTest.TPKT_PASS, layer.dataReceived, s.getvalue()) def test_tpkt_layer_recv_fastpath_ext_length(self): """ @@ -99,15 +100,16 @@ class TPKTCase(unittest.TestCase): class FastPathLayer(tpkt.IFastPathListener): def setFastPathSender(self, fastPathSender): pass - def recvFastPath(self, fastPathS): + def recvFastPath(self, secflag, fastPathS): fastPathS.readType(type.String("test_tpkt_layer_recv_fastpath_ext_length", constant = True)) - raise TPKTCase.TPKT_PASS() + raise TPKTTest.TPKT_PASS() message = type.String("test_tpkt_layer_recv_fastpath_ext_length") s = type.Stream() s.writeType((type.UInt8(tpkt.Action.FASTPATH_ACTION_FASTPATH), type.UInt16Be((type.sizeof(message) + 3) | 0x8000), message)) - layer = tpkt.TPKT(None, FastPathLayer()) + layer = tpkt.TPKT(None) + layer.initFastPath(FastPathLayer()) layer.connect() - self.assertRaises(TPKTCase.TPKT_PASS, layer.dataReceived, s.getvalue()) + self.assertRaises(TPKTTest.TPKT_PASS, layer.dataReceived, s.getvalue()) diff --git a/test/test_protocol_rdp_x224.py b/test/test_protocol_rdp_x224.py index 84bfd2d..b56cc59 100644 --- a/test/test_protocol_rdp_x224.py +++ b/test/test_protocol_rdp_x224.py @@ -30,7 +30,7 @@ import rdpy.protocol.rdp.x224 as x224 import rdpy.core.type as type import rdpy.core.error as error -class X224Case(unittest.TestCase): +class X224Test(unittest.TestCase): """ @summary: test case for x224 layer (RDP) """ @@ -54,7 +54,7 @@ class X224Case(unittest.TestCase): class Presentation(object): def recv(self, data): data.readType(type.String('test_x224_layer_recvData', constant = True)) - raise X224Case.X224_PASS() + raise X224Test.X224_PASS() layer = x224.X224Layer(Presentation()) s = type.Stream() @@ -62,7 +62,7 @@ class X224Case(unittest.TestCase): #reinit position s.pos = 0 - self.assertRaises(X224Case.X224_PASS, layer.recvData, s) + self.assertRaises(X224Test.X224_PASS, layer.recvData, s) def test_x224_layer_send(self): """ @@ -75,12 +75,12 @@ class X224Case(unittest.TestCase): s.pos = 0 s.readType(x224.X224DataHeader()) s.readType(type.String('test_x224_layer_send', constant = True)) - raise X224Case.X224_PASS() + raise X224Test.X224_PASS() layer = x224.X224Layer(None) layer._transport = Transport() - self.assertRaises(X224Case.X224_PASS, layer.send, type.String('test_x224_layer_send')) + self.assertRaises(X224Test.X224_PASS, layer.send, type.String('test_x224_layer_send')) def test_x224_client_connect(self): """ @@ -95,22 +95,22 @@ class X224Case(unittest.TestCase): s.readType(t) if t.protocolNeg.code != x224.NegociationType.TYPE_RDP_NEG_REQ: - raise X224Case.X224_FAIL() + raise X224Test.X224_FAIL() def nextAutomata(data): - raise X224Case.X224_PASS() + raise X224Test.X224_PASS() layer = x224.Client(None) layer._transport = Transport() layer.recvConnectionConfirm = nextAutomata layer.connect() - self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02')) + self.assertRaises(X224Test.X224_PASS, layer.recv, type.String('\x01\x02')) def test_x224_client_recvConnectionConfirm_negotiation_bad_protocol(self): """ @summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function - Server ask another protocol than SSL + Server ask another protocol than SSL or RDP """ message = x224.ServerConnectionConfirm() message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_HYBRID @@ -119,6 +119,19 @@ class X224Case(unittest.TestCase): s.pos = 0 layer = x224.Client(None) self.assertRaises(error.InvalidExpectedDataException, layer.recvConnectionConfirm, s) + + def test_x224_client_recvConnectionConfirm_negotiation_failure(self): + """ + @summary: unit test for X224Client.recvConnectionConfirm and sendConnectionRequest function + check negotiation failure + """ + message = x224.ServerConnectionConfirm() + message.protocolNeg.code.value = x224.NegociationType.TYPE_RDP_NEG_FAILURE + s = type.Stream() + s.writeType(message) + s.pos = 0 + layer = x224.Client(None) + self.assertRaises(error.RDPSecurityNegoFail, layer.recvConnectionConfirm, s) def test_x224_client_recvConnectionConfirm_ok(self): """ @@ -141,7 +154,7 @@ class X224Case(unittest.TestCase): presentation_connect = True def recvData(data): - raise X224Case.X224_PASS() + raise X224Test.X224_PASS() message = x224.ServerConnectionConfirm() message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_SSL @@ -157,7 +170,7 @@ class X224Case(unittest.TestCase): self.assertTrue(tls_begin, "TLS is not started") self.assertTrue(presentation_connect, "connect event is not forwarded") - self.assertRaises(X224Case.X224_PASS, layer.recv, type.String('\x01\x02')) + self.assertRaises(X224Test.X224_PASS, layer.recv, type.String('\x01\x02')) def test_x224_server_recvConnectionRequest_invalid_old_client(self): """ @@ -199,9 +212,9 @@ class X224Case(unittest.TestCase): class Transport(object): def send(self, data): if not isinstance(data, x224.ServerConnectionConfirm): - raise X224Case.X224_FAIL() + raise X224Test.X224_FAIL() if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_FAILURE or data.protocolNeg.failureCode.value != x224.NegotiationFailureCode.SSL_REQUIRED_BY_SERVER: - raise X224Case.X224_FAIL() + raise X224Test.X224_FAIL() message = x224.ClientConnectionRequestPDU() message.protocolNeg.selectedProtocol.value = x224.Protocols.PROTOCOL_HYBRID @@ -239,9 +252,9 @@ class X224Case(unittest.TestCase): def send(self, data): if not isinstance(data, x224.ServerConnectionConfirm): - raise X224Case.X224_FAIL() + raise X224Test.X224_FAIL() if data.protocolNeg.code.value != x224.NegociationType.TYPE_RDP_NEG_RSP or data.protocolNeg.selectedProtocol.value != x224.Protocols.PROTOCOL_SSL: - raise X224Case.X224_FAIL() + raise X224Test.X224_FAIL() class Presentation(object): def connect(self):