This commit is contained in:
citronneur
2014-06-02 22:40:01 +02:00
parent 3c0a8e2505
commit e7f1cdfc87
8 changed files with 428 additions and 12 deletions

View File

@@ -4,7 +4,7 @@
import sys, os
# Change path so we find rdpy
sys.path.insert(1, os.path.join(sys.path[0], '..'))
import unittest, rdpy.test.network.type, rdpy.test.network.const, rdpy.test.network.layer
import unittest, rdpy.tests.network.type, rdpy.tests.network.const, rdpy.tests.network.layer
def headerTest(name):
print "*"*70
@@ -13,14 +13,14 @@ def headerTest(name):
if __name__ == '__main__':
headerTest("Test case rdpy.test.network.type.TypeCase")
suite = unittest.TestLoader().loadTestsFromTestCase(rdpy.test.network.type.TypeCase)
suite = unittest.TestLoader().loadTestsFromTestCase(rdpy.tests.network.type.TypeCase)
unittest.TextTestRunner(verbosity=2).run(suite)
headerTest("Test case rdpy.test.network.const.ConstCase")
suite = unittest.TestLoader().loadTestsFromTestCase(rdpy.test.network.const.ConstCase)
suite = unittest.TestLoader().loadTestsFromTestCase(rdpy.tests.network.const.ConstCase)
unittest.TextTestRunner(verbosity=2).run(suite)
headerTest("Test case rdpy.test.network.type.layer.LayerCase")
suite = unittest.TestLoader().loadTestsFromTestCase(rdpy.test.network.layer.LayerCase)
suite = unittest.TestLoader().loadTestsFromTestCase(rdpy.tests.network.layer.LayerCase)
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@@ -409,12 +409,6 @@ class CompositeType(Type):
raise e
readLen += sizeof(self.__dict__[name])
selfSize = sizeof(self)
if readLen != selfSize:
#read end padding
s.read(selfSize - readLen)
print "Warning: type %s still have data after read, use as padding"%self.__class__
def __write__(self, s):
'''

View File

@@ -160,10 +160,10 @@ class TPDU(LayerAutomata):
message = TPDUConnectMessage()
data.readType(message)
if message.code != MessageType.X224_TPDU_CONNECTION_REQUEST:
raise InvalidExpectedDataException("expect connection packet")
raise InvalidExpectedDataException("Expect connection packet")
if not message.protocolNeg._is_readed or message.protocolNeg.failureCode._is_readed:
raise InvalidExpectedDataException("too older rdp client")
raise InvalidExpectedDataException("Too older rdp client")
self._requestedProtocol = message.protocolNeg.selectedProtocol

0
rdpy/tests/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,34 @@
'''
@author: sylvain
'''
import unittest
import rdpy.network.const
import rdpy.network.type
class ConstCase(unittest.TestCase):
'''
represent test case for all classes and function
present in rdpy.network.const
'''
def test_type_attributes(self):
'''
test if type attributes decorator works
'''
@rdpy.network.const.TypeAttributes(rdpy.network.type.UInt16Le)
class Test:
MEMBER_1 = 1
MEMBER_2 = 2
self.assertIsInstance(Test.MEMBER_1, rdpy.network.type.UInt16Le, "MEMBER_1 is not in correct type")
self.assertIsInstance(Test.MEMBER_2, rdpy.network.type.UInt16Le, "MEMBER_2 is not in correct type")
def test_const(self):
'''
test if get on const class member generate new object each
'''
@rdpy.network.const.ConstAttributes
class Test:
MEMBER_1 = 1
MEMBER_2 = 2
self.assertEquals(Test.MEMBER_1, Test.MEMBER_1, "handle same type of object")

View File

@@ -0,0 +1,38 @@
'''
@author: sylvain
'''
import unittest
import rdpy.network.layer
class LayerCase(unittest.TestCase):
'''
represent test case for all classes and function
present in rdpy.network.layer
'''
class LayerCaseException(Exception):
'''
exception use for event base test
'''
pass
def test_layer_connect_event(self):
'''
test if connect event is send from transport to presentation
'''
class TestConnect(rdpy.network.layer.Layer):
def connect(self):
raise LayerCase.LayerCaseException()
self.assertRaises(LayerCase.LayerCaseException, rdpy.network.layer.Layer(presentation = TestConnect()).connect)
def test_layer_receive_event(self):
'''
test if recv event is send from transport to presentation
'''
class TestConnect(rdpy.network.layer.Layer):
def recv(self, s):
if s == "message":
raise LayerCase.LayerCaseException()
self.assertRaises(LayerCase.LayerCaseException, rdpy.network.layer.Layer(presentation = TestConnect()).recv, "message")

350
rdpy/tests/network/type.py Normal file
View File

@@ -0,0 +1,350 @@
'''
@author: sylvain
'''
import unittest
import rdpy.network.type
class TypeCase(unittest.TestCase):
'''
represent test case for all classes and function
present in rdpy.network.type
'''
def test_callable_value_const(self):
'''
test if callable value with const ctor doesn't change value
'''
c = rdpy.network.type.CallableValue(5)
self.assertEqual(c.value, 5, "invalid callable const")
def test_callable_value_lambda(self):
'''
test if callable value with lambda ctor return dynamic value
'''
c = rdpy.network.type.CallableValue(lambda:5)
self.assertEqual(c.value, 5, "invalid callable lambda")
def test_type_write_conditional_true(self):
'''
test when write is obligatory call write function
'''
class TestType(rdpy.network.type.Type):
def __write__(self, s):
raise Exception()
s = rdpy.network.type.Stream()
self.assertRaises(Exception, s.writeType, TestType(conditional = lambda:True))
@unittest.expectedFailure
def test_type_write_conditional_false(self):
'''
test when write doesn't needed, doesn't call write function
'''
class TestType(rdpy.network.type.Type):
def __write__(self, s):
raise Exception()
s = rdpy.network.type.Stream()
self.assertRaises(Exception, s.writeType, TestType(conditional = lambda:False))
def test_type_read_conditional_true(self):
'''
test when read is obligatory call write function
'''
class TestType(rdpy.network.type.Type):
def __read__(self, s):
raise Exception()
s = rdpy.network.type.Stream()
self.assertRaises(Exception, s.readType, TestType(conditional = lambda:True))
@unittest.expectedFailure
def test_type_read_conditional_false(self):
'''
test when read doesn't needed, doesn't call read function
'''
class TestType(rdpy.network.type.Type):
def __read__(self, s):
raise Exception()
s = rdpy.network.type.Stream()
self.assertRaises(Exception, s.readType, TestType(conditional = lambda:False))
def test_sizeof_conditional_true(self):
'''
test if sizeof of simple type is init value(4) when type is conditional true
'''
v = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:True)
self.assertEqual(rdpy.network.type.sizeof(v), 4, "invalid sizeof")
def test_sizeof_conditional_false(self):
'''
test if sizeof of simple type is 0 when type is conditional false
'''
v = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:False)
self.assertEqual(rdpy.network.type.sizeof(v), 0, "invalid sizeof")
def test_sizeof_list(self):
'''
test call sizeof on list of type
'''
v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le()]
self.assertEqual(rdpy.network.type.sizeof(v), 7, "invalid sizeof")
def test_sizeof_list_conditional(self):
'''
test call sizeof on list of type with one type hidden
'''
v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(conditional = lambda:False), rdpy.network.type.UInt32Le()]
self.assertEqual(rdpy.network.type.sizeof(v), 5, "invalid sizeof")
def test_sizeof_tuple(self):
'''
test call sizeof on tuple of type
'''
v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le()]
self.assertEqual(rdpy.network.type.sizeof(v), 7, "invalid sizeof")
def test_sizeof_tuple_conditional(self):
'''
test call sizeof on tuple of type with one type hidden
'''
v = (rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le(conditional = lambda:False))
self.assertEqual(rdpy.network.type.sizeof(v), 3, "invalid sizeof")
def test_stream_write_uint8_type(self):
'''
test write uint8 in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt8(1))
self.assertEqual(''.join(s.buflist), '\x01', "invalid stream write")
def test_stream_write_uint16Le_type(self):
'''
test write UInt16Le in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt16Le(1))
self.assertEqual(''.join(s.buflist), '\x01\x00', "invalid stream write")
def test_stream_write_uint16Be_type(self):
'''
test write UInt16Be in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt16Be(1))
self.assertEqual(''.join(s.buflist), '\x00\x01', "invalid stream write")
def test_stream_write_uint24Le_type(self):
'''
test write UInt24Le in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt24Le(1))
self.assertEqual(''.join(s.buflist), '\x01\x00\x00', "invalid stream write")
def test_stream_write_uint24Be_type(self):
'''
test write uint24Be in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt24Be(1))
self.assertEqual(''.join(s.buflist), '\x00\x00\x01', "invalid stream write")
def test_stream_write_uint32Le_type(self):
'''
test write UInt32Le in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt32Le(1))
self.assertEqual(''.join(s.buflist), '\x01\x00\x00\x00', "invalid stream write")
def test_stream_write_uint32Be_type(self):
'''
test write UInt32Be in stream
'''
s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt32Be(1))
self.assertEqual(''.join(s.buflist), '\x00\x00\x00\x01', "invalid stream write")
def test_stream_read_uint8_type(self):
'''
test read UInt8 type from stream
'''
s = rdpy.network.type.Stream('\x01')
t = rdpy.network.type.UInt8()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read value")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint16Le_type(self):
'''
test read UInt16Le type from stream
'''
s = rdpy.network.type.Stream('\x01\x00')
t = rdpy.network.type.UInt16Le()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read value")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint16Be_type(self):
'''
test read UInt16Be type from stream
'''
s = rdpy.network.type.Stream('\x00\x01')
t = rdpy.network.type.UInt16Be()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read value")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint24Le_type(self):
'''
test read UInt24Le type from stream
'''
s = rdpy.network.type.Stream('\x01\x00\x00')
t = rdpy.network.type.UInt24Le()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read value")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint24Be_type(self):
'''
test read UInt24Be type from stream
'''
s = rdpy.network.type.Stream('\x00\x00\x01')
t = rdpy.network.type.UInt24Be()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint32Le_type(self):
'''
test read UInt32Le type from stream
'''
s = rdpy.network.type.Stream('\x01\x00\x00\x00')
t = rdpy.network.type.UInt32Le()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read value")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint32Be_type(self):
'''
test read UInt32Be type from stream
'''
s = rdpy.network.type.Stream('\x00\x00\x00\x01')
t = rdpy.network.type.UInt32Be()
s.readType(t)
self.assertEqual(t.value, 1, "invalid stream read")
self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_optional_singletype(self):
'''
test optional option in case of simple type reading
'''
#unsigned int case
t = rdpy.network.type.SimpleType("I", 4, False, 0, optional = True)
#empty stream
s1 = rdpy.network.type.Stream()
s1.readType(t)
self.assertEqual(t.value, 0, "invalid stream read optional value")
def test_stream_read_conditional_singletype_false(self):
'''
test conditional option in case of simple type reading and when condition is false (not read)
'''
#unsigned int case
t = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:False)
s1 = rdpy.network.type.Stream("\x01\x00\x00\x00")
s1.readType(t)
self.assertEqual(t.value, 0, "invalid stream read conditional value")
def test_stream_read_conditional_singletype_true(self):
'''
test conditional option in case of simple type reading and when condition is true (must be read)
'''
#unsigned int case
t = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:True)
s1 = rdpy.network.type.Stream("\x01\x00\x00\x00")
s1.readType(t)
self.assertEqual(t.value, 1, "invalid stream read conditional value")
def test_strem_read_rollback_constant_constraint(self):
'''
test if constant constraint fail, the reading stream is correctly rollback
'''
class TestComposite(rdpy.network.type.CompositeType):
def __init__(self):
rdpy.network.type.CompositeType.__init__(self)
self.padding = rdpy.network.type.UInt32Le(0)
self.constraint = rdpy.network.type.UInt32Le(1, constant = True)
s = rdpy.network.type.Stream("\x00\x00\x00\x00\x00\x00\x00\x00")
try:
s.readType(TestComposite())
except Exception:
self.assertEqual(s.readLen(), 0, "invalid stream roll back operation")
return
self.assertTrue(False, "Constant constraint fail")
def test_strem_read_rollback_constant_constraint_recurcive(self):
'''
test if constant constraint fail even in recurcive composite type,
the reading stream is correctly rollback
'''
class TestSubComposite(rdpy.network.type.CompositeType):
def __init__(self):
rdpy.network.type.CompositeType.__init__(self)
self.padding = rdpy.network.type.UInt32Le(0)
self.constraint = rdpy.network.type.UInt32Le(1, constant = True)
class TestComposite(rdpy.network.type.CompositeType):
def __init__(self):
rdpy.network.type.CompositeType.__init__(self)
self.padding = rdpy.network.type.UInt32Le(0)
self.recurcive = TestSubComposite()
s = rdpy.network.type.Stream("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
try:
s.readType(TestComposite())
except Exception:
self.assertEqual(s.readLen(), 0, "invalid stream roll back operation")
return
self.assertTrue(False, "Constant constraint fail")
def test_stream_read_rollback_not_enough_data(self):
'''
test if constant constraint fail even in recurcive composite type,
the reading stream is correctly rollback
'''
class TestSubComposite(rdpy.network.type.CompositeType):
def __init__(self):
rdpy.network.type.CompositeType.__init__(self)
self.padding = rdpy.network.type.UInt32Le(0)
self.constraint = rdpy.network.type.UInt32Le(1)
class TestComposite(rdpy.network.type.CompositeType):
def __init__(self):
rdpy.network.type.CompositeType.__init__(self)
self.padding = rdpy.network.type.UInt32Le(0)
self.recurcive = TestSubComposite()
s = rdpy.network.type.Stream("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
try:
s.readType(TestComposite())
except Exception:
self.assertEqual(s.readLen(), 0, "invalid stream roll back operation")
return
self.assertTrue(False, "Constant constraint fail")
def test_stream_composite_type_force_read_length_optional(self):
'''
test where type have readLen forced and have optional subtype which have
length greater than last subtype of composite type
'''
class TestType(rdpy.network.type.CompositeType):
def __init__(self, readLen = None):
rdpy.network.type.CompositeType.__init__(self, readLen = readLen)
self.t1 = rdpy.network.type.UInt32Le(0, optional = True)
self.t2 = rdpy.network.type.UInt16Le(0, optional = True)
s = rdpy.network.type.Stream("\x00\x00\x00\x00\x00\x00\x00")
t = TestType(readLen = rdpy.network.type.UInt8(2))
s.readType(t)
self.assertTrue(not t.t1._is_readed and t.t2._is_readed, "Invalid optional reading when length is forced")