bug fix + tests

This commit is contained in:
speyrefitte
2014-08-26 18:02:01 +02:00
parent 26087620a0
commit e56612ba4d
4 changed files with 176 additions and 127 deletions

View File

@@ -62,13 +62,16 @@ class ProxyServer(rdp.RDPServerObserver):
width, height = self._controller.getScreen() width, height = self._controller.getScreen()
self._window = view.Window(width, height, QtGui.QColor(8, 24, 66)) self._window = view.Window(width, height, QtGui.QColor(8, 24, 66))
self._window.addView(view.Anchor(width / 2 - 250, 100, view.Label("Please select following server", self._window.addView(view.Anchor(width / 2 - 250, 100,
500, 50, QtGui.QFont('arial', 18, QtGui.QFont.Bold), view.Label("Please select following server",
backgroundColor = QtGui.QColor(8, 24, 66)))) 500, 50, QtGui.QFont('arial', 18, QtGui.QFont.Bold),
backgroundColor = QtGui.QColor(8, 24, 66))))
self._window.addView(view.Anchor(width / 2 - 250, 150,
view.List(["%s:%s"%(ip, port) for ip, port in machines],
500, 500, self.onSelectMachine,
QtGui.QColor(8, 24, 66))), True)
self._window.addView(view.Anchor(width / 2 - 250, 150, view.List(["%s:%s"%(ip, port) for ip, port in machines],
500, 500, self.onSelectMachine,
QtGui.QColor(8, 24, 66))), True)
self._window.update(view.RDPRenderer(self._controller), True) self._window.update(view.RDPRenderer(self._controller), True)
def onSelectMachine(self, index): def onSelectMachine(self, index):
@@ -99,10 +102,12 @@ class ProxyServer(rdp.RDPServerObserver):
width, height = self._controller.getScreen() width, height = self._controller.getScreen()
popup = view.Window(width, height, QtGui.QColor(8, 24, 66)) popup = view.Window(width, height, QtGui.QColor(8, 24, 66))
popup.addView(view.Anchor(width / 2 - 250, height / 2 - 25, popup.addView(view.Anchor(width / 2 - 250, height / 2 - 25,
view.Label(message, 500, 50, view.Label(message, 500, 50,
QtGui.QFont('arial', 18, QtGui.QFont.Bold), QtGui.QFont('arial', 18, QtGui.QFont.Bold),
backgroundColor = QtGui.QColor(8, 24, 66)))) backgroundColor = QtGui.QColor(8, 24, 66))))
popup.update(view.RDPRenderer(self._controller), True) popup.update(view.RDPRenderer(self._controller), True)
def onReady(self): def onReady(self):
@@ -220,6 +225,7 @@ class ProxyClient(rdp.RDPClientObserver):
rdp.RDPClientObserver.__init__(self, controller) rdp.RDPClientObserver.__init__(self, controller)
self._server = server self._server = server
self._name = name self._name = name
self._connected = False
def onReady(self): def onReady(self):
""" """
@@ -227,6 +233,14 @@ class ProxyClient(rdp.RDPClientObserver):
Inform ProxyServer that i'm connected Inform ProxyServer that i'm connected
@see: rdp.RDPClientObserver.onReady @see: rdp.RDPClientObserver.onReady
""" """
#prevent multiple on ready event
#because each deactive-reactive sequence
#launch an onReady message
if self._connected:
return
else:
self._connected = True
if not self._name is None: if not self._name is None:
ProxyClient._CONNECTED_.append(self) ProxyClient._CONNECTED_.append(self)
self._server.clientConnected(self) self._server.clientConnected(self)
@@ -236,7 +250,8 @@ class ProxyClient(rdp.RDPClientObserver):
@summary: Event inform that stack is close @summary: Event inform that stack is close
@see: rdp.RDPClientObserver.onClose @see: rdp.RDPClientObserver.onClose
""" """
ProxyClient._CONNECTED_.remove(self) if not self._name is None:
ProxyClient._CONNECTED_.remove(self)
def onUpdate(self, destLeft, destTop, destRight, destBottom, width, height, bitsPerPixel, isCompress, data): def onUpdate(self, destLeft, destTop, destRight, destBottom, width, height, bitsPerPixel, isCompress, data):
""" """
@@ -267,7 +282,7 @@ class ProxyClientFactory(rdp.ClientFactory):
@param username: username session @param username: username session
@param password: password session @param password: password session
""" """
self._controller = server self._server = server
self._width = width self._width = width
self._height = height self._height = height
self._domain = domain self._domain = domain
@@ -288,8 +303,7 @@ class ProxyClientFactory(rdp.ClientFactory):
controller.setDomain(self._domain) controller.setDomain(self._domain)
controller.setUsername(self._username) controller.setUsername(self._username)
controller.setPassword(self._password) controller.setPassword(self._password)
proxy = ProxyClient(controller, self._controller, "%s\\%s on %s"%(self._domain, self._username, addr)) return ProxyClient(controller, self._server, "%s\\%s on %s"%(self._domain, self._username, addr))
return proxy
def startedConnecting(self, connector): def startedConnecting(self, connector):
pass pass
@@ -329,13 +343,15 @@ class ProxyAdmin(rdp.RDPServerObserver):
width, height = self._controller.getScreen() width, height = self._controller.getScreen()
self._window = view.Window(width, height, QtGui.QColor(8, 24, 66)) self._window = view.Window(width, height, QtGui.QColor(8, 24, 66))
self._window.addView(view.Anchor(width / 2 - 250, 100, view.Label("Please select following session", self._window.addView(view.Anchor(width / 2 - 250, 100,
500, 50, QtGui.QFont('arial', 18, QtGui.QFont.Bold), view.Label("Please select following session",
backgroundColor = QtGui.QColor(8, 24, 66)))) 500, 50, QtGui.QFont('arial', 18, QtGui.QFont.Bold),
backgroundColor = QtGui.QColor(8, 24, 66))))
self._window.addView(view.Anchor(width / 2 - 250, 150, view.List([p._name for p in self._sessions], self._window.addView(view.Anchor(width / 2 - 250, 150,
500, 500, self.onSelect, view.List([p._name for p in self._sessions],
QtGui.QColor(8, 24, 66))), True) 500, 500, self.onSelect,
QtGui.QColor(8, 24, 66))), True)
def clientConnected(self, client): def clientConnected(self, client):
pass pass
@@ -470,7 +486,7 @@ def help():
""" """
@summary: Print help in console @summary: Print help in console
""" """
print "Usage: rdpy-rdpproxy -f credential_file_path -k private_key_file_path -c certificate_file_path listen_port" print "Usage: rdpy-rdpproxy -f credential_file_path -k private_key_file_path -c certificate_file_path [-i admin_ip[:admin_port]] listen_port"
def loadConfig(configFilePath): def loadConfig(configFilePath):
""" """

