103 lines
2.8 KiB
Python
103 lines
2.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import pickle
|
|
import logging
|
|
|
|
from time import sleep
|
|
from modules import IModule
|
|
from kombu import Connection
|
|
from kombu.simple import Empty
|
|
from kombu.exceptions import SerializationError
|
|
|
|
from ...libs.core.structs import CommonData, AppSettings
|
|
from ...libs.core.models import IntegerField, StringField, SelectField, validators
|
|
|
|
# Module-wide logger. getLogger() is called without a name, so this is the root logger.
log = logging.getLogger()
|
|
|
|
# Hard-coded Redis host. Within this file it is referenced only by a
# commented-out Connection(...) call inside dump_from_celery.
REDIS = "10.211.55.69"
|
|
|
|
|
|
class ModuleModel(CommonData):
    """
    Option fields of the dump module.

    ``dump_from_celery`` reads ``broker_type`` and ``target`` to build the
    broker URL and ``interval`` for its "waiting" log message.
    """

    # Integer option; 4 when not supplied.
    interval = IntegerField(default=4)

    # The only field declared with the ``required`` validator.
    target = StringField([validators.required()])

    # Not read anywhere in this file.
    export_results = StringField(default="")
    import_results = StringField(default=None)

    # Used verbatim as the URL scheme of the broker connection.
    # Each choice looks like a (value, label) pair -- confirm against SelectField.
    broker_type = SelectField(
        default="redis",
        choices=[
            ("redis", "Redis server"),
            ("zmq", "ZeroMQ"),
            ("amqp", "RabbitMQ broker")
        ]
    )
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
class RemoteProcessModule(IModule):
    """
    Try to extract information from remote processes.

    The module is registered under the name ``dump``; its ``run`` method
    silences stderr and delegates the work to ``dump_from_celery``.
    """
    __model__ = ModuleModel

    name = "dump"
    description = "connect to remote server/s and dumps all available information"

    # ----------------------------------------------------------------------
    def run(self, config):
        """
        Dump the remote queue described by *config* with stderr silenced.

        :param config: module configuration; passed unchanged to
            ``dump_from_celery``.
        """
        import os
        import sys

        # --------------------------------------------------------------------------
        # Very dirty monkey patch to avoid kombu writing to the screen
        # --------------------------------------------------------------------------
        previous_stderr = sys.stderr
        try:
            # Must be opened for writing: a read-only handle makes every write
            # to stderr raise instead of being discarded.
            sink = open(os.devnull, "w")
        except IOError:
            # Best effort: if the null device is unavailable, keep stderr as is.
            sink = None

        try:
            if sink is not None:
                sys.stderr = sink

            dump_from_celery(config)
        finally:
            # Give stderr back so later errors stay visible, and release the
            # null-device handle.
            sys.stderr = previous_stderr
            if sink is not None:
                sink.close()
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
def _extract_task(body):
    """
    Decode a raw message body into ``(task_name, task_args)``.

    :param body: raw message body as delivered by the broker.
    :return: tuple with the last dotted component of the payload's ``task``
        entry and its ``args`` entry, or ``None`` when the body is not a
        pickle or does not have that shape.
    """
    try:
        # SECURITY: pickle.loads executes code embedded in the data. The body
        # comes from a remote broker, so only point this at brokers you accept
        # that risk for.
        payload = pickle.loads(body)
    except (pickle.UnpicklingError, SerializationError, AttributeError,
            EOFError, ImportError, IndexError, KeyError, TypeError,
            ValueError) as exc:
        # Not pickle data (or not loadable here): skip it instead of crashing
        # or silently reusing the previous message's payload.
        log.debug("Skipping message that could not be unpickled: %s", exc)
        return None

    try:
        return payload['task'].split(".")[-1], payload['args']
    except (KeyError, TypeError, AttributeError):
        log.debug("Skipping message without usable 'task'/'args' entries")
        return None


# ----------------------------------------------------------------------
def dump_from_celery(config):
    """
    Start dumping information.

    Connects to ``<config.broker_type>://<config.target>``, polls the
    ``celery`` queue and logs the task name and arguments of every message
    that can be decoded. The loop never ends on its own: when the queue is
    empty it waits ``config.interval`` seconds and polls again.

    :param config: object exposing ``broker_type``, ``target`` and
        ``interval`` (seconds to wait between polls of an empty queue).
    """
    url = '%s://%s' % (config.broker_type, config.target)

    with Connection(url) as conn:
        in_queue = conn.SimpleQueue('celery')

        while True:
            try:
                message = in_queue.get(block=False, timeout=1)
            except Empty:
                # Queue is empty -> wait for the interval announced in the log
                log.error("No more messages from server. Waiting for %s seconds and try again.." % config.interval)
                sleep(config.interval)
                continue

            task = _extract_task(message.body)
            if task is None:
                continue

            # Show info
            remote_process, remote_args = task
            _show_info(remote_process, remote_args)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
def _show_info(process, args):
    """
    Log a short report about one remote task.

    :param process: name of the remote process.
    :param args: iterable with the task's input parameters; each one is
        logged together with its zero-based position.
    """
    emit = log.error

    emit("Found process information:")
    emit(" - Remote process name: '%s'" % process)
    emit(" - Input parameters:")

    for position, value in enumerate(args):
        emit(" -> P%s: %s" % (position, value))
|