v0.5
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#-------------------------------------------------------------------------------
|
||||
# Typo3 Enumerator - Automatic Typo3 Enumeration Tool
|
||||
# Copyright (c) 2014-2017 Jan Rude
|
||||
# Copyright (c) 2014-2020 Jan Rude
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
@@ -15,86 +15,102 @@
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/)
|
||||
# along with this program. If not, see [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/)
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
import os.path
|
||||
import json
|
||||
from lib.request import Request
|
||||
from lib.output import Output
|
||||
import sqlite3
|
||||
from colorama import Fore
|
||||
import lib.request as request
|
||||
from lib.thread_pool import ThreadPool
|
||||
from pkg_resources import parse_version
|
||||
|
||||
class Extensions:
|
||||
"""
|
||||
Extension class
|
||||
"""
|
||||
def __init__(self, ext_state, top, path):
|
||||
self.__ext_state = ext_state
|
||||
self.__top = top
|
||||
self.__path = path
|
||||
"""
|
||||
Extension class
|
||||
"""
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def load_extensions(self):
|
||||
"""
|
||||
This method loads the defined extensions from the extension file.
|
||||
IF the extension file is not found, an error is raised.
|
||||
"""
|
||||
extensions = []
|
||||
for state in self.__ext_state:
|
||||
ext_file = state + '_extensions'
|
||||
if not os.path.isfile(os.path.join(self.__path, 'extensions', ext_file)):
|
||||
raise Exception("\n\nCould not find extension file " + ext_file + '!\nTry --update')
|
||||
def search_extension(self, domain, extensions, threads):
|
||||
"""
|
||||
This method loads the extensions from the database and searches for installed extensions.
|
||||
/typo3conf/ext/: Local installation path. This is where extensions usually get installed.
|
||||
/typo3/ext/: Global installation path (not used atm)
|
||||
/typo3/sysext/: Extensions shipped with core
|
||||
"""
|
||||
found_extensions = {}
|
||||
thread_pool = ThreadPool()
|
||||
for ext in extensions:
|
||||
thread_pool.add_job((request.head_request, ('{}/typo3conf/ext/{}/'.format(domain, ext))))
|
||||
thread_pool.add_job((request.head_request, ('{}/typo3/sysext/{}/'.format(domain, ext))))
|
||||
#thread_pool.add_job((request.head_request, ('{}/typo3/ext/{}/'.format(domain, ext))))
|
||||
thread_pool.start(threads)
|
||||
|
||||
with open(os.path.join(self.__path, 'extensions', ext_file), 'r') as f:
|
||||
count = 0
|
||||
for extension in f:
|
||||
if not(self.__top is None):
|
||||
if count < self.__top:
|
||||
extensions.append(extension.split('\n')[0])
|
||||
count += 1
|
||||
else:
|
||||
extensions.append(extension.split('\n')[0])
|
||||
f.close()
|
||||
return extensions
|
||||
for installed_extension in thread_pool.get_result():
|
||||
name = installed_extension[1][:-1]
|
||||
name = name[name.rfind('/')+1:]
|
||||
found_extensions[name] = {'url':installed_extension[1], 'version': None, 'file': None}
|
||||
return found_extensions
|
||||
|
||||
def search_extension(self, domain, extensions):
|
||||
"""
|
||||
This method searches for installed extensions.
|
||||
/typo3conf/ext/: Local installation path. This is where extensions get usually installed.
|
||||
/typo3/ext/: Global installation path (not used atm)
|
||||
/typo3/sysext/: Extensions shipped with core (not used atm)
|
||||
"""
|
||||
config = json.load(open(os.path.join(self.__path, 'lib', 'config.json')))
|
||||
thread_pool = ThreadPool()
|
||||
for ext in extensions:
|
||||
thread_pool.add_job((Request.head_request, (domain.get_name(), '/typo3conf/ext/' + ext)))
|
||||
#thread_pool.add_job((Request.head_request, (domain.get_name(), '/typo3/ext/' + ext)))
|
||||
#thread_pool.add_job((Request.head_request, (domain.get_name(), '/typo3/sysext/' + ext)))
|
||||
thread_pool.start(config['threads'])
|
||||
def search_ext_version(self, found_extensions, threads):
|
||||
"""
|
||||
This method adds a job for every installed extension.
|
||||
The goal is to find a file with version information.
|
||||
"""
|
||||
thread_pool = ThreadPool()
|
||||
for extension,values in found_extensions.items():
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Documentation/ChangeLog/Index.rst', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Documentation/Settings.cfg', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Documentation/Settings.yml', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Settings.yml', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Documentation/Index.rst', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'composer.json', '(?:"dev-master":|"version":)\s?"([0-9]+\.[0-9]+\.[0-9x][0-9x]?)')))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Index.rst', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'ChangeLog', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'CHANGELOG.md', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'ChangeLog.txt', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'Readme.txt', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'README.md', None)))
|
||||
thread_pool.add_job((request.version_information, (values['url'] + 'README.rst', None)))
|
||||
|
||||
thread_pool.start(threads, version_search=True)
|
||||
|
||||
for installed_extension in thread_pool.get_result():
|
||||
domain.set_installed_extensions(installed_extension[1][1])
|
||||
for version_path in thread_pool.get_result():
|
||||
path = version_path[0][0]
|
||||
version = version_path[1]
|
||||
name = version_path[0][0]
|
||||
if 'Documentation/' in name:
|
||||
name = name[:name.rfind('Documentation/')+1]
|
||||
name = name[name.find('ext/')+4:name.rfind('/')]
|
||||
found_extensions[name]['version'] = version
|
||||
found_extensions[name]['file'] = path
|
||||
return found_extensions
|
||||
|
||||
def search_ext_version(self, domain, extension_dict):
|
||||
"""
|
||||
This method adds a job for every installed extension.
|
||||
The goal is to find a ChangeLog or Readme in order to determine the version.
|
||||
"""
|
||||
config = json.load(open('lib/config.json'))
|
||||
thread_pool = ThreadPool()
|
||||
for extension_path in extension_dict:
|
||||
thread_pool.add_job((Request.head_request, (domain.get_name(), extension_path + '/ChangeLog')))
|
||||
thread_pool.add_job((Request.head_request, (domain.get_name(), extension_path + '/ChangeLog.txt')))
|
||||
thread_pool.add_job((Request.head_request, (domain.get_name(), extension_path + '/Readme.txt')))
|
||||
thread_pool.add_job((Request.head_request, (domain.get_name(), extension_path + '/README.md')))
|
||||
thread_pool.add_job((Request.head_request, (domain.get_name(), extension_path + '/README.rst')))
|
||||
|
||||
thread_pool.start(config['threads'], True)
|
||||
|
||||
for changelog_path in thread_pool.get_result():
|
||||
ext, path = self.parse_extension(changelog_path)
|
||||
domain.set_installed_extensions_version(path, ext[4])
|
||||
|
||||
def parse_extension(self, path):
|
||||
ext = (path[1][1]).split('/')
|
||||
path = '/' + ext[1] + '/' + ext[2] + '/' + ext[3]
|
||||
return (ext, path)
|
||||
def output(self, extension_dict, database):
|
||||
conn = sqlite3.connect(database)
|
||||
c = conn.cursor()
|
||||
print('\n\n [+] Extension information\n \\')
|
||||
for extension,info in extension_dict.items():
|
||||
c.execute('SELECT title FROM extensions where extensionkey=?', (extension,))
|
||||
title = c.fetchone()[0]
|
||||
print(' [+] Name: {}'.format(Fore.GREEN + extension + Fore.RESET))
|
||||
print(' \u251c Title: {}'.format(title))
|
||||
if info['version']:
|
||||
c.execute('SELECT advisory, vulnerability, affected_version_max, affected_version_min FROM extension_vulns WHERE (extensionkey=? AND ?<=affected_version_max AND ?>=affected_version_min)', (extension, info['version'], info['version'],))
|
||||
data = c.fetchall()
|
||||
print(' \u251c Version: {}'.format(Fore.GREEN + info['version'] + Fore.RESET))
|
||||
if data:
|
||||
print(' \u251c see: {}'.format(info['file']))
|
||||
print(' \u2514 Known vulnerabilities\n \\')
|
||||
for vuln in data:
|
||||
if parse_version(info['version']) <= parse_version(vuln[2]):
|
||||
print(' [!] {}'.format(Fore.RED + vuln[0] + Fore.RESET))
|
||||
print(' \u251c Vulnerability Type:'.ljust(29), vuln[1])
|
||||
print(' \u2514 Affected Versions:'.ljust(29), '{} - {}'.format(vuln[2], vuln[3]))
|
||||
else:
|
||||
print(' \u2514 see: {}'.format(info['file']))
|
||||
else:
|
||||
print(' \u2514 Version: -unknown-')
|
||||
print()
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user