finish ber functions
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
'''
|
'''
|
||||||
@author: sylvain
|
@author: sylvain
|
||||||
'''
|
'''
|
||||||
from rdpy.protocol.network.type import UInt8, UInt16Be
|
from rdpy.protocol.network.type import UInt8, UInt16Be, UInt32Be, String
|
||||||
from rdpy.utils.const import ConstAttributes
|
from rdpy.utils.const import ConstAttributes
|
||||||
from rdpy.protocol.network.error import InvalidExpectedDataException
|
from rdpy.protocol.network.error import InvalidExpectedDataException
|
||||||
|
|
||||||
@@ -35,6 +35,8 @@ def berPC(pc):
|
|||||||
'''
|
'''
|
||||||
return BER_CONSTRUCT if true
|
return BER_CONSTRUCT if true
|
||||||
BER_PRIMITIVE if false
|
BER_PRIMITIVE if false
|
||||||
|
@param pc: boolean
|
||||||
|
@return: BerPc value
|
||||||
'''
|
'''
|
||||||
if pc:
|
if pc:
|
||||||
return BerPc.BER_CONSTRUCT
|
return BerPc.BER_CONSTRUCT
|
||||||
@@ -45,6 +47,8 @@ def readLength(s):
|
|||||||
'''
|
'''
|
||||||
read length of ber structure
|
read length of ber structure
|
||||||
length be on 1 2 or 3 bytes
|
length be on 1 2 or 3 bytes
|
||||||
|
@param s: stream
|
||||||
|
@return: Uint8 or UInt16Be length
|
||||||
'''
|
'''
|
||||||
size = None
|
size = None
|
||||||
byte = UInt8()
|
byte = UInt8()
|
||||||
@@ -65,15 +69,20 @@ def readLength(s):
|
|||||||
def writeLength(size):
|
def writeLength(size):
|
||||||
'''
|
'''
|
||||||
return strcture length as expected in Ber specification
|
return strcture length as expected in Ber specification
|
||||||
|
@param size: UInt8 or UInt16Be size to write
|
||||||
|
@return: UInt8 or (UInt8(0x82), UInt16Be)
|
||||||
'''
|
'''
|
||||||
if size > UInt16Be(0x7f):
|
if size > UInt16Be(0x7f):
|
||||||
return (UInt8(0x82), size)
|
return (UInt8(0x82), UInt16Be(size.value))
|
||||||
else:
|
else:
|
||||||
return UInt8(size.value)
|
return UInt8(size.value)
|
||||||
|
|
||||||
def readUniversalTag(s, tag, pc):
|
def readUniversalTag(s, tag, pc):
|
||||||
'''
|
'''
|
||||||
read tag of ber packet
|
read tag of ber packet
|
||||||
|
@param tag: Tag class attributes
|
||||||
|
@param pc: boolean
|
||||||
|
@return: true if tag is correctly read
|
||||||
'''
|
'''
|
||||||
byte = UInt8()
|
byte = UInt8()
|
||||||
s.readType(byte)
|
s.readType(byte)
|
||||||
@@ -82,12 +91,18 @@ def readUniversalTag(s, tag, pc):
|
|||||||
def writeUniversalTag(tag, pc):
|
def writeUniversalTag(tag, pc):
|
||||||
'''
|
'''
|
||||||
return universal tag byte
|
return universal tag byte
|
||||||
|
@param tag: tag class attributes
|
||||||
|
@param pc: boolean
|
||||||
|
@return: UInt8
|
||||||
'''
|
'''
|
||||||
return (Class.BER_CLASS_UNIV | berPC(pc) | (Tag.BER_TAG_MASK & tag))
|
return (Class.BER_CLASS_UNIV | berPC(pc) | (Tag.BER_TAG_MASK & tag))
|
||||||
|
|
||||||
def readApplicationTag(s, tag):
|
def readApplicationTag(s, tag):
|
||||||
'''
|
'''
|
||||||
read application tag
|
read application tag
|
||||||
|
@param s: stream
|
||||||
|
@param tag: tag class attributes
|
||||||
|
@return: true if tag is read with success
|
||||||
'''
|
'''
|
||||||
byte = UInt8()
|
byte = UInt8()
|
||||||
s.readType(byte)
|
s.readType(byte)
|
||||||
@@ -106,6 +121,8 @@ def readApplicationTag(s, tag):
|
|||||||
def writeApplicationTag(tag, size):
|
def writeApplicationTag(tag, size):
|
||||||
'''
|
'''
|
||||||
return struct that represent ber application tag
|
return struct that represent ber application tag
|
||||||
|
@param tag: tag class attribute
|
||||||
|
@param size: size to rest of packet
|
||||||
'''
|
'''
|
||||||
if tag > UInt8(30):
|
if tag > UInt8(30):
|
||||||
return (((Class.BER_CLASS_APPL | BerPc.BER_CONSTRUCT) | Tag.BER_TAG_MASK), tag, writeLength(size))
|
return (((Class.BER_CLASS_APPL | BerPc.BER_CONSTRUCT) | Tag.BER_TAG_MASK), tag, writeLength(size))
|
||||||
@@ -115,6 +132,8 @@ def writeApplicationTag(tag, size):
|
|||||||
def readBoolean(s):
|
def readBoolean(s):
|
||||||
'''
|
'''
|
||||||
return boolean
|
return boolean
|
||||||
|
@param s: stream
|
||||||
|
@return: boolean
|
||||||
'''
|
'''
|
||||||
if not readUniversalTag(s, Tag.BER_TAG_BOOLEAN, False):
|
if not readUniversalTag(s, Tag.BER_TAG_BOOLEAN, False):
|
||||||
raise InvalidExpectedDataException("bad boolean tag")
|
raise InvalidExpectedDataException("bad boolean tag")
|
||||||
@@ -128,5 +147,86 @@ def readBoolean(s):
|
|||||||
def writeBoolean(b):
|
def writeBoolean(b):
|
||||||
'''
|
'''
|
||||||
return structure that represent boolean in ber specification
|
return structure that represent boolean in ber specification
|
||||||
|
@param b: boolean
|
||||||
|
@return: ber boolean structure
|
||||||
'''
|
'''
|
||||||
return (writeUniversalTag(Tag.BER_TAG_BOOLEAN, False), writeLength(UInt8(1)), UInt8(int(b)))
|
return (writeUniversalTag(Tag.BER_TAG_BOOLEAN, False), writeLength(UInt8(1)), UInt8(int(b)))
|
||||||
|
|
||||||
|
def readInteger(s):
|
||||||
|
'''
|
||||||
|
read integer structure from stream
|
||||||
|
@param s: stream
|
||||||
|
@return: UInt8, UInt16Be, UInt32Be
|
||||||
|
'''
|
||||||
|
if not readUniversalTag(s, Tag.BER_TAG_INTEGER, False):
|
||||||
|
raise InvalidExpectedDataException("bad integer tag")
|
||||||
|
|
||||||
|
size = readLength(s)
|
||||||
|
|
||||||
|
if size == UInt16Be(1):
|
||||||
|
integer = UInt8()
|
||||||
|
s.readType(integer)
|
||||||
|
return integer
|
||||||
|
elif size == UInt16Be(2):
|
||||||
|
integer = UInt16Be()
|
||||||
|
s.readType(integer)
|
||||||
|
return integer
|
||||||
|
elif size == UInt16Be(3):
|
||||||
|
integer1 = UInt8()
|
||||||
|
integer2 = UInt16Be()
|
||||||
|
s.readType(integer1)
|
||||||
|
s.readType(integer2)
|
||||||
|
return UInt32Be(integer2.value + (integer1.value << 16))
|
||||||
|
elif size == UInt16Be(4):
|
||||||
|
integer = UInt32Be()
|
||||||
|
s.readType(integer)
|
||||||
|
return integer
|
||||||
|
else:
|
||||||
|
raise InvalidExpectedDataException("wrong integer size")
|
||||||
|
|
||||||
|
def writeInteger(value):
|
||||||
|
'''
|
||||||
|
write integer value
|
||||||
|
@param param: UInt32Be
|
||||||
|
@return ber interger structure
|
||||||
|
'''
|
||||||
|
if value < UInt32Be(0xff):
|
||||||
|
return (writeUniversalTag(Tag.BER_TAG_INTEGER, False), writeLength(UInt8(1)), UInt8(value.value))
|
||||||
|
elif value < UInt32Be(0xff80):
|
||||||
|
return (writeUniversalTag(Tag.BER_TAG_INTEGER, False), writeLength(UInt8(2)), UInt16Be(value.value))
|
||||||
|
else:
|
||||||
|
return (writeUniversalTag(Tag.BER_TAG_INTEGER, False), writeLength(UInt8(4)), UInt32Be(value.value))
|
||||||
|
|
||||||
|
def readOctetString(s):
|
||||||
|
'''
|
||||||
|
read ber string structure
|
||||||
|
@param s: stream
|
||||||
|
@return: String
|
||||||
|
'''
|
||||||
|
if not readUniversalTag(s, Tag.BER_TAG_OCTET_STRING, False):
|
||||||
|
raise InvalidExpectedDataException("unexpected ber tag")
|
||||||
|
size = readLength(s)
|
||||||
|
return String(s.read(size.value))
|
||||||
|
|
||||||
|
def writeOctetstring(value):
|
||||||
|
'''
|
||||||
|
write string in ber representation
|
||||||
|
@param value: String
|
||||||
|
@return: string ber structure
|
||||||
|
'''
|
||||||
|
return (writeUniversalTag(Tag.BER_TAG_OCTET_STRING, False), writeLength(UInt32Be(len(value.value))), value)
|
||||||
|
|
||||||
|
def readEnumerated(s):
|
||||||
|
'''
|
||||||
|
read enumerated structure
|
||||||
|
@param s: Stream
|
||||||
|
@return: UInt8 that represent ber enumerate
|
||||||
|
'''
|
||||||
|
if not readUniversalTag(s, Tag.BER_TAG_ENUMERATED, False):
|
||||||
|
raise InvalidExpectedDataException("invalid ber tag")
|
||||||
|
size = readLength(s)
|
||||||
|
if size != UInt32Be(1):
|
||||||
|
raise InvalidExpectedDataException("enumerate size is wrong")
|
||||||
|
enumer = UInt8()
|
||||||
|
s.readType(enumer)
|
||||||
|
return enumer
|
||||||
|
|||||||
Reference in New Issue
Block a user