Files
Typo3Scan/lib/thread_pool.py
Jan Rude ec592fafb5 v0.4.5.1
2017-02-09 16:32:26 +01:00

132 lines
3.8 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# Typo3 Enumerator - Automatic Typo3 Enumeration Tool
# Copyright (c) 2014-2017 Jan Rude
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
import threading
from queue import Queue
class ThreadPoolSentinel:
    """Marker object passed through the queues instead of a real job/result.

    An instance on the work queue tells a worker that no more jobs follow;
    an instance on the result queue tells the consumer that one worker has
    finished.  The class carries no data: only its type is checked.
    """
class ThreadPool:
    """
    Generic thread pool used for searching extensions and changelog/readme.

    Jobs are queued with add_job(), executed by the worker threads created
    in start(), and every job a worker reports is handed back through
    get_result().

    Internal state:
        work_queue:     Queue of pending jobs (followed by one sentinel
                        per worker)
        result_queue:   Queue of reported jobs and worker-finished sentinels
        active_threads: Number of workers started and not yet joined
        thread_list:    List of worker threads
    """

    def __init__(self):
        self.__work_queue = Queue()
        self.__result_queue = Queue()
        self.__active_threads = 0
        self.__thread_list = []

    def add_job(self, job):
        """Put *job* on the work queue.

        The workers call ``job[0](*job[1])``, so a job is expected to be a
        ``(function, args)`` pair.  Jobs must be added before start(),
        because start() appends the end-of-work sentinels to the same queue.
        """
        self.__work_queue.put(job)

    def get_result(self):
        """Yield every reported job until all workers have finished.

        This is a generator.  It blocks on the result queue while workers
        are still running and stops once it has seen one sentinel per
        worker and the queue is empty.
        """
        # Local countdown: the instance counter is only reset by join().
        active_threads = self.__active_threads
        while active_threads or not self.__result_queue.empty():
            result = self.__result_queue.get()
            # Mark the item as processed right away so that join() is not
            # kept waiting on the result queue.
            self.__result_queue.task_done()
            if isinstance(result, ThreadPoolSentinel):
                # One worker is done.
                active_threads -= 1
                continue
            yield result

    def start(self, threads, version_search=False):
        """Start *threads* worker threads on the queued jobs.

        Args:
            threads: Number of worker threads to create.
            version_search: If true the workers run
                ``_work_function_version``, otherwise ``_work_function``.

        Raises:
            RuntimeError: If workers were started and join() has not been
                called since.
        """
        if self.__active_threads:
            raise RuntimeError('Threads already started.')

        target = _work_function_version if version_search else _work_function
        for _ in range(threads):
            worker = threading.Thread(
                target=target,
                args=(self.__work_queue, self.__result_queue))
            worker.start()
            self.__thread_list.append(worker)
            self.__active_threads += 1

        # One sentinel per worker started *now*, so every worker sees
        # exactly one end-of-work marker and none is left behind on the
        # queue.
        for _ in range(threads):
            self.__work_queue.put(ThreadPoolSentinel())

    def join(self):
        """Wait for all jobs and workers to finish, then reset the pool.

        Note: the final wait on the result queue only returns once every
        item put there has been consumed, which get_result() does.  Drain
        get_result() before (or concurrently with) calling join().
        """
        self.__work_queue.join()
        for worker in self.__thread_list:
            worker.join()
        # Forget the finished workers so the pool can be started again.
        self.__thread_list = []
        self.__active_threads = 0
        self.__result_queue.join()
def _work_function(job_q, result_q):
    """Work function expected to run within threads.

    Takes jobs from *job_q* until a ThreadPoolSentinel arrives.  For each
    job, ``job[0]`` is called with the positional arguments in ``job[1]``.
    If the call returns '301', '200' or '403' (presumably HTTP status codes
    as strings -- confirm against the job functions), the job itself is put
    on *result_q*.  Exceptions raised by the job function are printed and
    the job is skipped.

    On receiving the sentinel, a ThreadPoolSentinel is put on *result_q* to
    signal that this worker has finished, and the function returns.
    """
    # A membership test is required here: ('301' or '200' or '403')
    # evaluates to just '301', which would drop '200' and '403' results.
    accepted = ('301', '200', '403')
    while True:
        job = job_q.get()
        if isinstance(job, ThreadPoolSentinel):
            # All the work is done, get out.
            result_q.put(ThreadPoolSentinel())
            job_q.task_done()
            break
        function = job[0]
        args = job[1]
        try:
            result = function(*args)
        except Exception as e:
            # Best effort: report the failure and carry on with the next job.
            print(e)
        else:
            if result in accepted:
                result_q.put(job)
        finally:
            # Always mark the job as processed so that joining the work
            # queue cannot hang on a failed job.
            job_q.task_done()
def _work_function_version(job_q, result_q):
    """Work function for version searches, expected to run within threads.

    Pulls tasks from *job_q* until a ThreadPoolSentinel shows up.  Each
    task is invoked as ``task[0](*task[1])``; when the call returns '200'
    the task is forwarded to *result_q*.  A raised exception is printed and
    the task is skipped.  Before returning, a ThreadPoolSentinel is put on
    *result_q* so the consumer knows this worker is finished.
    """
    while True:
        task = job_q.get()
        if isinstance(task, ThreadPoolSentinel):
            # No more work for this thread.
            break

        callee, params = task[0], task[1]
        try:
            status = callee(*params)
        except Exception as err:
            print(err)
        else:
            if status == '200':
                result_q.put(task)
        finally:
            # Account for the task whether it succeeded or not.
            job_q.task_done()

    # Only reached via the sentinel: announce completion, then account for
    # the sentinel itself on the work queue.
    result_q.put(ThreadPoolSentinel())
    job_q.task_done()