View File

@@ -18,9 +18,9 @@
# #
""" """
Raw type use un RDPY Raw type use in RDPY
It's a basic implementation looks like protobuf but dynamically It's a basic implementation looks like google protobuf but dynamically
We are in python! We are in python!
""" """
@@ -130,9 +130,9 @@ class Type(object):
class CallableValue(object): class CallableValue(object):
""" """
@summary: Expression evaluate when is get or set @summary: Expression evaluate when is get or set
Ex: Type contain lenght of array and array Ex: Type contain length of array and array
To know the size of array you need to read To know the size of array you need to read
length field before. A ctor time no length was readed. length field before. At ctor time no length was read.
You need a callable object that will be evaluate when it will be used You need a callable object that will be evaluate when it will be used
""" """
def __init__(self, value): def __init__(self, value):
@@ -191,7 +191,7 @@ class SimpleType(Type, CallableValue):
@param structFormat: letter that represent type in struct package @param structFormat: letter that represent type in struct package
@param typeSize: size in byte of type @param typeSize: size in byte of type
@param signed: true if type represent a signed type @param signed: true if type represent a signed type
@param value: value recorded in this object (raw python type | lambda | function) @param value: value recorded in this object (raw Python type | lambda | function)
@param conditional : Callable object @param conditional : Callable object
Read and Write operation depend on return of this function Read and Write operation depend on return of this function
@param optional: If there is no enough byte in current stream @param optional: If there is no enough byte in current stream
@@ -437,7 +437,7 @@ class CompositeType(Type):
""" """
@summary: Track Type field @summary: Track Type field
For Type field record it in same order as declared For Type field record it in same order as declared
Keep other but bot handle in read or write functino Keep other but bot handle in read or write function
@param name: name of new attribute @param name: name of new attribute
@param value: value of new attribute @param value: value of new attribute
""" """
@@ -452,7 +452,7 @@ class CompositeType(Type):
And check read length parameter And check read length parameter
If an error occurred rollback type already read If an error occurred rollback type already read
@param s: Stream @param s: Stream
@raise InvalidSize: if stream is greatter than readLen parameter @raise InvalidSize: if stream is greater than readLen parameter
""" """
readLen = 0 readLen = 0
for name in self._typeName: for name in self._typeName:

