From 3fed68a795568c0538cc1d1149be4fe9c47a8736 Mon Sep 17 00:00:00 2001 From: cr0hn Date: Thu, 18 Feb 2016 15:43:09 +0100 Subject: [PATCH] add: listing remote functions and export function templates --- .idea/workspace.xml | 322 ++++++++---------- enteletaor_lib/modules/proc/__init__.py | 8 +- enteletaor_lib/modules/proc/cmd_actions.py | 12 +- .../modules/proc/proc_list_process.py | 118 +++++++ enteletaor_lib/modules/proc/proc_raw_dump.py | 69 +--- enteletaor_lib/modules/proc/utils.py | 66 ++++ 6 files changed, 370 insertions(+), 225 deletions(-) create mode 100644 enteletaor_lib/modules/proc/proc_list_process.py create mode 100644 enteletaor_lib/modules/proc/utils.py diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 6075826..ff86c76 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,10 +2,12 @@ - + + - - + + + @@ -41,60 +43,21 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + - - + - + - - - + + + + + - - - + + + + @@ -717,7 +677,13 @@ @@ -727,7 +693,7 @@ - + @@ -764,7 +730,8 @@ - @@ -774,6 +741,11 @@ 29 @@ -794,23 +766,6 @@ - - - - - - - - - - - - - - - - - @@ -1271,17 +1226,6 @@ - - - - - - - - - - - @@ -1298,16 +1242,6 @@ - - - - - - - - - - @@ -1318,14 +1252,6 @@ - - - - - - - - @@ -1347,7 +1273,7 @@ - + @@ -1357,25 +1283,77 @@ - - - - - - - - - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/enteletaor_lib/modules/proc/__init__.py b/enteletaor_lib/modules/proc/__init__.py index a69f24b..ee22d10 100644 --- a/enteletaor_lib/modules/proc/__init__.py +++ b/enteletaor_lib/modules/proc/__init__.py @@ -7,8 +7,9 @@ from modules import IModule from libs.core.structs import CommonData from libs.core.models import IntegerField, StringField, SelectField +from .cmd_actions import parser_proc_raw_dump, parser_proc_list_process from .proc_raw_dump import action_proc_raw_dump -from .cmd_actions import parser_proc_raw_dump +from .proc_list_process import action_proc_list_process log = logging.getLogger() @@ -38,6 +39,11 @@ class RemoteProcessModule(IModule): cmd_args=parser_proc_raw_dump, action=action_proc_raw_dump ), + 'list-process': dict( + help="list remote process and their params", + cmd_args=parser_proc_list_process, + action=action_proc_list_process + ), } name = "proc" diff --git a/enteletaor_lib/modules/proc/cmd_actions.py b/enteletaor_lib/modules/proc/cmd_actions.py index 6048efe..db05f66 100644 --- a/enteletaor_lib/modules/proc/cmd_actions.py +++ b/enteletaor_lib/modules/proc/cmd_actions.py @@ -9,7 +9,17 @@ This file contains command line actions for argparser def parser_proc_raw_dump(parser): gr = parser.add_argument_group("custom raw dump options") - gr.add_argument("--tail", action="store_true", dest="tail_mode", default=False, + gr.add_argument("--streaming", action="store_true", dest="streaming_mode", default=False, help="although all information be dumped do not stop") gr.add_argument("-I", dest="interval", type=float, default=4, help="timeout interval between tow connections") + + +# ---------------------------------------------------------------------- +def parser_proc_list_process(parser): + gr = parser.add_argument_group("process exporting options") + + gr.add_argument("-T", "--make-template", dest="template", type=str, + help="export process as a JSON template format, ready to make injections") + gr.add_argument("-F", "--function-name", dest="function_name", type=str, + help="only export this function name") diff --git a/enteletaor_lib/modules/proc/proc_list_process.py b/enteletaor_lib/modules/proc/proc_list_process.py new file mode 100644 index 0000000..6722032 --- /dev/null +++ b/enteletaor_lib/modules/proc/proc_list_process.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- + +import os +import six +import json +import logging + +from kombu import Connection + +from .utils import list_remote_process + +log = logging.getLogger() + + +# ---------------------------------------------------------------------- +def get_param_type(value): + """ + Try to identify the parameter type by their value + + :return: string with type. Valid values: str, int, float, dict, list, bytes, object + :rtype: str + + """ + try: + # Distinguish between int and float + if int(value) == value: + return "int" + else: + return "float" + + except ValueError: + + # If raises type must be string or complex data + if type(value) == dict: + return "dict" + elif type(value) == list: + return "list" + elif type(value) == bytes: + try: + value.decode() + + return "bytes" + except Exception: + return "str" + + elif type(value) == str: + return "str" + else: + return "object" + + +# ---------------------------------------------------------------------- +def action_proc_list_process(config): + + log.warning(" - Trying to connect with server...") + + url = '%s://%s' % (config.broker_type, config.target) + + with Connection(url) as conn: + in_queue = conn.SimpleQueue('celery') + + process_info = {} + + # Get remote process + for remote_process, remote_args in list_remote_process(config, in_queue): + + if remote_process not in process_info: + process_info[remote_process] = remote_args + + # Try to identify parameters types + + # Display info + log.error(" - Remote process found:") + for p, v in six.iteritems(process_info): + log.error(" -> %s (%s)" % ( + p, + ", ".join("param_%s:%s" % (i, get_param_type(x)) for i, x in enumerate(v)) + )) + + # Export to template enabled? + if config.template is not None: + log.warning(" - Building template...") + + export_data = [] + + for p, v in six.iteritems(process_info): + + # Function name restriction? + if config.function_name is not None and config.function_name != p: + continue + + # Extract function params + for i, l_p in enumerate(v): + l_params = { + 'param_position': i, + 'param_type': get_param_type(l_p) + } + + # Add to function information + l_process = { + 'function': p, + 'parameters': l_params + } + + # Add to all data + + export_data.append(l_process) + + # -------------------------------------------------------------------------- + # Save template + # -------------------------------------------------------------------------- + # Build path in current dir + export_path = "%s.json" % os.path.abspath(config.template) + + # dumps + json.dump(export_data, open(export_path, "w")) + + log.error(" - Template saved at: '%s'" % export_path) diff --git a/enteletaor_lib/modules/proc/proc_raw_dump.py b/enteletaor_lib/modules/proc/proc_raw_dump.py index 5e2dd1f..a7ea680 100644 --- a/enteletaor_lib/modules/proc/proc_raw_dump.py +++ b/enteletaor_lib/modules/proc/proc_raw_dump.py @@ -5,9 +5,8 @@ import logging from time import sleep from kombu import Connection -from kombu.simple import Empty -from six.moves.cPickle import loads -from kombu.exceptions import SerializationError + +from .utils import list_remote_process log = logging.getLogger() @@ -15,61 +14,29 @@ log = logging.getLogger() # ---------------------------------------------------------------------- def action_proc_raw_dump(config): + log.warning(" - Trying to connect with server...") + url = '%s://%s' % (config.broker_type, config.target) # with Connection('redis://%s' % REDIS) as conn: with Connection(url) as conn: in_queue = conn.SimpleQueue('celery') - to_inject = [] - already_processed = set() - while 1: - try: - while 1: - message = in_queue.get(block=False, timeout=1) - # -------------------------------------------------------------------------- - # Try to deserialize - # -------------------------------------------------------------------------- - # Is Pickle info? - try: - deserialized = loads(message.body) - except SerializationError: - pass + for remote_process, remote_args in list_remote_process(config, in_queue): + # Show info + log.error("Found process information:") + log.error(" - Remote process name: '%s'" % remote_process) + log.error(" - Input parameters:") - msg_id = deserialized['id'] + for i, x in enumerate(remote_args): + log.error(" -> P%s: %s" % (i, x)) - # Read info - if msg_id not in already_processed: - - remote_process = deserialized['task'].split(".")[-1] - remote_args = deserialized['args'] - - # Show info - log.error("Found process information:") - log.error(" - Remote process name: '%s'" % remote_process) - log.error(" - Input parameters:") - for i, x in enumerate(remote_args): - log.error(" -> P%s: %s" % (i, x)) - - # Store as processed - already_processed.add(msg_id) - - # -------------------------------------------------------------------------- - # Store message to re-send - # -------------------------------------------------------------------------- - to_inject.append(deserialized) - - except Empty: - # When Queue is Empty -> reinject all removed messages - for x in to_inject: - in_queue.put(x, serializer="pickle") - - # Queue is empty -> wait - if config.tail_mode: - log.error("No more messages from server. Waiting for %s seconds and try again.." % config.interval) - sleep(config.interval) - else: - log.error("No more messages from server. Exiting...") - return + # Queue is empty -> wait + if config.streaming_mode: + log.error("No more messages from server. Waiting for %s seconds and try again.." % config.interval) + sleep(config.interval) + else: + log.error("No more messages from server. Exiting...") + return diff --git a/enteletaor_lib/modules/proc/utils.py b/enteletaor_lib/modules/proc/utils.py new file mode 100644 index 0000000..c1746d2 --- /dev/null +++ b/enteletaor_lib/modules/proc/utils.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- + +from kombu.simple import Empty +from six.moves.cPickle import loads +from kombu.exceptions import SerializationError + + +# ---------------------------------------------------------------------- +def get_remote_messages(config, queue): + """ + Get all messages from queue without removing from it + + :return: yield raw deserialized messages + :rtype: json + """ + + to_inject = [] + + try: + while 1: + message = queue.get(block=False, timeout=1) + + # -------------------------------------------------------------------------- + # Try to deserialize + # -------------------------------------------------------------------------- + # Is Pickle info? + try: + deserialized = loads(message.body) + except SerializationError: + pass + + yield deserialized + + to_inject.append(deserialized) + + except Empty: + # When Queue is Empty -> reinject all removed messages + for x in to_inject: + queue.put(x, serializer="pickle") + + +# ---------------------------------------------------------------------- +def list_remote_process(config, queue): + """ + Get all messages from queue without removing from it + + :return: yield two values: remote_process name, remote args + :rtype: str, set + """ + + already_processed = set() + + for deserialized in get_remote_messages(config, queue): + + msg_id = deserialized['id'] + + # Read info + if msg_id not in already_processed: + + remote_process = deserialized['task'].split(".")[-1] + remote_args = deserialized['args'] + + # Store as processed + already_processed.add(msg_id) + + yield remote_process, remote_args