From 1dd70221c89aaf13b6e22bec585180fc15e2d12e Mon Sep 17 00:00:00 2001 From: cr0hn Date: Fri, 19 Feb 2016 01:40:54 +0100 Subject: [PATCH] add: process injection features add: remove remote processes fix: minor fixes and reformats --- .idea/workspace.xml | 505 +++++++----------- enteletaor_lib/modules/proc/__init__.py | 14 +- enteletaor_lib/modules/proc/cmd_actions.py | 11 + .../modules/proc/proc_inject_process.py | 76 +++ .../modules/proc/proc_list_process.py | 86 +-- enteletaor_lib/modules/proc/proc_remove.py | 26 + enteletaor_lib/modules/proc/utils.py | 87 ++- 7 files changed, 437 insertions(+), 368 deletions(-) create mode 100644 enteletaor_lib/modules/proc/proc_inject_process.py create mode 100644 enteletaor_lib/modules/proc/proc_remove.py diff --git a/.idea/workspace.xml b/.idea/workspace.xml index ff6aebf..089c888 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -1,9 +1,14 @@ - + + + + + + @@ -28,6 +33,7 @@ + @@ -39,15 +45,20 @@ - - - + + + - - + + - - + + + + + + + @@ -71,9 +82,6 @@ @@ -133,8 +144,8 @@ @@ -161,6 +172,8 @@ + + @@ -211,6 +224,28 @@ + + + + + + + + + + + + - - @@ -249,13 +282,6 @@ - - - - - - - @@ -263,8 +289,15 @@ + + + + + + + - + - + + + @@ -609,7 +660,8 @@ - + + @@ -685,27 +737,33 @@ - + + - - - - - - + + + + - + + @@ -734,7 +792,8 @@ - @@ -744,11 +803,6 @@ 29 @@ -769,61 +823,10 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -831,7 +834,6 @@ - @@ -843,22 +845,10 @@ - - - - - - - - - - - - @@ -882,7 +872,6 @@ - @@ -890,7 +879,6 @@ - @@ -898,10 +886,6 @@ - - - - @@ -909,10 +893,6 @@ - - - - @@ -920,7 +900,6 @@ - @@ -928,9 +907,6 @@ - - - @@ -938,7 +914,6 @@ - @@ -946,21 +921,6 @@ - - - - - - - - - - - - - - - @@ -968,9 +928,6 @@ - - - @@ -1021,104 +978,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1126,7 +985,6 @@ - @@ -1134,11 +992,6 @@ - - - - - @@ -1156,16 +1009,6 @@ - - - - - - - - - - @@ -1207,17 +1050,6 @@ - - - - - - - - - - - @@ -1245,16 +1077,6 @@ - - - - - - - - - - @@ -1268,8 +1090,8 @@ - - + + @@ -1279,9 +1101,9 @@ - + - + @@ -1291,12 +1113,11 @@ - + - - - - + + + @@ -1314,49 +1135,137 @@ - + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - + - - + + - + - - + + + + + + + + + + + + + + + + + + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/enteletaor_lib/modules/proc/__init__.py b/enteletaor_lib/modules/proc/__init__.py index ee22d10..e1653c9 100644 --- a/enteletaor_lib/modules/proc/__init__.py +++ b/enteletaor_lib/modules/proc/__init__.py @@ -7,9 +7,11 @@ from modules import IModule from libs.core.structs import CommonData from libs.core.models import IntegerField, StringField, SelectField -from .cmd_actions import parser_proc_raw_dump, parser_proc_list_process +from .cmd_actions import parser_proc_raw_dump, parser_proc_list_process, parser_proc_inject_process +from .proc_remove import action_proc_remove from .proc_raw_dump import action_proc_raw_dump from .proc_list_process import action_proc_list_process +from .proc_inject_process import action_proc_inject_process log = logging.getLogger() @@ -44,6 +46,16 @@ class RemoteProcessModule(IModule): cmd_args=parser_proc_list_process, action=action_proc_list_process ), + 'inject': dict( + help="list remote process and their params", + cmd_args=parser_proc_inject_process, + action=action_proc_inject_process + ), + 'remove': dict( + help="remove remote processes in server", + cmd_args=None, + action=action_proc_remove + ), } name = "proc" diff --git a/enteletaor_lib/modules/proc/cmd_actions.py b/enteletaor_lib/modules/proc/cmd_actions.py index db05f66..1a2286e 100644 --- a/enteletaor_lib/modules/proc/cmd_actions.py +++ b/enteletaor_lib/modules/proc/cmd_actions.py @@ -17,9 +17,20 @@ def parser_proc_raw_dump(parser): # ---------------------------------------------------------------------- def parser_proc_list_process(parser): + parser.add_argument("-N", "--no-stream", dest="no_stream", action="store_true", default=False, + help="force to not listen until message is received") + gr = parser.add_argument_group("process exporting options") gr.add_argument("-T", "--make-template", dest="template", type=str, help="export process as a JSON template format, ready to make injections") gr.add_argument("-F", "--function-name", dest="function_name", type=str, help="only export this function name") + + +# ---------------------------------------------------------------------- +def parser_proc_inject_process(parser): + gr = parser.add_argument_group("process importing options") + + gr.add_argument("-f", "--function-file", dest="function_files", type=str, required=True, + help="import process info from JSON file") diff --git a/enteletaor_lib/modules/proc/proc_inject_process.py b/enteletaor_lib/modules/proc/proc_inject_process.py new file mode 100644 index 0000000..9b8ced9 --- /dev/null +++ b/enteletaor_lib/modules/proc/proc_inject_process.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- + + +import six +import uuid +import json +import logging + +from kombu import Connection +from collections import OrderedDict + + +log = logging.getLogger() + + +# ---------------------------------------------------------------------- +def action_proc_inject_process(config): + + if config.function_files is None: + log.warning(" - input .json file with process files is needed") + return + + # -------------------------------------------------------------------------- + # Load process information + # -------------------------------------------------------------------------- + with open(config.function_files, "r") as f: + f_info = json.load(f) + + log.warning(" - Building process...") + + # Search and inject process + injections = [] + for p in f_info: + + parameters = OrderedDict({x["param_position"]: x["param_value"] for x in p['parameters']}) + + # -------------------------------------------------------------------------- + # Fill process information + # -------------------------------------------------------------------------- + inject_process = { + "args": [x for x, y in six.iteritems(parameters)], + "callbacks": None, + "chord": None, + "errbacks": None, + "eta": None, + "expires": None, + "id": uuid.uuid1(), + "kwargs": {}, + "retries": 0, + "task": "tasks.%s" % p["function"], + "taskset": None, + "timelimit": [ + None, + None + ], + "utc": True + } + + injections.append(inject_process) + + # -------------------------------------------------------------------------- + # Re-inject messages + # -------------------------------------------------------------------------- + log.warning(" - Trying to connect with server...") + + url = '%s://%s' % (config.broker_type, config.target) + + with Connection(url) as conn: + in_queue = conn.SimpleQueue('celery') + + log.warning(" - Sending processes to '%s'" % config.target) + + for i, e in enumerate(injections, 1): + log.warning(" %s) %s" % (i, e['task'])) + # pass + in_queue.put(e, serializer="pickle") diff --git a/enteletaor_lib/modules/proc/proc_list_process.py b/enteletaor_lib/modules/proc/proc_list_process.py index 984987d..c9c5fde 100644 --- a/enteletaor_lib/modules/proc/proc_list_process.py +++ b/enteletaor_lib/modules/proc/proc_list_process.py @@ -5,50 +5,14 @@ import six import json import logging +from time import sleep from kombu import Connection -from .utils import list_remote_process +from .utils import list_remote_process, get_param_type, export_process log = logging.getLogger() -# ---------------------------------------------------------------------- -def get_param_type(value): - """ - Try to identify the parameter type by their value - - :return: string with type. Valid values: str, int, float, dict, list, bytes, object - :rtype: str - - """ - try: - # Distinguish between int and float - if int(value) == value: - return "int" - else: - return "float" - - except ValueError: - - # If raises type must be string or complex data - if type(value) == dict: - return "dict" - elif type(value) == list: - return "list" - elif type(value) == bytes: - try: - value.decode() - - return "bytes" - except Exception: - return "str" - - elif type(value) == str: - return "str" - else: - return "object" - - # ---------------------------------------------------------------------- def action_proc_list_process(config): @@ -57,17 +21,31 @@ def action_proc_list_process(config): url = '%s://%s' % (config.broker_type, config.target) with Connection(url) as conn: + in_queue = conn.SimpleQueue('celery') process_info = {} # Get remote process - for remote_process, remote_args in list_remote_process(config, in_queue): + first_msg = True + while 1: + for remote_process, remote_args in list_remote_process(config, in_queue): - if remote_process not in process_info: - process_info[remote_process] = remote_args + if remote_process not in process_info: + process_info[remote_process] = remote_args + if config.no_stream is False and not process_info: + if first_msg is True: + log.error(" -> Not messages found. Waiting ...") + first_msg = False + + sleep(0.1) + else: + break + + # -------------------------------------------------------------------------- # Try to identify parameters types + # -------------------------------------------------------------------------- # Display info log.error(" - Remote process found:") @@ -81,31 +59,7 @@ def action_proc_list_process(config): if config.template is not None: log.warning(" - Building template...") - export_data = [] - - for p, v in six.iteritems(process_info): - - # Function name restriction? - if config.function_name is not None and config.function_name != p: - continue - - # Extract function params - l_params = {} - for i, l_p in enumerate(v): - l_params = { - 'param_position': i, - 'param_type': get_param_type(l_p) - } - - # Add to function information - l_process = { - 'function': p, - 'parameters': l_params - } - - # Add to all data - - export_data.append(l_process) + export_data = export_process(process_info, config) # -------------------------------------------------------------------------- # Save template diff --git a/enteletaor_lib/modules/proc/proc_remove.py b/enteletaor_lib/modules/proc/proc_remove.py new file mode 100644 index 0000000..d9c601e --- /dev/null +++ b/enteletaor_lib/modules/proc/proc_remove.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +import logging + +from kombu import Connection + +from .utils import get_remote_messages + +log = logging.getLogger() + + +# ---------------------------------------------------------------------- +def action_proc_remove(config): + + log.warning(" - Trying to connect with server...") + + url = '%s://%s' % (config.broker_type, config.target) + + with Connection(url) as conn: + in_queue = conn.SimpleQueue('celery') + + # Get remote process + for _ in get_remote_messages(config, in_queue, False): + pass + + log.error(" - All processes removed from '%s'" % config.target) diff --git a/enteletaor_lib/modules/proc/utils.py b/enteletaor_lib/modules/proc/utils.py index c1746d2..3787ccd 100644 --- a/enteletaor_lib/modules/proc/utils.py +++ b/enteletaor_lib/modules/proc/utils.py @@ -1,12 +1,92 @@ # -*- coding: utf-8 -*- +import six + from kombu.simple import Empty from six.moves.cPickle import loads from kombu.exceptions import SerializationError # ---------------------------------------------------------------------- -def get_remote_messages(config, queue): +def get_param_type(value): + """ + Try to identify the parameter type by their value + + :return: string with type. Valid values: str, int, float, dict, list, bytes, object + :rtype: str + + """ + try: + # Distinguish between int and float + if int(value) == value: + return "int" + else: + return "float" + + except ValueError: + + # If raises type must be string or complex data + if type(value) == dict: + return "dict" + elif type(value) == list: + return "list" + elif type(value) == bytes: + try: + value.decode() + + return "bytes" + except Exception: + return "str" + + elif type(value) == str: + return "str" + else: + return "object" + + +# ---------------------------------------------------------------------- +# Import/export process information +# ---------------------------------------------------------------------- +def export_process(process_info, config): + """ + Export process info to json file + + :return: return a dict JSON compatible + :rtype: dict + """ + + export_data = [] + + for p, v in six.iteritems(process_info): + + # Function name restriction? + if config.function_name is not None and config.function_name != p: + continue + + # Extract function params + params = [] + for i, l_p in enumerate(v): + l_params = { + 'param_position': i, + 'param_type': get_param_type(l_p), + 'param_value': None + } + params.append(l_params) + + # Add to function information + l_process = { + 'function': p, + 'parameters': params + } + + # Add to all data + export_data.append(l_process) + + return export_data + + +# ---------------------------------------------------------------------- +def get_remote_messages(config, queue, fill=True, block=False): """ Get all messages from queue without removing from it @@ -35,8 +115,9 @@ def get_remote_messages(config, queue): except Empty: # When Queue is Empty -> reinject all removed messages - for x in to_inject: - queue.put(x, serializer="pickle") + if fill is True: + for x in to_inject: + queue.put(x, serializer="pickle") # ----------------------------------------------------------------------