View File

@@ -35,7 +35,7 @@ class LayerCase(unittest.TestCase):
if data.dataLen() == 4: if data.dataLen() == 4:
raise LayerCase.LayerCaseException() raise LayerCase.LayerCaseException()
t = TestAutomata(rdpy.network.layer.LayerMode.NONE) t = TestAutomata()
t.expect(4, t.expectedCallBack) t.expect(4, t.expectedCallBack)
self.assertRaises(LayerCase.LayerCaseException, t.dataReceived, "\x00\x00\x00\x00\x00") self.assertRaises(LayerCase.LayerCaseException, t.dataReceived, "\x00\x00\x00\x00\x00")
@@ -48,6 +48,6 @@ class LayerCase(unittest.TestCase):
if data.dataLen() == 4: if data.dataLen() == 4:
raise LayerCase.LayerCaseException() raise LayerCase.LayerCaseException()
t = TestAutomata(rdpy.network.layer.LayerMode.NONE) t = TestAutomata()
t.expect(4, t.expectedCallBack) t.expect(4, t.expectedCallBack)
self.assertEqual(t.dataReceived("\x00\x00\x00"), None, "Not enough dada") self.assertEqual(t.dataReceived("\x00\x00\x00"), None, "Not enough dada")

View File

@@ -3,30 +3,31 @@
''' '''
import unittest import unittest
import rdpy.network.type import rdpy.network.type
from rdpy.base.error import InvalidSize
class TypeCase(unittest.TestCase): class TypeCase(unittest.TestCase):
''' """
represent test case for all classes and function @summary: represent test case for all classes and function
present in rdpy.network.type present in rdpy.network.type
''' """
def test_callable_value_const(self): def test_callable_value_const(self):
''' """
test if callable value with const ctor doesn't change value @summary: test if callable value with const ctor doesn't change value
''' """
c = rdpy.network.type.CallableValue(5) c = rdpy.network.type.CallableValue(5)
self.assertEqual(c.value, 5, "invalid callable const") self.assertEqual(c.value, 5, "invalid callable const")
def test_callable_value_lambda(self): def test_callable_value_lambda(self):
''' '''
test if callable value with lambda ctor return dynamic value @summary: test if callable value with lambda ctor return dynamic value
''' '''
c = rdpy.network.type.CallableValue(lambda:5) c = rdpy.network.type.CallableValue(lambda:5)
self.assertEqual(c.value, 5, "invalid callable lambda") self.assertEqual(c.value, 5, "invalid callable lambda")
def test_type_write_conditional_true(self): def test_type_write_conditional_true(self):
''' """
test when write is obligatory call write function @summary: test when write is obligatory call write function
''' """
class TestType(rdpy.network.type.Type): class TestType(rdpy.network.type.Type):
def __write__(self, s): def __write__(self, s):
raise Exception() raise Exception()
@@ -35,9 +36,9 @@ class TypeCase(unittest.TestCase):
@unittest.expectedFailure @unittest.expectedFailure
def test_type_write_conditional_false(self): def test_type_write_conditional_false(self):
''' """
test when write doesn't needed, doesn't call write function @summary: test when write doesn't needed, doesn't call write function
''' """
class TestType(rdpy.network.type.Type): class TestType(rdpy.network.type.Type):
def __write__(self, s): def __write__(self, s):
raise Exception() raise Exception()
@@ -45,9 +46,9 @@ class TypeCase(unittest.TestCase):
self.assertRaises(Exception, s.writeType, TestType(conditional = lambda:False)) self.assertRaises(Exception, s.writeType, TestType(conditional = lambda:False))
def test_type_read_conditional_true(self): def test_type_read_conditional_true(self):
''' """
test when read is obligatory call write function @summary: test when read is obligatory call write function
''' """
class TestType(rdpy.network.type.Type): class TestType(rdpy.network.type.Type):
def __read__(self, s): def __read__(self, s):
raise Exception() raise Exception()
@@ -56,9 +57,9 @@ class TypeCase(unittest.TestCase):
@unittest.expectedFailure @unittest.expectedFailure
def test_type_read_conditional_false(self): def test_type_read_conditional_false(self):
''' """
test when read doesn't needed, doesn't call read function @summary: test when read doesn't needed, doesn't call read function
''' """
class TestType(rdpy.network.type.Type): class TestType(rdpy.network.type.Type):
def __read__(self, s): def __read__(self, s):
raise Exception() raise Exception()
@@ -67,107 +68,107 @@ class TypeCase(unittest.TestCase):
def test_sizeof_conditional_true(self): def test_sizeof_conditional_true(self):
''' """
test if sizeof of simple type is init value(4) when type is conditional true @summary: test if sizeof of simple type is init value(4) when type is conditional true
''' """
v = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:True) v = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:True)
self.assertEqual(rdpy.network.type.sizeof(v), 4, "invalid sizeof") self.assertEqual(rdpy.network.type.sizeof(v), 4, "invalid sizeof")
def test_sizeof_conditional_false(self): def test_sizeof_conditional_false(self):
''' """
test if sizeof of simple type is 0 when type is conditional false @summary: test if sizeof of simple type is 0 when type is conditional false
''' """
v = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:False) v = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:False)
self.assertEqual(rdpy.network.type.sizeof(v), 0, "invalid sizeof") self.assertEqual(rdpy.network.type.sizeof(v), 0, "invalid sizeof")
def test_sizeof_list(self): def test_sizeof_list(self):
''' """
test call sizeof on list of type @summary: test call sizeof on list of type
''' """
v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le()] v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le()]
self.assertEqual(rdpy.network.type.sizeof(v), 7, "invalid sizeof") self.assertEqual(rdpy.network.type.sizeof(v), 7, "invalid sizeof")
def test_sizeof_list_conditional(self): def test_sizeof_list_conditional(self):
''' """
test call sizeof on list of type with one type hidden @summary: test call sizeof on list of type with one type hidden
''' """
v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(conditional = lambda:False), rdpy.network.type.UInt32Le()] v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(conditional = lambda:False), rdpy.network.type.UInt32Le()]
self.assertEqual(rdpy.network.type.sizeof(v), 5, "invalid sizeof") self.assertEqual(rdpy.network.type.sizeof(v), 5, "invalid sizeof")
def test_sizeof_tuple(self): def test_sizeof_tuple(self):
''' """
test call sizeof on tuple of type @summary: test call sizeof on tuple of type
''' """
v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le()] v = [rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le()]
self.assertEqual(rdpy.network.type.sizeof(v), 7, "invalid sizeof") self.assertEqual(rdpy.network.type.sizeof(v), 7, "invalid sizeof")
def test_sizeof_tuple_conditional(self): def test_sizeof_tuple_conditional(self):
''' """
test call sizeof on tuple of type with one type hidden @summary: test call sizeof on tuple of type with one type hidden
''' """
v = (rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le(conditional = lambda:False)) v = (rdpy.network.type.UInt8(), rdpy.network.type.UInt16Le(), rdpy.network.type.UInt32Le(conditional = lambda:False))
self.assertEqual(rdpy.network.type.sizeof(v), 3, "invalid sizeof") self.assertEqual(rdpy.network.type.sizeof(v), 3, "invalid sizeof")
def test_stream_write_uint8_type(self): def test_stream_write_uint8_type(self):
''' """
test write uint8 in stream @summary: test write uint8 in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt8(1)) s.writeType(rdpy.network.type.UInt8(1))
self.assertEqual(''.join(s.buflist), '\x01', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x01', "invalid stream write")
def test_stream_write_uint16Le_type(self): def test_stream_write_uint16Le_type(self):
''' """
test write UInt16Le in stream @summary: test write UInt16Le in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt16Le(1)) s.writeType(rdpy.network.type.UInt16Le(1))
self.assertEqual(''.join(s.buflist), '\x01\x00', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x01\x00', "invalid stream write")
def test_stream_write_uint16Be_type(self): def test_stream_write_uint16Be_type(self):
''' """
test write UInt16Be in stream @summary: test write UInt16Be in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt16Be(1)) s.writeType(rdpy.network.type.UInt16Be(1))
self.assertEqual(''.join(s.buflist), '\x00\x01', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x00\x01', "invalid stream write")
def test_stream_write_uint24Le_type(self): def test_stream_write_uint24Le_type(self):
''' """
test write UInt24Le in stream @summary: test write UInt24Le in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt24Le(1)) s.writeType(rdpy.network.type.UInt24Le(1))
self.assertEqual(''.join(s.buflist), '\x01\x00\x00', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x01\x00\x00', "invalid stream write")
def test_stream_write_uint24Be_type(self): def test_stream_write_uint24Be_type(self):
''' """
test write uint24Be in stream @summary: test write uint24Be in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt24Be(1)) s.writeType(rdpy.network.type.UInt24Be(1))
self.assertEqual(''.join(s.buflist), '\x00\x00\x01', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x00\x00\x01', "invalid stream write")
def test_stream_write_uint32Le_type(self): def test_stream_write_uint32Le_type(self):
''' """
test write UInt32Le in stream @summary: test write UInt32Le in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt32Le(1)) s.writeType(rdpy.network.type.UInt32Le(1))
self.assertEqual(''.join(s.buflist), '\x01\x00\x00\x00', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x01\x00\x00\x00', "invalid stream write")
def test_stream_write_uint32Be_type(self): def test_stream_write_uint32Be_type(self):
''' """
test write UInt32Be in stream @summary: test write UInt32Be in stream
''' """
s = rdpy.network.type.Stream() s = rdpy.network.type.Stream()
s.writeType(rdpy.network.type.UInt32Be(1)) s.writeType(rdpy.network.type.UInt32Be(1))
self.assertEqual(''.join(s.buflist), '\x00\x00\x00\x01', "invalid stream write") self.assertEqual(''.join(s.buflist), '\x00\x00\x00\x01', "invalid stream write")
def test_stream_read_uint8_type(self): def test_stream_read_uint8_type(self):
''' """
test read UInt8 type from stream @summary: test read UInt8 type from stream
''' """
s = rdpy.network.type.Stream('\x01') s = rdpy.network.type.Stream('\x01')
t = rdpy.network.type.UInt8() t = rdpy.network.type.UInt8()
s.readType(t) s.readType(t)
@@ -175,9 +176,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint16Le_type(self): def test_stream_read_uint16Le_type(self):
''' """
test read UInt16Le type from stream @summary: test read UInt16Le type from stream
''' """
s = rdpy.network.type.Stream('\x01\x00') s = rdpy.network.type.Stream('\x01\x00')
t = rdpy.network.type.UInt16Le() t = rdpy.network.type.UInt16Le()
s.readType(t) s.readType(t)
@@ -185,9 +186,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint16Be_type(self): def test_stream_read_uint16Be_type(self):
''' """
test read UInt16Be type from stream @summary: test read UInt16Be type from stream
''' """
s = rdpy.network.type.Stream('\x00\x01') s = rdpy.network.type.Stream('\x00\x01')
t = rdpy.network.type.UInt16Be() t = rdpy.network.type.UInt16Be()
s.readType(t) s.readType(t)
@@ -195,9 +196,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint24Le_type(self): def test_stream_read_uint24Le_type(self):
''' """
test read UInt24Le type from stream @summary: test read UInt24Le type from stream
''' """
s = rdpy.network.type.Stream('\x01\x00\x00') s = rdpy.network.type.Stream('\x01\x00\x00')
t = rdpy.network.type.UInt24Le() t = rdpy.network.type.UInt24Le()
s.readType(t) s.readType(t)
@@ -205,9 +206,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint24Be_type(self): def test_stream_read_uint24Be_type(self):
''' """
test read UInt24Be type from stream @summary: test read UInt24Be type from stream
''' """
s = rdpy.network.type.Stream('\x00\x00\x01') s = rdpy.network.type.Stream('\x00\x00\x01')
t = rdpy.network.type.UInt24Be() t = rdpy.network.type.UInt24Be()
s.readType(t) s.readType(t)
@@ -215,9 +216,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint32Le_type(self): def test_stream_read_uint32Le_type(self):
''' """
test read UInt32Le type from stream @summary: test read UInt32Le type from stream
''' """
s = rdpy.network.type.Stream('\x01\x00\x00\x00') s = rdpy.network.type.Stream('\x01\x00\x00\x00')
t = rdpy.network.type.UInt32Le() t = rdpy.network.type.UInt32Le()
s.readType(t) s.readType(t)
@@ -225,9 +226,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_uint32Be_type(self): def test_stream_read_uint32Be_type(self):
''' """
test read UInt32Be type from stream @summary: test read UInt32Be type from stream
''' """
s = rdpy.network.type.Stream('\x00\x00\x00\x01') s = rdpy.network.type.Stream('\x00\x00\x00\x01')
t = rdpy.network.type.UInt32Be() t = rdpy.network.type.UInt32Be()
s.readType(t) s.readType(t)
@@ -235,9 +236,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(s.dataLen(), 0, "not read all stream") self.assertEqual(s.dataLen(), 0, "not read all stream")
def test_stream_read_optional_singletype(self): def test_stream_read_optional_singletype(self):
''' """
test optional option in case of simple type reading @summary: test optional option in case of simple type reading
''' """
#unsigned int case #unsigned int case
t = rdpy.network.type.SimpleType("I", 4, False, 0, optional = True) t = rdpy.network.type.SimpleType("I", 4, False, 0, optional = True)
#empty stream #empty stream
@@ -246,9 +247,9 @@ class TypeCase(unittest.TestCase):
self.assertEqual(t.value, 0, "invalid stream read optional value") self.assertEqual(t.value, 0, "invalid stream read optional value")
def test_stream_read_conditional_singletype_false(self): def test_stream_read_conditional_singletype_false(self):
''' """
test conditional option in case of simple type reading and when condition is false (not read) @summary: test conditional option in case of simple type reading and when condition is false (not read)
''' """
#unsigned int case #unsigned int case
t = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:False) t = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:False)
s1 = rdpy.network.type.Stream("\x01\x00\x00\x00") s1 = rdpy.network.type.Stream("\x01\x00\x00\x00")
@@ -256,19 +257,19 @@ class TypeCase(unittest.TestCase):
self.assertEqual(t.value, 0, "invalid stream read conditional value") self.assertEqual(t.value, 0, "invalid stream read conditional value")
def test_stream_read_conditional_singletype_true(self): def test_stream_read_conditional_singletype_true(self):
''' """
test conditional option in case of simple type reading and when condition is true (must be read) @summary: test conditional option in case of simple type reading and when condition is true (must be read)
''' """
#unsigned int case #unsigned int case
t = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:True) t = rdpy.network.type.SimpleType("I", 4, False, 0, conditional = lambda:True)
s1 = rdpy.network.type.Stream("\x01\x00\x00\x00") s1 = rdpy.network.type.Stream("\x01\x00\x00\x00")
s1.readType(t) s1.readType(t)
self.assertEqual(t.value, 1, "invalid stream read conditional value") self.assertEqual(t.value, 1, "invalid stream read conditional value")
def test_strem_read_rollback_constant_constraint(self): def test_stream_read_rollback_constant_constraint(self):
''' """
test if constant constraint fail, the reading stream is correctly rollback @summary: test if constant constraint fail, the reading stream is correctly rollback
''' """
class TestComposite(rdpy.network.type.CompositeType): class TestComposite(rdpy.network.type.CompositeType):
def __init__(self): def __init__(self):
rdpy.network.type.CompositeType.__init__(self) rdpy.network.type.CompositeType.__init__(self)
@@ -283,11 +284,11 @@ class TypeCase(unittest.TestCase):
return return
self.assertTrue(False, "Constant constraint fail") self.assertTrue(False, "Constant constraint fail")
def test_strem_read_rollback_constant_constraint_recurcive(self): def test_stream_read_rollback_constant_constraint_recurcive(self):
''' """
test if constant constraint fail even in recurcive composite type, @summary: test if constant constraint fail even in recurcive composite type,
the reading stream is correctly rollback the reading stream is correctly rollback
''' """
class TestSubComposite(rdpy.network.type.CompositeType): class TestSubComposite(rdpy.network.type.CompositeType):
def __init__(self): def __init__(self):
rdpy.network.type.CompositeType.__init__(self) rdpy.network.type.CompositeType.__init__(self)
@@ -309,10 +310,10 @@ class TypeCase(unittest.TestCase):
self.assertTrue(False, "Constant constraint fail") self.assertTrue(False, "Constant constraint fail")
def test_stream_read_rollback_not_enough_data(self): def test_stream_read_rollback_not_enough_data(self):
''' """
test if constant constraint fail even in recurcive composite type, @summary: test if constant constraint fail even in recurcive composite type,
the reading stream is correctly rollback the reading stream is correctly rollback
''' """
class TestSubComposite(rdpy.network.type.CompositeType): class TestSubComposite(rdpy.network.type.CompositeType):
def __init__(self): def __init__(self):
rdpy.network.type.CompositeType.__init__(self) rdpy.network.type.CompositeType.__init__(self)
@@ -331,4 +332,36 @@ class TypeCase(unittest.TestCase):
except Exception: except Exception:
self.assertEqual(s.readLen(), 0, "invalid stream roll back operation") self.assertEqual(s.readLen(), 0, "invalid stream roll back operation")
return return
self.assertTrue(False, "Constant constraint fail") self.assertTrue(False, "Constant constraint fail")
def test_stream_read_with_static_length_superior(self):
"""
@summary: test read stream with a length forced
if total stream read length < to forced read length
the trash must be read as padding
"""
class TestReadLength(rdpy.network.type.CompositeType):
def __init__(self, readLen):
rdpy.network.type.CompositeType.__init__(self, readLen = readLen)
self.padding = rdpy.network.type.UInt32Le(0)
s = rdpy.network.type.Stream("\x00" * 10)
s.readType(TestReadLength(rdpy.network.type.UInt8(10)))
self.assertEqual(s.dataLen(), 0, "invalid stream read trash data as padding")
def test_stream_read_with_static_length_inferior(self):
"""
@summary: test read stream with a length forced
if total stream read length > to forced read length
an InvalidSize exception is throw
"""
class TestReadLength(rdpy.network.type.CompositeType):
def __init__(self, readLen):
rdpy.network.type.CompositeType.__init__(self, readLen = readLen)
self.padding = rdpy.network.type.UInt32Le(0)
s = rdpy.network.type.Stream("\x00" * 10)
self.assertRaises(InvalidSize, s.readType, TestReadLength(rdpy.network.type.UInt8(2)))
def test_stream_read_string(self):
"""
@summary: read stream as string buffer
"""