Python Forum
Need help in optimising code, please.
#1
I need help optimizing the code below. I am a beginner in Python and used multiprocessing for the first time. The script takes so long to run that it cannot keep up with a real-time representation of the data.

We have a log server, and I am trying to convert log lines and alerts (we can create alerts on the log server) into metrics. But since there are roughly 2000 lines every 30 seconds and the conversion of logs to metrics currently takes more than 5 minutes, I need to get it under 30 seconds to keep up with real time.

Example:
log pattern - ".*(?P<REQUEST>GET|POST)\s+(?P<URI>/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*)[a-zA-Z0-9/_-]*\s+HTTP/\d+.\d+'\s+'[^:/ ]+.?[0-9]*'\s+(?P<RESPONSE>\d+)(.*\s+)(?P<RESPONSETIME>\d+.\d+)\s+"

I convert a matching line into a metric whose value is the response time, with the other named match groups attached as labels on that metric.

We call each line an event, hence the 'event' naming convention in the code.
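
For example, a single matching line turns into one line of Wavefront metric format, roughly like this (a minimal sketch; the sample log line, metric name, source host, and timestamp below are made up purely for illustration):

import re

# Same pattern as above, written as a raw string.
PATTERN = (r".*(?P<REQUEST>GET|POST)\s+(?P<URI>/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*"
           r"/*[a-zA-Z0-9_-]*)[a-zA-Z0-9/_-]*\s+HTTP/\d+.\d+'\s+'[^:/ ]+.?[0-9]*'"
           r"\s+(?P<RESPONSE>\d+)(.*\s+)(?P<RESPONSETIME>\d+.\d+)\s+")

# Hypothetical log line, only for illustration.
line = "10.0.0.1 GET /api/v1/users HTTP/1.1' '10.0.0.2' 200 - 0.042 "

m = re.match(PATTERN, line)
if m:
    groups = m.groupdict()
    value = groups.pop("RESPONSETIME")  # metric value is the response time
    labels = " ".join('"{0}"="{1}"'.format(k, v) for k, v in groups.items())
    # Wavefront line format: <metric> <value> <epochSeconds> source=<host> <labels>
    print("loginsight.events.http {0} 1600000000 source=myhost {1}".format(
        value, labels))

which would print something like:

loginsight.events.http 0.042 1600000000 source=myhost "REQUEST"="GET" "URI"="/api/v1/users" "RESPONSE"="200"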

#!/usr/bin/env python

import datetime
import socket
import subprocess
import time
import shlex
import logging.handlers
import os
import sys
import yaml
import base64
import re
import importlib
import json
import math
import gc
import threading
import multiprocessing
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Logging related variables
LOG_FILE_NAME = "/var/log/loginsightWavefrontIntegration.log"
LOGGING_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
handler = logging.handlers.RotatingFileHandler(
    LOG_FILE_NAME, mode="a", maxBytes=10000000, backupCount=5
)
handler.setFormatter(formatter)

log = logging.getLogger("LogInsightWavefrontIntegration")
log.setLevel(LOGGING_LEVEL)
log.addHandler(handler)

stdoutAndstderr = logging.StreamHandler()
stdoutAndstderr.setLevel(LOGGING_LEVEL)
stdoutAndstderr.setFormatter(formatter)
log.addHandler(stdoutAndstderr)

# system level variables
hostname = socket.gethostname()
wavefrontProxyServiceName = os.environ.get("WAVEFRONT_PROXY_SERVICE_NAME")
# SEND_METRICS_OF supported values - all, events, alerts
sendMetricsOf = os.environ.get("SEND_METRICS_OF")

# Files related variables
checkFileChange = {
    "users": {
        "filePath": os.environ.get("USER_DETAILS_FILE"),
        "fileContents": None
    },
    "alerts": {
        "filePath": os.environ.get("LOGINSIGHT_ALERT_DETAILS_FILE"),
        "fileContents": None
    },
    "events": {
        "filePath": os.environ.get("LOGINSIGHT_EVENT_DETAILS_FILE"),
        "fileContents": None
    }
}

sessionID = None
HTTP_RESPONSE_LOGIN_TIMEOUT_CODE = 440
POST = 'POST'
GET = 'GET'
ZERO = 0

confirmedAlertIDsToMonitor = []
WF_METRIC_ALERTS = {}
licfResponse = {}
licfResponseConversion = {}
match = {}
fullMessages = {}
alertResponse = {}
status = {}
allEvents = {}
eventData = {}
populateEventData = {}
threads = {}
setLastAlertEpochTime = {}
Semaphore = None
activeProcesses = []
epoch_time = int(datetime.datetime.now().strftime("%s"))
EVENT_LAST_POPULATED_EPOCH_TIME = 0
EVENT_POPULATE_TIME_GREATER = 0
EVENT_POPULATE_TIME_LESS = 0
# Time backwards from current epoch time to verify metric sent to wavefront
TIME_MINUS = 120


# Multiprocessing class
# A process gets initialized when the class is invoked with its process name
class ProcessClassToPushMetrics(multiprocessing.Process):
    def __init__(self, data):
        log.info("[ProcessClassToPushMetrics,__init__]: data variable content, "
            "'{data}'".format(data=data))
        self.dataName = data['name']
        if self.dataName == "alerts":
            multiprocessing.Process.__init__(self, name="Thread-Alert-" + data['alertID'])
            self.alertID = data['alertID']
        elif self.dataName == "events":
            multiprocessing.Process.__init__(self, name="Thread-Event-" + data['threadName'])
            self.threadName = data['threadName']
            self.eventName = data['eventName']
            self.startTime = data['startTime']
            self.endTime = data['endTime']

        del data
        gc.collect()

    # run event related threads
    def _run_event_process(self, process_name):
        global allEvents, eventData, status, \
            EVENT_POPULATE_TIME_GREATER, EVENT_POPULATE_TIME_LESS

        self.metricName = "events.{name}".format(name=self.eventName)
        log.info("[{logprefix}]: Metric name to be used is '{metricName}'".format(
            logprefix=process_name,
            metricName=self.metricName)
        )

        status[self.threadName] = _populate_event_data(
            name=self.eventName,
            threadName=self.threadName,
            logprefix=process_name,
            greaterthan=self.startTime,
            lessthan=self.endTime
        )

        if status[self.threadName]:
            if eventData[self.eventName][self.threadName]:
                for event in eventData[self.eventName][self.threadName]:
                    _send_to_wavefront(
                        metricName=self.metricName,
                        time=event['epoch_time'],
                        value=event['value'],
                        logprefix=process_name,
                        labelsAsString=event['labelsAsString']
                    )

        del process_name, status[self.threadName]
        gc.collect()

    # run alert related threads
    def _run_alert_process(self, process_name):
        global WF_METRIC_ALERTS, alertResponse, status, setLastAlertEpochTime

        _metricName = "alerts.{name}".format(
            name=WF_METRIC_ALERTS[self.alertID]['metricName']
        )

        log.info("[{logprefix}]: Metric name to be used is '{metricName}'".format(
            logprefix=process_name,
            metricName=_metricName)
        )

        status[self.alertID] = _populate_alert_history(alertID=self.alertID,
                                                       logprefix=process_name)

        alertResponse[self.alertID]['labelsAsString'] = ""
        if 'labels' in WF_METRIC_ALERTS[self.alertID]:
            for key in WF_METRIC_ALERTS[self.alertID]['labels']:
                keyPlusValue = '\\"{key}\\"=\\"{value}\\"'.format(
                    key=key, value=WF_METRIC_ALERTS[self.alertID]['labels'][key]
                )
                alertResponse[self.alertID]['labelsAsString'] += " " + keyPlusValue

        if status[self.alertID]:
            for timestamp in alertResponse[self.alertID].keys():
                # 'labelsAsString' is not a timestamp entry; skip that key.
                if timestamp == 'labelsAsString':
                    continue
                _send_to_wavefront(
                    metricName=_metricName,
                    time=timestamp,
                    value=alertResponse[self.alertID][timestamp],
                    logprefix=process_name,
                    labelsAsString=alertResponse[self.alertID]['labelsAsString']
                )
        else:
            _send_to_wavefront(
                metricName=_metricName,
                time=epoch_time,
                value=ZERO,
                logprefix=process_name,
                labelsAsString=alertResponse[self.alertID]['labelsAsString']
            )

        alertResponse[self.alertID] = {}
        WF_METRIC_ALERTS[self.alertID]['lastAlertEpochTime'] = \
            setLastAlertEpochTime[self.alertID]

        del _metricName, status[self.alertID], process_name
        gc.collect()

    # run function will be executed when we use process_name.start() cmd
    def run(self):
        global user_details, Semaphore, activeProcesses
        self.processName = ""

        Semaphore.acquire()
        self.processName = multiprocessing.current_process().name
        activeProcesses.append(self.processName)
        log.info("[Semaphore,Acquire]: Running: {activeProcessesCount} Processes"
            ".".format(activeProcessesCount=len(activeProcesses)))
        try:
            if self.dataName == "alerts":
                self._run_alert_process(process_name=self.processName)
            elif self.dataName == "events":
                self._run_event_process(process_name=self.processName)
        finally:
            activeProcesses.remove(self.processName)
            log.info("[Semaphore,Release]: Running: {activeProcessesCount} Processes"
                ".".format(activeProcessesCount=len(activeProcesses)))
            Semaphore.release()

        gc.collect()


# send integration active status to wavefront
def _send_integration_status_to_wavefront(active):
    _send_to_wavefront(
        metricName="integration.symphony.status.podactive",
        time=epoch_time,
        value=active,
        logprefix="_send_integration_status_to_wavefront"
    )

# verify metric points of particular hostname
def _verify_metric_of(number, name):
    _status = False
    _epoch_minus = epoch_time - TIME_MINUS
    _epoch_minus_in_millisec = _epoch_minus * 1000
    _last_update = 0
    _last_update_trim = 0
    _url = "https://{wfinstance}/api/v2/chart/raw?host={hostname}&" \
        "metric=loginsight.integration.symphony.status.podactive&startTime={starttime}".format(
            wfinstance=user_details.wavefront['WAVEFRONT_INSTANCE'],
            hostname=name + '-' + str(number),
            starttime=str(_epoch_minus_in_millisec)
        )
    _headers={"Authorization": "Bearer {apikey}".format(
        apikey=base64.b64decode(
            user_details.wavefront['WAVEFRONT_INSTANCE_API_KEY']).decode())
    }

    response = requests.get(_url, headers=_headers)
    log.info("[_verify_metric_of]: Metric Chart Raw api '{url}' response content "
        "'{content}'".format(
            url=_url,
            content=response.content
        )
    )

    response_json = response.json()
    if response_json:
        _points = response_json[0]['points']
        for _point in _points:
            if _last_update < _point['timestamp']:
                _last_update = _point['timestamp']

        # The last update time is in milliseconds, so trim it to seconds.
        _last_update_trim = int(str(_last_update)[0:len(str(_epoch_minus))])
        log.info("[_verify_metric_of]: Last update trimmed value is {value}".format(
            value=_last_update_trim))
        log.info("[_verify_metric_of]: Current Epoch Time - {epoch_time} and Epoch "
            "Time with minus {time_minus} seconds {epoch_minus}".format(
                epoch_time=epoch_time,
                time_minus=TIME_MINUS,
                epoch_minus=_epoch_minus
            )
        )
        # metric last update comparison
        if _last_update_trim < _epoch_minus:
            log.warning("Last update is more than {sec} seconds old, "
                "hence returning False.".format(sec=TIME_MINUS))
        else:
            log.info("Last update is less than {sec} seconds old, "
                "hence returning True.".format(sec=TIME_MINUS))
            _status = True

    try:
        return _status
    finally:
        del _status, _url, _headers, _epoch_minus, _last_update, _last_update_trim, \
            response_json, response
        gc.collect()

# Verify metrics getting pushed to wavefront
def _verify_metrics_in_wavefront():
    _status = False
    _statefulset_number = None
    _statefulset_name = None

    match = re.match(r"(?P<STATEFULSETNAME>.*)-(?P<STATEFULSETNUMBER>\d+)", hostname)

    if match and int(match.groupdict()['STATEFULSETNUMBER']):
        _statefulset_number = int(match.groupdict()['STATEFULSETNUMBER'])
        _statefulset_name = match.groupdict()['STATEFULSETNAME']
        for replica in range(0, _statefulset_number):
            if _verify_metric_of(number=replica, name=_statefulset_name):
                _status = True
                break

    try:
        return _status
    finally:
        del _status, _statefulset_number, _statefulset_name
        gc.collect()


# python requests command execution
def _execute_licf_api_cmd(cmd, api, logprefix, data=None, headers=None):
    global sessionID
    # Avoid mutable default arguments; a shared dict would persist across calls.
    data = data if data is not None else {}
    headers = headers if headers is not None else {}
    _responseConversion = None
    _response = None
    _url = "https://{srvName}:{srvPort}/{api}".format(
        srvName=user_details.loginsight['LOGINSIGHT_SERVER_NAME'],
        srvPort=user_details.loginsight['LOGINSIGHT_SERVER_PORT'],
        api=api
    )

    try:
        if cmd == POST:
            if data:
                _response = requests.post(url=_url, json=data, verify=False)
            else:
                raise Exception("data variable passed to this function was empty for "
                    "POST requests: '{url}'".format(url=_url)
                )
        elif cmd == GET:
            headers['Authorization'] = "Bearer {sessionID}".format(sessionID=sessionID)
            _response = requests.get(url=_url, headers=headers, verify=False)

        log.info("[_execute_licf_api_cmd,{logprefix}]: Executed LICF API command "
            "with,\n url: '{url}'\ncmd: '{cmd}'\ndata: {data}\n"
            "headers: {headers}\n\n".format(
                url=_url,
                cmd=cmd,
                data=data,
                headers=headers,
                logprefix=logprefix
            )
        )

        if _response.status_code == requests.codes.ok:
            _responseConversion = _response.content

            if type(_responseConversion) == bytes:
                log.info("[_execute_licf_api_cmd,{logprefix}]: Since response of licf "
                    "api is of type 'bytes', try decoding it...".format(
                        logprefix=logprefix
                    )
                )
                try:
                    _responseConversion = _responseConversion.decode('utf-8')
                except Exception as exception:
                    log.info("[_execute_licf_api_cmd,{logprefix}]: Can't decode since "
                        "'{ex}'".format(logprefix=logprefix, ex=exception))

                try:
                    _responseConversion = _responseConversion.replace("\r", '')
                except Exception as exception:
                    log.info("[_execute_licf_api_cmd,{logprefix}]: Can't replace '\\r' "
                        "since '{ex}'".format(logprefix=logprefix, ex=exception))

            _responseConversion = _responseConversion.replace('\\"', '\'')
            _responseConversion = json.dumps(_responseConversion)
            _responseConversion = yaml.safe_load(_responseConversion)
            if type(_responseConversion) in [bytes, bytearray, str]:
                _responseConversion = json.loads(_responseConversion)

            log.info("[_execute_licf_api_cmd,{logprefix}]: Exiting the function by "
                "returning response\n\n\n.".format(logprefix=logprefix))
            return _responseConversion
        elif _response.status_code == HTTP_RESPONSE_LOGIN_TIMEOUT_CODE:
            log.warning("[_execute_licf_api_cmd,{logprefix}]: LICF Rest API login "
                "session has timed out. Reconnecting....".format(logprefix=logprefix))
            _connect_to_loginsight_server()

            log.warning("[_execute_licf_api_cmd,{logprefix}]: Executing licf api "
                "command again...".format(logprefix=logprefix))
            return _execute_licf_api_cmd(cmd=cmd,
                                         api=api,
                                         logprefix=logprefix,
                                         data=data,
                                         headers=headers)
        else:
            log.error("[_execute_licf_api_cmd,{logprefix}]: Execution of licf api cmd "
                "has failed: {status} \nresponse was: {resp}\n\n\n".format(
                    status=_response.status_code,
                    resp=_response,
                    logprefix=logprefix
                )
            )
    except Exception as exception:
        log.warning("[_execute_licf_api_cmd,{logprefix}]: Exception occurred, "
            "'{ex}'".format(logprefix=logprefix, ex=exception))
        raise Exception("[_execute_licf_api_cmd,{logprefix}]: Exception occurred, "
                        "'{ex}'".format(logprefix=logprefix, ex=exception))
    finally:
        del _response, _responseConversion, _url, cmd, api, logprefix, data, headers
        gc.collect()


# log alert details being used
def _log_alert_details():
    log.info("[_log_alert_details]: Array of dicts to be used as alert details for "
        "sending alert metrics to wavefront instance are,\n'{details}'".format(
            details=alert_details.userSpecifiedAlerts
        )
    )


# log event details being used
def _log_event_details():
    log.info("[_log_event_details]: Array of dicts to be used as event details for "
        "sending event metrics to wavefront instance are,\n'{details}'".format(
            details=event_details.userSpecifiedEvents
        )
    )


# Log Wavefront Server details being used
def _log_wf_server_details():
    _status = True
    _logText = ""

    if 'WAVEFRONT_INSTANCE' in user_details.wavefront.keys():
        _logText += "Wavefront Instance Name - {srvName}\n".format(
            srvName=user_details.wavefront['WAVEFRONT_INSTANCE']
        )
    else:
        _logText += "WAVEFRONT_INSTANCE key is not specified in " \
                   "user details module dict wavefront.\n"
        _status = False

    if 'WAVEFRONT_INSTANCE_API_KEY' in user_details.wavefront.keys():
        _logText += "Wavefront Instance API Key - {srvPort}\n".format(
            srvPort=user_details.wavefront['WAVEFRONT_INSTANCE_API_KEY']
        )
    else:
        _logText += "WAVEFRONT_INSTANCE_API_KEY key is not specified in " \
                   "user details module dict wavefront.\n"
        _status = False

    try:
        if _status:
            log.info("[_log_wf_server_details]: All required wavefront variables are "
                "specified in user details module.\n Values provided to connect to "
                "Wavefront Instance are,\n{logText}".format(logText=_logText))
        else:
            raise Exception("[_log_wf_server_details]: Not all required wavefront "
                "variables are specified in user details module.\n{logText}".format(
                    logText=_logText
                )
            )
    except Exception as exception:
        log.error(exception)
        raise Exception(exception)
    finally:
        del _logText, _status
        gc.collect()


# Log LICF Server details being used
def _log_licf_server_details():
    _status = True
    _logText = ""

    if 'LOGINSIGHT_SERVER_NAME' in user_details.loginsight.keys():
        _logText += "LogInsight Server Name - {srvName}\n".format(
            srvName=user_details.loginsight['LOGINSIGHT_SERVER_NAME']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_NAME key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_PORT' in user_details.loginsight.keys():
        _logText += "LogInsight Server Port - {srvPort}\n".format(
            srvPort=user_details.loginsight['LOGINSIGHT_SERVER_PORT']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_PORT key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_USER_NAME' in user_details.loginsight.keys():
        _logText += "LogInsight Server User Name - {userName}\n".format(
            userName=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_USER_NAME key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_USER_PASSWORD' in user_details.loginsight.keys():
        _logText += "LogInsight Server User Pwd - {userPwd}\n".format(
            userPwd=user_details.loginsight['LOGINSIGHT_SERVER_USER_PASSWORD']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_USER_PASSWORD key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_USER_PROVIDER' in user_details.loginsight.keys():
        _logText += "LogInsight Server User Provider - {provider}\n".format(
            provider=user_details.loginsight['LOGINSIGHT_SERVER_USER_PROVIDER']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_USER_PROVIDER key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    try:
        if _status:
            log.info("[_log_licf_server_details]: All required variables are specified "
                "in user details module.\n Values provided to connect to LogInsight "
                "Server are,\n{logText}".format(logText=_logText))
        else:
            raise Exception("[_log_licf_server_details]: Not all required variables "
                "are specified in user details module.\n{logText}".format(
                    logText=_logText
                )
            )
    except Exception as exception:
        log.error(exception)
        raise Exception(exception)
    finally:
        del _logText, _status
        gc.collect()


# get session ID for LICF server
def _connect_to_loginsight_server():
    global sessionID, user_details
    _response = None

    _data = {
        "username": base64.b64decode(
            user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']).decode(),
        "password": base64.b64decode(
            user_details.loginsight['LOGINSIGHT_SERVER_USER_PASSWORD']).decode(),
        "provider": user_details.loginsight['LOGINSIGHT_SERVER_USER_PROVIDER']
    }
    _response = _execute_licf_api_cmd(cmd=POST,
                                      api='api/v1/sessions',
                                      logprefix='_connect_to_loginsight_server',
                                      data=_data)

    sessionID = _response["sessionId"]
    log.info("[_connect_to_loginsight_server]: Session ID to connect to LICF server "
        "is, '{sessionID}'\nfor user ID: {userID}\n\n\n".format(
            sessionID=sessionID,
            userID=_response['userId']
        )
    )

    del _response, _data
    gc.collect()


# send metric data to wavefront
def _send_to_wavefront(metricName, time, value, logprefix, labelsAsString=""):
    # Forming the text to be sent to the wavefront proxy.
    _sendText = (
        "loginsight.{metricName} {value} {epochTime} "
        'source=\\"{hostname}\\" {labels}'.format(
            metricName=metricName,
            value=value,
            epochTime=time,
            hostname=hostname,
            labels=labelsAsString
        )
    )
    log.info(
        "[_send_to_wavefront,{logprefix}]: Text to be sent is, {sendText}".format(
            logprefix=logprefix, sendText=_sendText
        )
    )

    # First we echo the text to be sent to pipe in 'p1' and then send
    #  it to 'p2' as input which uses netcat 'nc' to write to network
    #  connection which in this case is wavefront proxy.
    _p1 = subprocess.Popen(
        shlex.split("echo -e {sendText}".format(sendText=_sendText)),
        stdout=subprocess.PIPE,
    )
    _p2 = subprocess.Popen(
        shlex.split("nc -q -v {proxy} 2878".format(proxy=wavefrontProxyServiceName)),
        stdin=_p1.stdout,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    _process_output, _ = _p2.communicate()
    log.info(
        "[_send_to_wavefront,{logprefix}]: Output of nc command <EMPTY means success> "
        "is, {process_output}".format(logprefix=logprefix,
                                      process_output=_process_output)
    )

    del metricName, time, value, logprefix, labelsAsString, _sendText
    del _p1, _p2, _process_output


# check alerts specified in loginsight_alerts_details.py exists/visible to user
# user here is user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']
def _check_alerts_exists():
    global confirmedAlertIDsToMonitor, alert_details, WF_METRIC_ALERTS
    confirmedAlertIDsToMonitor = []
    _response = None
    _alertIDs = []
    _alertEnabled = {}
    _status = False

    log.info("[_check_alerts_exists]: Get details of all alerts.\n")
    _response = _execute_licf_api_cmd(cmd=GET,
                                     api='api/v1/alerts',
                                     logprefix='_check_alerts_exists')
    log.info("[_check_alerts_exists]: Received details of all alerts\n\n")

    for alert in _response:
        _alertIDs.append(alert['id'])
        _alertEnabled[alert['id']] = alert['enabled']
        log.info("[_check_alerts_exists]: Append id {id} to all alerts with its "
            "enabled status - {status}".format(id=alert['id'],
                                               status=alert['enabled']))

    log.info("[_check_alerts_exists]: User alert details - {userAlertDetails}\n\n"
        "All alerts IDs - {allAlertIDS}\n\n".format(
            userAlertDetails=alert_details.userSpecifiedAlerts,
            allAlertIDS=_alertIDs
        )
    )
    for alert in alert_details.userSpecifiedAlerts:
        log.info("[_check_alerts_exists]: Alert from user specified is - "
            "{alert}".format(alert=alert))

        if alert['id'] in _alertIDs and \
                str(_alertEnabled[alert['id']]).lower() == "true":
            log.info("[_check_alerts_exists]: Alert ID '{id}' specified in "
                "loginsight_alerts_details.py exists/visible for user {user} and is "
                "enabled.".format(
                    id=alert['id'],
                    user=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']
                )
            )
            confirmedAlertIDsToMonitor.append(alert['id'])
            if alert['id'] not in WF_METRIC_ALERTS.keys():
                WF_METRIC_ALERTS[alert['id']] = {}
                WF_METRIC_ALERTS[alert['id']]['lastAlertEpochTime'] = 0

            # metricName assignment is not done in above, if condition,
            #  to get value updated if ID already exists
            WF_METRIC_ALERTS[alert['id']]['metricName'] = alert['name']
            if 'labels' in alert.keys():
                if alert['labels']:
                    WF_METRIC_ALERTS[alert['id']]['labels'] = alert['labels']
            _status = True
        else:
            log.error("[_check_alerts_exists]: Alert ID '{id}' specified in "
                "loginsight_alerts_details.py either doesn't exist/visible for user "
                "{user} (OR) not enabled. Enabled key value for alert is "
                "'{enabled}'".format(
                    id=alert['id'],
                    user=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME'],
                    enabled=_alertEnabled[alert['id']]
                )
            )

    # Remove alerts that are not in userSpecifiedAlerts
    # Iterate over a copy of the keys since entries may be deleted below.
    for alertID in list(WF_METRIC_ALERTS.keys()):
        if alertID not in confirmedAlertIDsToMonitor:
            log.warning("[_check_alerts_exists]: Deleting alertID {alertID} from "
                "monitoring".format(alertID=alertID))
            del WF_METRIC_ALERTS[alertID]

    log.info("[_check_alerts_exists]: Populating alert details that needs monitoring "
        "is completed.\n\n\n")
    try:
        return _status
    finally:
        del _status, _response, _alertIDs, _alertEnabled
        gc.collect()


# populate alert data
def _populate_alertID_data(alertID, logprefix, timestamp):
    global alertResponse, setLastAlertEpochTime, alert_details
    _status = False
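    # Count the digits so the timestamp can be cut down to its first 10
    #  digits, i.e. epoch seconds (LICF timestamps include milliseconds).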
    _ndigits = int(math.log10(timestamp))+1
    try:
        timestamp = timestamp//int(10**(_ndigits-10))
        if timestamp > setLastAlertEpochTime[alertID]:
            log.info("[_populate_alertID_data,{logprefix}]: epochTime for alert "
                "{alertID} is {utime} which is greater than the highest epoch time in "
                "this iteration of results so far i.e., {savedTime}. Saving this as "
                "current highest in the iteration.".format(
                    logprefix=logprefix,
                    alertID=alertID,
                    utime=timestamp,
                    savedTime=setLastAlertEpochTime[alertID]
                )
            )
            setLastAlertEpochTime[alertID] = timestamp

        if timestamp > WF_METRIC_ALERTS[alertID]['lastAlertEpochTime']:
            _status = True
            # Check if there are more than 1 alert at same timestamp.
            if timestamp in alertResponse[alertID].keys():
                alertResponse[alertID][timestamp] += 1
            else:
                alertResponse[alertID][timestamp] = 1
        else:
            log.info("[_populate_alertID_data,{logprefix}]: alert at timestamp "
                "{timestamp} was sent to wavefront already".format(
                    logprefix=logprefix,
                    timestamp=timestamp
                )
            )
        return _status
    except ZeroDivisionError:
        log.info("[_populate_alertID_data,{logprefix}]: 'ZeroDivisionError' "
            "exception occurred when removing milliseconds from timestamp to get "
            "epoch time in seconds (10 digits).".format(logprefix=logprefix))
        return _status
    finally:
        del _status, _ndigits, alertID, logprefix, timestamp
        gc.collect()


# get alert history and populate
def _populate_alert_history(alertID, logprefix):
    global alertResponse, setLastAlertEpochTime, licfResponse, \
        licfResponseConversion, fullMessages
    _status = False
    alertResponse[alertID] = {}
    licfResponse[alertID] = None
    licfResponseConversion[alertID] = {}
    fullMessages[alertID] = None

    setLastAlertEpochTime[alertID] = WF_METRIC_ALERTS[alertID]['lastAlertEpochTime']

    _api = "api/v1/alerts/{alertID}/history".format(alertID=alertID)
    log.info("[_populate_alert_history,{logprefix}]: Collect history of alert with "
        "ID, {alertID}".format(logprefix=logprefix, alertID=alertID))
    licfResponse[alertID] = _execute_licf_api_cmd(cmd=GET,
                                                  api=_api,
                                                  logprefix=logprefix)

    if not licfResponse[alertID]:
        log.warning("[_populate_alert_history,{logprefix}]: There is no history for "
            "alert ID '{alertID}'\n\n\n".format(logprefix=logprefix, alertID=alertID)
        )
    else:
        licfResponseConversion[alertID]['converted'] = None
        if isinstance(licfResponse[alertID], str):
            # Removing '\', double quotes before and after '[', ']' respectively
            licfResponse[alertID] = licfResponse[alertID].replace('\\','')
            licfResponse[alertID] = \
                str(licfResponse[alertID])[licfResponse[alertID].index('['):]
            licfResponse[alertID] = \
                str(licfResponse[alertID])[:licfResponse[alertID].rfind(']')]
            licfResponseConversion[alertID]['converted'] = \
                json.loads(licfResponse[alertID])
        else:
            licfResponseConversion[alertID]['converted'] = licfResponse[alertID]

        if isinstance(licfResponseConversion[alertID]['converted'][0]['messages'], str):
            fullMessages[alertID] = \
                json.loads(licfResponseConversion[alertID]['converted'][0]['messages'])
        else:
            fullMessages[alertID] = \
                licfResponseConversion[alertID]['converted'][0]['messages']

        log.info("[_populate_alert_history,{logprefix}]: Type of fullMessages variable "
            "is, '{typeresp}'. fullMessages content is,\n '{message}'\n\n\n\n".format(
                logprefix=logprefix,
                typeresp=type(fullMessages),
                message=fullMessages[alertID]
            )
        )

        if isinstance(fullMessages[alertID], list):
            for message in fullMessages[alertID]:
                if _populate_alertID_data(alertID=alertID,
                                          logprefix=logprefix,
                                          timestamp=message['timestamp']):
                    _status = True
        elif isinstance(fullMessages[alertID], dict):
            _status = _populate_alertID_data(
                alertID=alertID,
                logprefix=logprefix,
                timestamp=fullMessages[alertID]['timestamp']
            )
        else:
            log.error("[_populate_alert_history,{logprefix}]: messages key in LICF "
                "response is of type {typeresp} which is not 'list' or 'dict' to get "
                "timestamp of alert.".format(
                    logprefix=logprefix,
                    typeresp=type(fullMessages[alertID])
                )
            )

    try:
        return _status
    finally:
        del fullMessages[alertID], licfResponseConversion[alertID]
        del licfResponse[alertID], _status, _api, alertID, logprefix
        gc.collect()


# prepare alerts data to send to wavefront
def _send_alerts_data_to_wavefront():
    global confirmedAlertIDsToMonitor, threads
    threads['alerts'] = []
    _alertID = None
    _thread = None

    for _alertID in confirmedAlertIDsToMonitor:
        newThread = ProcessClassToPushMetrics(
            data={"name": "alerts",
                  "alertID": _alertID
            }
        )
        newThread.start()
        log.info("[_send_alerts_data_to_wavefront]: Started new thread for alert "
            "{alertID}".format(
                 alertID=_alertID
            )
        )
        threads['alerts'].append(newThread)

    del _alertID, _thread
    gc.collect()


# populate final event data
def _populate_eventname_data(name, threadName, text, timestamp, logprefix):
    global eventData, allEvents, licfResponseConversion, populateEventData, match
    populateEventData[logprefix] = {}
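    # Same 10-digit truncation as in _populate_alertID_data: drop the
    #  millisecond digits to get epoch seconds.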
    _ndigits = int(math.log10(timestamp))+1

    try:
        timestamp = timestamp//int(10**(_ndigits-10))

        log.info("[_populate_eventname_data,{logprefix}]: Generating event data with,\n"
            "text '{text}'\nname '{name}'\npattern '{pattern}'".format(
                logprefix=logprefix,
                text=text,
                name=name,
                pattern=allEvents[name]['pattern']
            )
        )
        populateEventData[logprefix]['epoch_time'] = timestamp
        populateEventData[logprefix]['labels'] = {}
        populateEventData[logprefix]['labels'].update(
            licfResponseConversion[logprefix]['labels'])

        match[logprefix] = re.match(allEvents[name]['pattern'], text)
        if match[logprefix]:
            log.info("[_populate_eventname_data,{logprefix}]: matched groups, "
                "'{mgrp}'".format(
                    logprefix=logprefix,
                    mgrp=match[logprefix].groupdict()
                )
            )
            populateEventData[logprefix]['labels'].update(match[logprefix].groupdict())
            log.info("[_populate_eventname_data,{logprefix}]: label keys "
                "'{keys}'".format(
                    logprefix=logprefix,
                    keys=populateEventData[logprefix]['labels'].keys())
            )

            if allEvents[name]['metricValue'] in \
                    populateEventData[logprefix]['labels'].keys():
                populateEventData[logprefix]['value'] = \
                    populateEventData[logprefix]['labels'][allEvents[name]['metricValue']]
            else:
                return False

            del populateEventData[logprefix]['labels'][allEvents[name]['metricValue']]
            populateEventData[logprefix]['labelsAsString'] = ""
            for key in populateEventData[logprefix]['labels']:
                keyPlusValue = '\\"{key}\\"=\\"{value}\\"'.format(
                    key=key, value=populateEventData[logprefix]['labels'][key]
                )
                populateEventData[logprefix]['labelsAsString'] += " " + keyPlusValue
            eventData[name][threadName].append(populateEventData[logprefix])
            return True
    except ZeroDivisionError:
        log.info("[_populate_eventname_data,{logprefix}]: When removing milli seconds "
            "from timestamp to get epochtime in seconds (10 digits), "
            "'ZeroDivisionError' exception occured.".format(logprefix=logprefix))
        return False
    finally:
        del populateEventData[logprefix], _ndigits, name, text, timestamp, \
            match[logprefix], logprefix
        gc.collect()


# populate event data
def _populate_event_data(name, threadName, logprefix, greaterthan, lessthan):
    global allEvents, eventData, licfResponse, licfResponseConversion
    _status = False
    licfResponse[logprefix] = None
    licfResponseConversion[logprefix] = {}
    fullMessages[logprefix] = None

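    # %3E%3D and %3C%3D are the URL-encoded forms of '>=' and '<='.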
    _api = "api/v1/events/timestamp/%3E%3D{greaterthan}/timestamp/%3C%3D{lessthan}" \
        "{filters}?limit=9999".format(
            filters=allEvents[name]['filters'],
            greaterthan=greaterthan,
            lessthan=lessthan,
        )
    log.info("[_populate_event_data,{logprefix}]: Collect data of event with "
        "api '{api}'".format(
            logprefix=logprefix,
            api=_api
        )
    )
    licfResponse[logprefix] = _execute_licf_api_cmd(cmd=GET,
                                                    api=_api,
                                                    logprefix=logprefix)

    if not licfResponse[logprefix]:
        log.warning("[_populate_event_data,{logprefix}]: There is no event with "
            "filters '{filters}'\n\n\n".format(
                logprefix=logprefix,
                filters=allEvents[name]['filters']
            )
        )
    else:
        licfResponseConversion[logprefix]['converted'] = None
        if isinstance(licfResponse[logprefix], str):
            # Removing '\', double quotes before and after '[', ']' respectively
            licfResponse[logprefix] = licfResponse[logprefix].replace('\\','')
            licfResponse[logprefix] = \
                str(licfResponse[logprefix])[licfResponse[logprefix].index('['):]
            licfResponse[logprefix] = \
                str(licfResponse[logprefix])[:licfResponse[logprefix].rfind(']')]
            licfResponseConversion[logprefix]['converted'] = \
                json.loads(licfResponse[logprefix])
        else:
            licfResponseConversion[logprefix]['converted'] = licfResponse[logprefix]

        if isinstance(licfResponseConversion[logprefix]['converted']['events'], str):
            fullMessages[logprefix] = \
                json.loads(licfResponseConversion[logprefix]['converted']['events'])
        else:
            fullMessages[logprefix] = \
                licfResponseConversion[logprefix]['converted']['events']

        licfResponseConversion[logprefix]['labels'] = {}
        if 'customLabels' in allEvents[name]:
            licfResponseConversion[logprefix]['labels'].update(
                allEvents[name]['customLabels']
            )

        if isinstance(fullMessages[logprefix], list):
            for message in fullMessages[logprefix]:
                if isinstance(message['fields'], str):
                    licfResponseConversion[logprefix]['fields'] = \
                        json.loads(message['fields'])
                else:
                    licfResponseConversion[logprefix]['fields'] = \
                        message['fields']

                for field in licfResponseConversion[logprefix]['fields']:
                    if field['name'] in allEvents[name]['licfFieldsAsLabels']:
                        licfResponseConversion[logprefix]['labels'][field['name']] = \
                            field['content']

                if _populate_eventname_data(name=name,
                                            threadName=threadName,
                                            text=message['text'],
                                            timestamp=message['timestamp'],
                                            logprefix=logprefix):
                    _status = True
        elif isinstance(fullMessages[logprefix], dict):
            if isinstance(fullMessages[logprefix]['fields'], str):
                licfResponseConversion[logprefix]['fields'] = \
                    json.loads(fullMessages[logprefix]['fields'])
            else:
                licfResponseConversion[logprefix]['fields'] = \
                    fullMessages[logprefix]['fields']
            for field in licfResponseConversion[logprefix]['fields']:
                if field['name'] in allEvents[name]['licfFieldsAsLabels']:
                    licfResponseConversion[logprefix]['labels'][field['name']] = \
                        field['content']
            _status = _populate_eventname_data(name=name,
                                               threadName=threadName,
                                               text=fullMessages[logprefix]['text'],
                                               timestamp=fullMessages[logprefix]['timestamp'],
                                               logprefix=logprefix)
        else:
            log.error("[_populate_event_data,{logprefix}]: events key in LICF response"
                "is of type {typeresp} which is not 'list' or 'dict' to get events "
                "timestamp, text etc.".format(
                    logprefix=logprefix,
                    typeresp=type(fullMessages[logprefix])
                )
            )

    try:
        return _status
    finally:
        del licfResponseConversion[logprefix], _status, _api
        del licfResponse[logprefix], fullMessages[logprefix], name, logprefix
        gc.collect()


# send events data to wavefront
def _send_events_data_to_wavefront():
    global allEvents, threads
    threads['events'] = []
    _thread = None
    _eventName = None
    _threadName = None

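    # Slice the populate window into 1000 ms chunks and start one process per
    #  chunk per event; a trailing remainder shorter than 1000 ms gets no chunk.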
    for _eventName in allEvents.keys():
        _start_time_of_event_data_to_be_populated = EVENT_POPULATE_TIME_GREATER
        _end_time_of_event_data_to_be_populated = EVENT_POPULATE_TIME_GREATER + 1000
        eventData[_eventName] = {}
        while _end_time_of_event_data_to_be_populated <= EVENT_POPULATE_TIME_LESS:
            log.info("[_send_events_data_to_wavefront]: Start and End time being used "
                "are, StartTime '{starttime}' and EndTime '{endtime}' which are less "
                "than the time till which we need to send metrics, '{time}'".format(
                    starttime=_start_time_of_event_data_to_be_populated,
                    endtime=_end_time_of_event_data_to_be_populated,
                    time=EVENT_POPULATE_TIME_LESS
                )
            )
            _threadName = _eventName + "-" + \
                str(_start_time_of_event_data_to_be_populated)
            eventData[_eventName][_threadName] = []
            newThread = ProcessClassToPushMetrics(
                data={
                    "name": "events",
                    "eventName": _eventName,
                    "threadName": _threadName,
                    "startTime": _start_time_of_event_data_to_be_populated,
                    "endTime": _end_time_of_event_data_to_be_populated
                }
            )
            newThread.start()
            log.info("[_send_events_data_to_wavefront]: Started new thread for event "
                "{eventName}".format(eventName=_eventName))
            threads['events'].append(newThread)
            _start_time_of_event_data_to_be_populated = \
                _end_time_of_event_data_to_be_populated
            _end_time_of_event_data_to_be_populated += 1000

    del _thread, _eventName, _threadName
    gc.collect()


# populate user defined events from file
def _populate_userdefined_events():
    global allEvents, event_details

    _status = False
    log.info("[_populate_userdefined_events]: User defined event details - \n"
        "{userEventDetails}\n\n".format(
            userEventDetails=event_details.userSpecifiedEvents
        )
    )

    for event in event_details.userSpecifiedEvents:
        allEvents[event['name']] = {}
        allEvents[event['name']]['filters'] = event['filters']
        allEvents[event['name']]['pattern'] = event['pattern']
        allEvents[event['name']]['metricValue'] = event['metricValue']
        if 'licfFieldsAsLabels' in event.keys():
            allEvents[event['name']]['licfFieldsAsLabels'] = event['licfFieldsAsLabels']
        if 'customLabels' in event.keys():
            allEvents[event['name']]['customLabels'] = event['customLabels']
        _status = True

    try:
        return _status
    finally:
        # 'event' may be unbound if the loop body never ran, so only delete _status.
        del _status


# check if file contents has changed
def _check_file_change(name):
    global checkFileChange
    fileContents = None

    try:
        log.info("[_check_file_change]: Load file '{file}' contents.".format(
            file=checkFileChange[name]['filePath'])
        )
        with open(checkFileChange[name]['filePath'], 'r') as inputfile:
            fileContents = inputfile.read()

        if fileContents == checkFileChange[name]['fileContents']:
            log.info("[_check_file_change]: File '{file}' contents have not "
                "changed.".format(file=checkFileChange[name]['filePath']))
            return False
        else:
            log.info("[_check_file_change]: File '{file}' contents have changed."
                "Hence, details will be reloaded.".format(
                    file=checkFileChange[name]['filePath']
                )
            )
            # Saving latest filecontents to check the change next time.
            checkFileChange[name]['fileContents'] = fileContents
            return True
    except Exception as exception:
        log.error("[_check_file_change]: Exception occured while checking if file, "
            "'{file}' has changed,\n Exception is, '{exception}'".format(
                file=name,
                exception=exception
            )
        )
    finally:
        del fileContents
        gc.collect()


# Integrate LICF and Wavefront
def _loginsight_wavefront_integration():
    global epoch_time, user_details, alert_details, event_details, Semaphore, \
        threads, EVENT_LAST_POPULATED_EPOCH_TIME, EVENT_POPULATE_TIME_GREATER, \
        EVENT_POPULATE_TIME_LESS, eventData
    _active = True
    _sendEventMetrics = True
    _sendAlertMetrics = True

    while 1:
        try:
            log.info("[_loginsight_wavefront_integration]: "
                "Start of a cycle in While 1 indefinite loop.\n\n")
            if _check_file_change(name="users"):
                # Reload the module to get latest values.
                user_details = importlib.reload(user_details)
                _log_licf_server_details()
                _log_wf_server_details()
                _connect_to_loginsight_server()
                #Semaphore = multiprocessing.BoundedSemaphore(
                #    user_details.NoOfThreadsExecutionAtOnce
                #)
                Semaphore = multiprocessing.BoundedSemaphore(9999)

            epoch_time = int(datetime.datetime.now().strftime("%s"))

            if _verify_metrics_in_wavefront():
                log.info("[_loginsight_wavefront_integration]: As other statefulset "
                    "pods are sending metrics, this instance will be in inactive mode")
                _send_integration_status_to_wavefront(active=0)
                time.sleep(5)
                _active = False
                continue
            else:
                log.info("[_loginsight_wavefront_integration]: This instance should "
                "send metrics, hence sending status as active")
                _send_integration_status_to_wavefront(active=1)

            if sendMetricsOf == "events" or sendMetricsOf == "all":
                if _check_file_change(name="events"):
                    # Reload the module to get latest values.
                    event_details = importlib.reload(event_details)
                    _log_event_details()

                    if not _populate_userdefined_events():
                        log.error("[_loginsight_wavefront_integration]: There are no "
                            "user defined events that exists in "
                            "loginsight_event_details.py for which metrics needs to "
                            "be sent to wavefront.")
                        _sendEventMetrics = False
                    else:
                        _sendEventMetrics = True

                if _sendEventMetrics:
                    log.info("[_loginsight_wavefront_integration]: There are user "
                        "defined events specified in loginsight_event_details.py "
                        "for which metrics needs to be sent to wavefront.")
                    if not EVENT_LAST_POPULATED_EPOCH_TIME:
                        EVENT_LAST_POPULATED_EPOCH_TIME = (epoch_time - 90) * 1000
                    EVENT_POPULATE_TIME_GREATER = EVENT_LAST_POPULATED_EPOCH_TIME
                    EVENT_POPULATE_TIME_LESS = (epoch_time - 60) * 1000
                    log.info("[_loginsight_wavefront_integration]: epoch time is "
                        "'{etime}'.\nTimes between which events needs to be populated, "
                        "'{gtime} - {ltime}'".format(
                            etime=epoch_time,
                            gtime=EVENT_POPULATE_TIME_GREATER,
                            ltime=EVENT_POPULATE_TIME_LESS
                        )
                    )
                    if EVENT_POPULATE_TIME_LESS > EVENT_POPULATE_TIME_GREATER:
                        try:
                            _send_events_data_to_wavefront()
                        finally:
                            EVENT_LAST_POPULATED_EPOCH_TIME = EVENT_POPULATE_TIME_LESS

            if sendMetricsOf == "alerts" or sendMetricsOf == "all":
                if _check_file_change(name="alerts"):
                    # Reload the module to get latest values.
                    alert_details = importlib.reload(alert_details)
                    _log_alert_details()

                    if not _check_alerts_exists():
                        log.error("[_loginsight_wavefront_integration]: There are no "
                            "alert IDs that exists/visible and enabled for which "
                            "metrics needs to be sent to wavefront.")
                        _sendAlertMetrics = False
                    else:
                        _sendAlertMetrics = True

                if _sendAlertMetrics:
                    log.info("[_loginsight_wavefront_integration]: There are user "
                        "defined alert IDs that exists/visible and enabled for which "
                        "metrics needs to be sent to wavefront.")
                    _send_alerts_data_to_wavefront()

        except Exception as exception:
            log.error("[_loginsight_wavefront_integration]: Exception occured - "
                "{exception}".format(exception=exception))
        finally:
            if _active and threads.get('alerts'):
                # Wait for all alert threads to complete
                log.info("[_loginsight_wavefront_integration]: "
                    "Wait for all alert threads to complete.")
                for _thread in threads['alerts']:
                    _thread.join()

                log.info("[_loginsight_wavefront_integration]: "
                    "All threads sending alerts data have completed.\n\n")
                threads['alerts'] = []

            if _active and threads.get('events'):
                #  Wait for all event threads to complete
                log.info("[_loginsight_wavefront_integration]: "
                    "Wait for all event threads to complete.")
                for _thread in threads['events']:
                    _thread.join()

                log.info("[_loginsight_wavefront_integration]: "
                    "All threads sending events data have completed.\n\n")
                threads['events'] = []

            log.info("[_loginsight_wavefront_integration]: "
                "End of a cycle in While 1 indefinite loop.\n\n\n\n\n")
            eventData = {}
            gc.collect()


if __name__ == "__main__":
    try:
        if os.path.isfile(checkFileChange['users']['filePath']):
            log.info("[Main]: User Details file '{file}' exists. "
                "Importing the module and log its content.".format(
                    file=checkFileChange['users']['filePath']
                )
            )

            if os.path.dirname(checkFileChange['users']['filePath']) not in \
                    sys.path:
                sys.path.append(os.path.dirname(
                    checkFileChange['users']['filePath']))
                log.info("[Main]: Added following path '{path}' to sys.path\n"
                    "'{syspath}'".format(
                    path=os.path.dirname(checkFileChange['users']['filePath']),
                    syspath=sys.path)
                )
            user_details = importlib.import_module(
                os.path.splitext(
                    os.path.basename(checkFileChange['users']['filePath']))[0])
        else:
            raise("User Details file doesn't exists, '{file}'".format(
                file=checkFileChange['users']['filePath'])
            )

        if sendMetricsOf == "all" or sendMetricsOf == "alerts":
            if os.path.isfile(checkFileChange['alerts']['filePath']):
                log.info("[Main]: Alert Details file '{file}' exists. "
                    "Importing the module and log its content".format(
                        file=checkFileChange['alerts']['filePath']
                    )
                )

                if os.path.dirname(checkFileChange['alerts']['filePath']) not in \
                        sys.path:
                    log.info("[Main]: Adding following path to sys, '{path}'".format(
                        path=checkFileChange['alerts']['filePath'])
                    )
                    sys.path.append(os.path.dirname(
                        checkFileChange['alerts']['filePath']))
                alert_details = importlib.import_module(
                    os.path.splitext(
                        os.path.basename(checkFileChange['alerts']['filePath']))[0])
            else:
                raise Exception("Alert Details file doesn't exist, '{file}'".format(
                    file=checkFileChange['alerts']['filePath'])
                )

        if sendMetricsOf == "all" or sendMetricsOf == "events":
            if os.path.isfile(checkFileChange['events']['filePath']):
                log.info("[Main]: Event Details file '{file}' exists. "
                    "Importing the module and log its content".format(
                        file=checkFileChange['events']['filePath']
                    )
                )

                if os.path.dirname(checkFileChange['events']['filePath']) not in \
                        sys.path:
                    log.info("[Main]: Adding following path to sys, '{path}'".format(
                        path=checkFileChange['events']['filePath'])
                    )
                    sys.path.append(os.path.dirname(
                        checkFileChange['events']['filePath']))
                event_details = importlib.import_module(
                    os.path.splitext(
                        os.path.basename(checkFileChange['events']['filePath']))[0])
            else:
                raise Exception("Event Details file doesn't exist, '{file}'".format(
                    file=checkFileChange['events']['filePath'])
                )

        _loginsight_wavefront_integration()
    except Exception as exception:
        log.error(
            "[Main]: LogInsight Wavefront Integration failed with exception,\n "
            "'{exp}'".format(exp=exception)
        )
#2
I think perhaps a good place for a task like this would be under the Jobs section.
Reply
#3
I need help in optimizing below code. I am a beginner in Python and used multiprocessing for the first time. Below script execution is taking a long time and causing issues for me to keep up with real-time representation.

We have a logs server and I am trying to convert lines and alerts (we can create alerts in logs server) into metrics. But, as there are like 2000 lines every 30 seconds, the conversion of logs to metrics is taking more than 5 minutes but I want it under 30 seconds itself to keep up with real time.

Example:
log pattern - ".*(?P<REQUEST>GET|POST)\s+(?P<URI>/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*)[a-zA-Z0-9/_-]*\s+HTTP/\d+.\d+'\s+'[^:/ ]+.?[0-9]*'\s+(?P<RESPONSE>\d+)(.*\s+)(?P<RESPONSETIME>\d+.\d+)\s+"

I convert above one to metric with value as response time and other match group items as labels to that metric

We call each line as an event, hence code has the naming convention 'event'

#!/usr/bin/env python

import datetime
import socket
import subprocess
import time
import shlex
import logging.handlers
import os
import sys
import yaml
import base64
import re
import importlib
import json
import math
import gc
import threading
import multiprocessing
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Logging related variables
LOG_FILE_NAME = "/var/log/loginsightWavefrontIntegration.log"
LOGGING_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
handler = logging.handlers.RotatingFileHandler(
    LOG_FILE_NAME, mode="a", maxBytes=10000000, backupCount=5
)
handler.setFormatter(formatter)

log = logging.getLogger("LogInsightWavefrontIntegration")
log.setLevel(LOGGING_LEVEL)
log.addHandler(handler)

stdoutAndstderr = logging.StreamHandler()
stdoutAndstderr.setLevel(LOGGING_LEVEL)
stdoutAndstderr.setFormatter(formatter)
log.addHandler(stdoutAndstderr)

# system level variables
hostname = socket.gethostname()
wavefrontProxyServiceName = os.environ.get("WAVEFRONT_PROXY_SERVICE_NAME")
# SEND_METRICS_OF supported values - all, events, alerts
sendMetricsOf = os.environ.get("SEND_METRICS_OF")

# Files related variables
checkFileChange = {
    "users": {
        "filePath": os.environ.get("USER_DETAILS_FILE"),
        "fileContents": None
    },
    "alerts": {
        "filePath": os.environ.get("LOGINSIGHT_ALERT_DETAILS_FILE"),
        "fileContents": None
    },
    "events": {
        "filePath": os.environ.get("LOGINSIGHT_EVENT_DETAILS_FILE"),
        "fileContents": None
    }
}

sessionID = None
HTTP_RESPONSE_LOGIN_TIMEOUT_CODE = 440
POST = 'POST'
GET = 'GET'
ZERO = 0

confirmedAlertIDsToMonitor = []
WF_METRIC_ALERTS = {}
licfResponse = {}
licfResponseConversion = {}
match = {}
fullMessages = {}
alertResponse = {}
status = {}
allEvents = {}
eventData = {}
populateEventData = {}
threads = {}
setLastAlertEpochTime = {}
Semaphore = None
activeProcesses = []
epoch_time = int(datetime.datetime.now().strftime("%s"))
EVENT_LAST_POPULATED_EPOCH_TIME = 0
EVENT_POPULATE_TIME_GREATER = 0
EVENT_POPULATE_TIME_LESS = 0
# Time backwards from current epoch time to verify metric sent to wavefront
TIME_MINUS = 120


# Multiprocessing class
# A worker process is initialized when the class is instantiated with its data
class ProcessClassToPushMetrics(multiprocessing.Process):
    def __init__(self, data):
        log.info("[ProcessClassToPushMetrics,__init__]: data variable content, "
            "'{data}'".format(data=data))
        self.dataName = data['name']
        if self.dataName == "alerts":
            multiprocessing.Process.__init__(self, name="Thread-Alert-" + data['alertID'])
            self.alertID = data['alertID']
        elif self.dataName == "events":
            multiprocessing.Process.__init__(self, name="Thread-Event-" + data['threadName'])
            self.threadName = data['threadName']
            self.eventName = data['eventName']
            self.startTime = data['startTime']
            self.endTime = data['endTime']

        del data
        gc.collect()

    # run event related threads
    def _run_event_process(self, process_name):
        global allEvents, eventData, status, \
            EVENT_POPULATE_TIME_GREATER, EVENT_POPULATE_TIME_LESS

        self.metricName = "events.{name}".format(name=self.eventName)
        log.info("[{logprefix}]: Metric name to be used is '{metricName}'".format(
            logprefix=process_name,
            metricName=self.metricName)
        )

        status[self.threadName] = _populate_event_data(
            name=self.eventName,
            threadName=self.threadName,
            logprefix=process_name,
            greaterthan=self.startTime,
            lessthan=self.endTime
        )

        if status[self.threadName]:
            if eventData[self.eventName][self.threadName]:
                for self.event in eventData[self.eventName][self.threadName]:
                    _send_to_wavefront(
                        metricName=self.metricName,
                        time=self.event['epoch_time'],
                        value=self.event['value'],
                        logprefix=process_name,
                        labelsAsString=self.event['labelsAsString']
                    )

        del process_name, status[self.threadName]
        gc.collect()

    # run alert related threads
    def _run_alert_process(self, process_name):
        global WF_METRIC_ALERTS, alertResponse, status, setLastAlertEpochTime

        _metricName = "alerts.{name}".format(
            name=WF_METRIC_ALERTS[self.alertID]['metricName']
        )

        log.info("[{logprefix}]: Metric name to be used is '{metricName}'".format(
            logprefix=process_name,
            metricName=_metricName)
        )

        status[self.alertID] = _populate_alert_history(alertID=self.alertID,
                                                       logprefix=process_name)

        alertResponse[self.alertID]['labelsAsString'] = ""
        if 'labels' in WF_METRIC_ALERTS[self.alertID]:
            for key in WF_METRIC_ALERTS[self.alertID]['labels']:
                keyPlusValue = '\\"{key}\\"=\\"{value}\\"'.format(
                    key=key, value=WF_METRIC_ALERTS[self.alertID]['labels'][key]
                )
                alertResponse[self.alertID]['labelsAsString'] += " " + keyPlusValue

        if status[self.alertID]:
            for timestamp in alertResponse[self.alertID].keys():
                # 'labelsAsString' lives in the same dict as the timestamped
                # counts, so it must be skipped here.
                if timestamp == 'labelsAsString':
                    continue
                _send_to_wavefront(
                    metricName=_metricName,
                    time=timestamp,
                    value=alertResponse[self.alertID][timestamp],
                    logprefix=process_name,
                    labelsAsString=alertResponse[self.alertID]['labelsAsString']
                )
        else:
            _send_to_wavefront(
                metricName=_metricName,
                time=epoch_time,
                value=ZERO,
                logprefix=process_name,
                labelsAsString=alertResponse[self.alertID]['labelsAsString']
            )

        alertResponse[self.alertID] = {}
        WF_METRIC_ALERTS[self.alertID]['lastAlertEpochTime'] = \
            setLastAlertEpochTime[self.alertID]

        del _metricName, status[self.alertID], process_name
        gc.collect()

    # run() executes in the child process when start() is called
    def run(self):
        global user_details, Semaphore, activeProcesses
        self.processName = ""

        Semaphore.acquire()
        self.processName = multiprocessing.current_process().name
        activeProcesses.append(self.processName)
        log.info("[Semaphore,Acquire]: Running: {activeProcessesCount} Processes"
            ".".format(activeProcessesCount=len(activeProcesses)))
        try:
            if self.dataName == "alerts":
                self._run_alert_process(process_name=self.processName)
            elif self.dataName == "events":
                self._run_event_process(process_name=self.processName)
        finally:
            activeProcesses.remove(self.processName)
            log.info("[Semaphore,Release]: Running: {activeProcessesCount} Processes"
                ".".format(activeProcessesCount=len(activeProcesses)))
            Semaphore.release()

        gc.collect()
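
# Optimization sketch (illustrative, not from the original script): starting
# one Process per alert and per one-second event window forks far more
# processes than there are CPUs. A fixed-size multiprocessing.Pool reuses a
# small set of workers instead, e.g. with a hypothetical worker function:
#
#     def _push_metrics_worker(data):
#         ...  # the work _run_alert_process/_run_event_process do today
#
#     with multiprocessing.Pool(processes=os.cpu_count()) as pool:
#         pool.map(_push_metrics_worker, work_items)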


# send integration active status to wavefront
def _send_integration_status_to_wavefront(active):
    _send_to_wavefront(
        metricName="integration.symphony.status.podactive",
        time=epoch_time,
        value=active,
        logprefix="_send_integration_status_to_wavefront"
    )

# verify metric points of particular hostname
def _verify_metric_of(number, name):
    _status = False
    _epoch_minus = epoch_time - TIME_MINUS
    _epoch_minus_in_millisec = _epoch_minus * 1000
    _last_update = 0
    _last_update_trim = 0
    _url = "https://{wfinstance}/api/v2/chart/raw?host={hostname}&" \
        "metric=loginsight.integration.symphony.status.podactive&startTime={starttime}".format(
            wfinstance=user_details.wavefront['WAVEFRONT_INSTANCE'],
            hostname=name + '-' + str(number),
            starttime=str(_epoch_minus_in_millisec)
        )
    _headers = {"Authorization": "Bearer {apikey}".format(
        apikey=base64.b64decode(
            user_details.wavefront['WAVEFRONT_INSTANCE_API_KEY']).decode())
    }

    response = requests.get(_url, headers=_headers)
    log.info("[_verify_metric_of]: Metric Chart Raw api '{url}' response content "
        "'{content}'".format(
            url=_url,
            content=response.content
        )
    )

    response_json = response.json()
    if response_json:
        _points = response_json[0]['points']
        for _point in _points:
            if _last_update < _point['timestamp']:
                _last_update = _point['timestamp']

        # last update time is in milliseconds, so reduce it to seconds.
        _last_update_trim = _last_update // 1000
        log.info("[_verify_metric_of]: Last update trimmed value is {value}".format(
            value=_last_update_trim))
        log.info("[_verify_metric_of]: Current Epoch Time - {epoch_time} and Epoch "
            "Time with minus {time_minus} seconds {epoch_minus}".format(
                epoch_time=epoch_time,
                time_minus=TIME_MINUS,
                epoch_minus=_epoch_minus
            )
        )
        # metric last update comparison
        if _last_update_trim < _epoch_minus:
            log.warning("Last update is older than {sec} seconds, "
                "hence returning false.".format(sec=TIME_MINUS))
        else:
            log.info("Last update is within the last {sec} seconds, "
                "hence returning true.".format(sec=TIME_MINUS))
            _status = True
            _status = True

    try:
        return _status
    finally:
        del _status, _url, _headers, _epoch_minus, _last_update, _last_update_trim, \
            response_json, response
        gc.collect()

# Verify metrics getting pushed to wavefront
def _verify_metrics_in_wavefront():
    _status = False
    _statefulset_number = None
    _statefulset_name = None

    match = re.match("(?P<STATEFULSETNAME>.*)-(?P<STATEFULSETNUMBER>\d+)", hostname)

    if match and int(match.groupdict()['STATEFULSETNUMBER']):
        _statefulset_number = int(match.groupdict()['STATEFULSETNUMBER'])
        _statefulset_name = match.groupdict()['STATEFULSETNAME']
        for replica in range(0, _statefulset_number):
            if _verify_metric_of(number=replica, name=_statefulset_name):
                _status = True
                break

    try:
        return _status
    finally:
        del _status, _statefulset_number, _statefulset_name
        gc.collect()


# python requests command execution
def _execute_licf_api_cmd(cmd, api, logprefix, data=None, headers=None):
    global sessionID
    # Avoid mutable default arguments: a shared default dict would keep the
    # 'Authorization' header added below across unrelated calls.
    data = data if data is not None else {}
    headers = headers if headers is not None else {}
    _responseConversion = None
    _response = None
    _url = "https://{srvName}:{srvPort}/{api}".format(
        srvName=user_details.loginsight['LOGINSIGHT_SERVER_NAME'],
        srvPort=user_details.loginsight['LOGINSIGHT_SERVER_PORT'],
        api=api
    )

    try:
        if cmd == POST:
            if data:
                _response = requests.post(url=_url, json=data, verify=False)
            else:
                raise Exception("data variable passed to this function was empty for "
                    "POST requests: '{url}'".format(url=_url)
                )
        elif cmd == GET:
            headers['Authorization'] = "Bearer {sessionID}".format(sessionID=sessionID)
            _response = requests.get(url=_url, headers=headers, verify=False)

        log.info("[_execute_licf_api_cmd,{logprefix}]: Executed LICF API command "
            "with,\n url: '{url}'\ncmd: '{cmd}'\ndata: {data}\n"
            "headers: {headers}\n\n".format(
                url=_url,
                cmd=cmd,
                data=data,
                headers=headers,
                logprefix=logprefix
            )
        )

        if _response.status_code == requests.codes.ok:
            _responseConversion = _response.content

            if isinstance(_responseConversion, bytes):
                log.info("[_execute_licf_api_cmd,{logprefix}]: Since response of licf "
                    "api is of type 'bytes', try decoding it...".format(
                        logprefix=logprefix
                    )
                )
                try:
                    _responseConversion = _responseConversion.decode('utf-8')
                except Exception as exception:
                    log.info("[_execute_licf_api_cmd,{logprefix}]: Can't decode since "
                        "'{ex}'".format(logprefix=logprefix, ex=exception))

                try:
                    _responseConversion = _responseConversion.replace("\r", '')
                except Exception as exception:
                    log.info("[_execute_licf_api_cmd,{logprefix}]: Can't replace '\\r' "
                        "since '{ex}'".format(logprefix=logprefix, ex=exception))

            _responseConversion = _responseConversion.replace('\\"', '\'')
            _responseConversion = json.dumps(_responseConversion)
            _responseConversion = yaml.safe_load(_responseConversion)
            if type(_responseConversion) in [bytes, bytearray, str]:
                _responseConversion = json.loads(_responseConversion)

            log.info("[_execute_licf_api_cmd,{logprefix}]: Exiting the function by "
                "returning response\n\n\n.".format(logprefix=logprefix))
            return _responseConversion
        elif _response.status_code == HTTP_RESPONSE_LOGIN_TIMEOUT_CODE:
            log.warning("[_execute_licf_api_cmd,{logprefix}]: LICF Rest API login "
                "session has timed out. Reconnecting....".format(logprefix=logprefix))
            _connect_to_loginsight_server()

            log.warning("[_execute_licf_api_cmd,{logprefix}]: Executing licf api "
                "command again...".format(logprefix=logprefix))
            return _execute_licf_api_cmd(cmd=cmd,
                                         api=api,
                                         logprefix=logprefix,
                                         data=data,
                                         headers=headers)
        else:
            log.error("[_execute_licf_api_cmd,{logprefix}]: Execution of licf api cmd "
                "has failed: {status} \nresponse was: {resp}\n\n\n".format(
                    status=_response.status_code,
                    resp=_response,
                    logprefix=logprefix
                )
            )
    except Exception as exception:
        log.warning("[_execute_licf_api_cmd,{logprefix}]: Exception occurred, "
            "'{ex}'".format(logprefix=logprefix, ex=exception))
        raise Exception("[_execute_licf_api_cmd,{logprefix}]: Exception occurred, "
                        "'{ex}'".format(logprefix=logprefix, ex=exception))
    finally:
        del _response, _responseConversion, _url, cmd, api, logprefix, data, headers
        gc.collect()
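
# Optimization sketch (illustrative, not from the original script): every
# requests.get/requests.post above opens a fresh TLS connection to the LICF
# server. A module-level requests.Session reuses connections, e.g.:
#
#     _licf_session = requests.Session()
#     _licf_session.verify = False
#     _response = _licf_session.get(url=_url, headers=headers)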


# log alert details being used
def _log_alert_details():
    log.info("[_log_alert_details]: Array of dicts to be used as alert details for "
        "sending alert metrics to wavefront instance are,\n'{details}'".format(
            details=alert_details.userSpecifiedAlerts
        )
    )


# log event details being used
def _log_event_details():
    log.info("[_log_event_details]: Array of dicts to be used as event details for "
        "sending event metrics to wavefront instance are,\n'{details}'".format(
            details=event_details.userSpecifiedEvents
        )
    )


# Log Wavefront Server details being used
def _log_wf_server_details():
    _status = True
    _logText = ""

    if 'WAVEFRONT_INSTANCE' in user_details.wavefront.keys():
        _logText += "Wavefront Instance Name - {srvName}\n".format(
            srvName=user_details.wavefront['WAVEFRONT_INSTANCE']
        )
    else:
        _logText += "WAVEFRONT_INSTANCE key is not specified in " \
                   "user details module dict wavefront.\n"
        _status = False

    if 'WAVEFRONT_INSTANCE_API_KEY' in user_details.wavefront.keys():
        _logText += "Wavefront Instance API Key - {srvPort}\n".format(
            srvPort=user_details.wavefront['WAVEFRONT_INSTANCE_API_KEY']
        )
    else:
        _logText += "WAVEFRONT_INSTANCE_API_KEY key is not specified in " \
                   "user details module dict wavefront.\n"
        _status = False

    try:
        if _status:
            log.info("[_log_wf_server_details]: All required wavefront variables are "
                "specified in user details module.\n Values provided to connect to "
                "Wavefront Instance are,\n{logText}".format(logText=_logText))
        else:
            raise Exception("[_log_wf_server_details]: Not all required wavefront "
                "variables are specified in user details module.\n{logText}".format(
                    logText=_logText
                )
            )
    except Exception as exception:
        log.error(exception)
        raise Exception(exception)
    finally:
        del _logText, _status
        gc.collect()


# Log LICF Server details being used
def _log_licf_server_details():
    _status = True
    _logText = ""

    if 'LOGINSIGHT_SERVER_NAME' in user_details.loginsight.keys():
        _logText += "LogInsight Server Name - {srvName}\n".format(
            srvName=user_details.loginsight['LOGINSIGHT_SERVER_NAME']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_NAME key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_PORT' in user_details.loginsight.keys():
        _logText += "LogInsight Server Port - {srvPort}\n".format(
            srvPort=user_details.loginsight['LOGINSIGHT_SERVER_PORT']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_PORT key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_USER_NAME' in user_details.loginsight.keys():
        _logText += "LogInsight Server User Name - {userName}\n".format(
            userName=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_USER_NAME key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_USER_PASSWORD' in user_details.loginsight.keys():
        _logText += "LogInsight Server User Pwd - {userPwd}\n".format(
            userPwd=user_details.loginsight['LOGINSIGHT_SERVER_USER_PASSWORD']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_USER_PASSWORD key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    if 'LOGINSIGHT_SERVER_USER_PROVIDER' in user_details.loginsight.keys():
        _logText += "LogInsight Server User Provider - {provider}\n".format(
            provider=user_details.loginsight['LOGINSIGHT_SERVER_USER_PROVIDER']
        )
    else:
        _logText += "LOGINSIGHT_SERVER_USER_PROVIDER key is not specified in " \
            "user details module dict loginsight.\n"
        _status = False

    try:
        if _status:
            log.info("[_log_licf_server_details]: All required variables are specified "
                "in user details module.\n Values provided to connect to LogInsight "
                "Server are,\n{logText}".format(logText=_logText))
        else:
            raise Exception("[_log_licf_server_details]: Not all required variables "
                "are specified in user details module.\n{logText}".format(
                    logText=_logText
                )
            )
    except Exception as exception:
        log.error(exception)
        raise Exception(exception)
    finally:
        del _logText, _status
        gc.collect()


# get session ID for LICF server
def _connect_to_loginsight_server():
    global sessionID, user_details
    _response = None

    _data = {
        "username": base64.b64decode(
            user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']).decode(),
        "password": base64.b64decode(
            user_details.loginsight['LOGINSIGHT_SERVER_USER_PASSWORD']).decode(),
        "provider": user_details.loginsight['LOGINSIGHT_SERVER_USER_PROVIDER']
    }
    _response = _execute_licf_api_cmd(cmd=POST,
                                      api='api/v1/sessions',
                                      logprefix='_connect_to_loginsight_server',
                                      data=_data)

    sessionID = _response["sessionId"]
    log.info("[_connect_to_loginsight_server]: Session ID to connect to LICF server "
        "is, '{sessionID}'\nfor user ID: {userID}\n\n\n".format(
            sessionID=sessionID,
            userID=_response['userId']
        )
    )

    del _response, _data
    gc.collect()


# send metric data to wavefront
def _send_to_wavefront(metricName, time, value, logprefix, labelsAsString=""):
    # Forming text to be sent to wavefront proxy.
    _sendText = (
        "loginsight.{metricName} {value} {epochTime} "
        'source=\\"{hostname}\\" {labels}'.format(
            metricName=metricName,
            value=value,
            epochTime=time,
            hostname=hostname,
            labels=labelsAsString
        )
    )
    log.info(
        "[_send_to_wavefront,{logprefix}]: Text to be sent is, {sendText}".format(
            logprefix=logprefix, sendText=_sendText
        )
    )

    # First we echo the text to be sent to pipe in 'p1' and then send
    #  it to 'p2' as input which uses netcat 'nc' to write to network
    #  connection which in this case is wavefront proxy.
    _p1 = subprocess.Popen(
        shlex.split("echo -e {sendText}".format(sendText=_sendText)),
        stdout=subprocess.PIPE,
    )
    _p2 = subprocess.Popen(
        shlex.split("nc -q -v {proxy} 2878".format(proxy=wavefrontProxyServiceName)),
        stdin=_p1.stdout,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    _process_output, _ = _p2.communicate()
    log.info(
        "[_send_to_wavefront,{logprefix}]: Output of nc command <EMPTY means success> "
        "is, {process_output}".format(logprefix=logprefix,
                                      process_output=_process_output)
    )

    del metricName, time, value, logprefix, labelsAsString, _sendText
    del _p1, _p2, _process_output
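
# Optimization sketch (illustrative, not from the original script): forking
# 'echo' plus 'nc' for every metric point is expensive at ~2000 lines per
# 30 seconds. The same line protocol can be written in one batch over a
# plain TCP socket to the proxy port used above:
def _send_batch_to_wavefront_socket(dataLines):
    # dataLines: a list of already-formatted Wavefront data lines.
    with socket.create_connection((wavefrontProxyServiceName, 2878)) as sock:
        sock.sendall(("\n".join(dataLines) + "\n").encode())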


# check alerts specified in loginsight_alerts_details.py exists/visible to user
# user here is user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']
def _check_alerts_exists():
    global confirmedAlertIDsToMonitor, alert_details, WF_METRIC_ALERTS
    confirmedAlertIDsToMonitor = []
    _response = None
    _alertIDs = []
    _alertEnabled = {}
    _status = False

    log.info("[_check_alerts_exists]: Get details of all alerts.\n")
    _response = _execute_licf_api_cmd(cmd=GET,
                                     api='api/v1/alerts',
                                     logprefix='_check_alerts_exists')
    log.info("[_check_alerts_exists]: Received details of all alerts\n\n")

    for alert in _response:
        _alertIDs.append(alert['id'])
        _alertEnabled[alert['id']] = alert['enabled']
        log.info("[_check_alerts_exists]: Append id {id} to all alerts with its "
            "enabled status - {status}".format(id=alert['id'],
                                               status=alert['enabled']))

    log.info("[_check_alerts_exists]: User alert details - {userAlertDetails}\n\n"
        "All alerts IDs - {allAlertIDS}\n\n".format(
            userAlertDetails=alert_details.userSpecifiedAlerts,
            allAlertIDS=_alertIDs
        )
    )
    for alert in alert_details.userSpecifiedAlerts:
        log.info("[_check_alerts_exists]: Alert from user specified is - "
            "{alert}".format(alert=alert))

        if alert['id'] in _alertIDs and \
                str(_alertEnabled[alert['id']]).lower() == "true":
            log.info("[_check_alerts_exists]: Alert ID '{id}' specified in "
                "loginsight_alerts_details.py exists/visible for user {user} and is "
                "enabled.".format(
                    id=alert['id'],
                    user=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']
                )
            )
            confirmedAlertIDsToMonitor.append(alert['id'])
            if alert['id'] not in WF_METRIC_ALERTS.keys():
                WF_METRIC_ALERTS[alert['id']] = {}
                WF_METRIC_ALERTS[alert['id']]['lastAlertEpochTime'] = 0

            # metricName assignment is not done in above, if condition,
            #  to get value updated if ID already exists
            WF_METRIC_ALERTS[alert['id']]['metricName'] = alert['name']
            if 'labels' in alert.keys():
                if alert['labels']:
                    WF_METRIC_ALERTS[alert['id']]['labels'] = alert['labels']
            _status = True
        else:
            log.error("[_check_alerts_exists]: Alert ID '{id}' specified in "
                "loginsight_alerts_details.py either doesn't exist/visible for user "
                "{user} (OR) not enabled. Enabled key value for alert is "
                "'{enabled}'".format(
                    id=alert['id'],
                    user=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME'],
                    enabled=_alertEnabled[alert['id']]
                )
            )

    # Remove alerts that are not in userSpecifiedAlerts
    for alertID in list(WF_METRIC_ALERTS.keys()):  # copy: entries deleted below
        if alertID not in confirmedAlertIDsToMonitor:
            log.warning("[_check_alerts_exists]: Deleting alertID {alertID} from "
                "monitoring".format(alertID=alertID))
            del WF_METRIC_ALERTS[alertID]

    log.info("[_check_alerts_exists]: Populating alert details that needs monitoring "
        "is completed.\n\n\n")
    try:
        return _status
    finally:
        del _status, _response, _alertIDs, _alertEnabled
        gc.collect()


# populate alert data
def _populate_alertID_data(alertID, logprefix, timestamp):
    global alertResponse, setLastAlertEpochTime, alert_details
    _status = False
    _ndigits = int(math.log10(timestamp))+1
    try:
        timestamp = timestamp//int(10**(_ndigits-10))
        if timestamp > setLastAlertEpochTime[alertID]:
            log.info("[_populate_alertID_data,{logprefix}]: epochTime for alert "
                "{alertID} is {utime} which is greater than the highest epoch time in "
                "this iteration of results so far i.e., {savedTime}. Saving this as "
                "current highest in the iteration.".format(
                    logprefix=logprefix,
                    alertID=alertID,
                    utime=timestamp,
                    savedTime=setLastAlertEpochTime[alertID]
                )
            )
            setLastAlertEpochTime[alertID] = timestamp

        if timestamp > WF_METRIC_ALERTS[alertID]['lastAlertEpochTime']:
            _status = True
            # Check if there are more than 1 alert at same timestamp.
            if timestamp in alertResponse[alertID].keys():
                alertResponse[alertID][timestamp] += 1
            else:
                alertResponse[alertID][timestamp] = 1
        else:
            log.info("[_populate_alertID_data,{logprefix}]: alert at timestamp "
                "{timestamp} was sent to wavefront already".format(
                    logprefix=logprefix,
                    timestamp=timestamp
                )
            )
        return _status
    except ZeroDivisionError:
        log.info("[_populate_alertID_data,{logprefix}]: When removing milliseconds "
            "from timestamp to get epoch time in seconds (10 digits), a "
            "'ZeroDivisionError' exception occurred.".format(logprefix=logprefix))
        return _status
    finally:
        del _status, _ndigits, alertID, logprefix, timestamp
        gc.collect()
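
# Note (illustrative, not from the original script): for epoch timestamps
# known to be in milliseconds, the digit counting above reduces to a plain
# integer division:
#
#     def _to_epoch_seconds(ts):
#         return ts // 1000 if ts >= 10**12 else ts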


# get alert history and populate
def _populate_alert_history(alertID, logprefix):
    global alertResponse, setLastAlertEpochTime, licfResponse, \
        licfResponseConversion, fullMessages
    _status = False
    alertResponse[alertID] = {}
    licfResponse[alertID] = None
    licfResponseConversion[alertID] = {}
    fullMessages[alertID] = None

    setLastAlertEpochTime[alertID] = WF_METRIC_ALERTS[alertID]['lastAlertEpochTime']

    _api = "api/v1/alerts/{alertID}/history".format(alertID=alertID)
    log.info("[_populate_alert_history,{logprefix}]: Collect history of alert with "
        "ID, {alertID}".format(logprefix=logprefix, alertID=alertID))
    licfResponse[alertID] = _execute_licf_api_cmd(cmd=GET,
                                                  api=_api,
                                                  logprefix=logprefix)

    if not licfResponse[alertID]:
        log.warning("[_populate_alert_history,{logprefix}]: There is no history for "
            "alert ID '{alertID}'\n\n\n".format(logprefix=logprefix, alertID=alertID)
        )
    else:
        licfResponseConversion[alertID]['converted'] = None
        if isinstance(licfResponse[alertID], str):
            # Removing '\', double quotes before and after '[', ']' respectively
            licfResponse[alertID] = licfResponse[alertID].replace('\\','')
            licfResponse[alertID] = \
                str(licfResponse[alertID])[licfResponse[alertID].index('['):]
            licfResponse[alertID] = \
                str(licfResponse[alertID])[:licfResponse[alertID].rfind(']') + 1]
            licfResponseConversion[alertID]['converted'] = \
                json.loads(licfResponse[alertID])
        else:
            licfResponseConversion[alertID]['converted'] = licfResponse[alertID]

        if isinstance(licfResponseConversion[alertID]['converted'][0]['messages'], str):
            fullMessages[alertID] = \
                json.loads(licfResponseConversion[alertID]['converted'][0]['messages'])
        else:
            fullMessages[alertID] = \
                licfResponseConversion[alertID]['converted'][0]['messages']

        log.info("[_populate_alert_history,{logprefix}]: Type of fullMessages variable "
            "is, '{typeresp}'. fullMessages content is,\n '{message}'\n\n\n\n".format(
                logprefix=logprefix,
                typeresp=type(fullMessages[alertID]),
                message=fullMessages[alertID]
            )
        )

        if isinstance(fullMessages[alertID], list):
            for message in fullMessages[alertID]:
                if _populate_alertID_data(alertID=alertID,
                                          logprefix=logprefix,
                                          timestamp=message['timestamp']):
                    _status = True
        elif isinstance(fullMessages[alertID], dict):
            _status = _populate_alertID_data(
                alertID=alertID,
                logprefix=logprefix,
                timestamp=fullMessages[alertID]['timestamp']
            )
        else:
            log.error("[_populate_alert_history,{logprefix}]: messages key in LICF "
                "response is of type {typeresp} which is not 'list' or 'dict' to get "
                "timestamp of alert.".format(
                    logprefix=logprefix,
                    typeresp=type(fullMessages[alertID])
                )
            )

    try:
        return _status
    finally:
        del fullMessages[alertID], licfResponseConversion[alertID]
        del licfResponse[alertID], _status, _api, alertID, logprefix
        gc.collect()


# prepare alerts data to send to wavefront
def _send_alerts_data_to_wavefront():
    global confirmedAlertIDsToMonitor, threads
    threads['alerts'] = []
    _alertID = None
    _thread = None

    for _alertID in confirmedAlertIDsToMonitor:
        newThread = ProcessClassToPushMetrics(
            data={"name": "alerts",
                  "alertID": _alertID
            }
        )
        newThread.start()
        log.info("[_send_alerts_data_to_wavefront]: Started new thread for alert "
            "{alertID}".format(
                 alertID=_alertID
            )
        )
        threads['alerts'].append(newThread)

    del _alertID, _thread
    gc.collect()


# populate final event data
def _populate_eventname_data(name, threadName, text, timestamp, logprefix):
    global eventData, allEvents, licfResponseConversion, populateEventData, match
    populateEventData[logprefix] = {}
    _ndigits = int(math.log10(timestamp))+1

    try:
        timestamp = timestamp//int(10**(_ndigits-10))

        log.info("[_populate_eventname_data,{logprefix}]: Generating event data with,\n"
            "text '{text}'\nname '{name}'\npattern '{pattern}'".format(
                logprefix=logprefix,
                text=text,
                name=name,
                pattern=allEvents[name]['pattern']
            )
        )
        populateEventData[logprefix]['epoch_time'] = timestamp
        populateEventData[logprefix]['labels'] = {}
        populateEventData[logprefix]['labels'].update(
            licfResponseConversion[logprefix]['labels'])

        match[logprefix] = re.match(allEvents[name]['pattern'], text)
        if match[logprefix]:
            log.info("[_populate_eventname_data,{logprefix}]: matched groups, "
                "'{mgrp}'".format(
                    logprefix=logprefix,
                    mgrp=match[logprefix].groupdict()
                )
            )
            populateEventData[logprefix]['labels'].update(match[logprefix].groupdict())
            log.info("[_populate_eventname_data,{logprefix}]: label keys "
                "'{keys}'".format(
                    logprefix=logprefix,
                    keys=populateEventData[logprefix]['labels'].keys())
            )

            if allEvents[name]['metricValue'] in \
                    populateEventData[logprefix]['labels'].keys():
                populateEventData[logprefix]['value'] = \
                    populateEventData[logprefix]['labels'][allEvents[name]['metricValue']]
            else:
                return False

            del populateEventData[logprefix]['labels'][allEvents[name]['metricValue']]
            populateEventData[logprefix]['labelsAsString'] = ""
            for key in populateEventData[logprefix]['labels']:
                keyPlusValue = '\\"{key}\\"=\\"{value}\\"'.format(
                    key=key, value=populateEventData[logprefix]['labels'][key]
                )
                populateEventData[logprefix]['labelsAsString'] += " " + keyPlusValue
            eventData[name][threadName].append(populateEventData[logprefix])
            return True
    except ZeroDivisionError:
        log.info("[_populate_eventname_data,{logprefix}]: When removing milli seconds "
            "from timestamp to get epochtime in seconds (10 digits), "
            "'ZeroDivisionError' exception occured.".format(logprefix=logprefix))
        return False
    finally:
        del populateEventData[logprefix], _ndigits, name, text, timestamp, \
            match[logprefix], logprefix
        gc.collect()
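
# Optimization sketch (illustrative, not from the original script): compiling
# each event pattern once, e.g. in _populate_userdefined_events,
#
#     allEvents[event['name']]['compiled'] = re.compile(event['pattern'])
#
# and matching here with allEvents[name]['compiled'].match(text) avoids the
# per-call pattern lookup inside re.match() in this hot per-line path.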


# populate event data
def _populate_event_data(name, threadName, logprefix, greaterthan, lessthan):
    global allEvents, eventData, licfResponse, licfResponseConversion
    _status = False
    licfResponse[logprefix] = None
    licfResponseConversion[logprefix] = {}
    fullMessages[logprefix] = None

    _api = "api/v1/events/timestamp/%3E%3D{greaterthan}/timestamp/%3C%3D{lessthan}" \
        "{filters}?limit=9999".format(
            filters=allEvents[name]['filters'],
            greaterthan=greaterthan,
            lessthan=lessthan,
        )
    log.info("[_populate_event_data,{logprefix}]: Collect data of event with "
        "api '{api}'".format(
            logprefix=logprefix,
            api=_api
        )
    )
    licfResponse[logprefix] = _execute_licf_api_cmd(cmd=GET,
                                                    api=_api,
                                                    logprefix=logprefix)

    if not licfResponse[logprefix]:
        log.warning("[_populate_event_data,{logprefix}]: There is no event with "
            "filters '{filters}'\n\n\n".format(
                logprefix=logprefix,
                filters=allEvents[name]['filters']
            )
        )
    else:
        licfResponseConversion[logprefix]['converted'] = None
        if isinstance(licfResponse[logprefix], str):
            # Removing '\', double quotes before and after '[', ']' respectively
            licfResponse[logprefix] = licfResponse[logprefix].replace('\\','')
            licfResponse[logprefix] = \
                str(licfResponse[logprefix])[licfResponse[logprefix].index('['):]
            licfResponse[logprefix] = \
                str(licfResponse[logprefix])[:licfResponse[logprefix].rfind(']') + 1]
            licfResponseConversion[logprefix]['converted'] = \
                json.loads(licfResponse[logprefix])
        else:
            licfResponseConversion[logprefix]['converted'] = licfResponse[logprefix]

        if isinstance(licfResponseConversion[logprefix]['converted']['events'], str):
            fullMessages[logprefix] = \
                json.loads(licfResponseConversion[logprefix]['converted']['events'])
        else:
            fullMessages[logprefix] = \
                licfResponseConversion[logprefix]['converted']['events']

        licfResponseConversion[logprefix]['labels'] = {}
        if 'customLabels' in allEvents[name]:
            licfResponseConversion[logprefix]['labels'].update(
                allEvents[name]['customLabels']
            )

        if isinstance(fullMessages[logprefix], list):
            for message in fullMessages[logprefix]:
                if isinstance(message['fields'], str):
                    licfResponseConversion[logprefix]['fields'] = \
                        json.loads(message['fields'])
                else:
                    licfResponseConversion[logprefix]['fields'] = \
                        message['fields']

                for field in licfResponseConversion[logprefix]['fields']:
                    if field['name'] in allEvents[name]['licfFieldsAsLabels']:
                        licfResponseConversion[logprefix]['labels'][field['name']] = \
                            field['content']

                if _populate_eventname_data(name=name,
                                            threadName=threadName,
                                            text=message['text'],
                                            timestamp=message['timestamp'],
                                            logprefix=logprefix):
                    _status = True
        elif isinstance(fullMessages[logprefix], dict):
            # Keyed by logprefix, matching the list branch above; the
            # original indexed by 'name', which is never a key here.
            if isinstance(fullMessages[logprefix]['fields'], str):
                licfResponseConversion[logprefix]['fields'] = \
                    json.loads(fullMessages[logprefix]['fields'])
            else:
                licfResponseConversion[logprefix]['fields'] = \
                    fullMessages[logprefix]['fields']
            for field in licfResponseConversion[logprefix]['fields']:
                if field['name'] in allEvents[name]['licfFieldsAsLabels']:
                    licfResponseConversion[logprefix]['labels'][field['name']] = \
                        field['content']
            _status = _populate_eventname_data(
                name=name,
                threadName=threadName,
                text=fullMessages[logprefix]['text'],
                timestamp=fullMessages[logprefix]['timestamp'],
                logprefix=logprefix)
        else:
            log.error("[_populate_event_data,{logprefix}]: events key in LICF response"
                "is of type {typeresp} which is not 'list' or 'dict' to get events "
                "timestamp, text etc.".format(
                    logprefix=logprefix,
                    typeresp=type(fullMessages[logprefix])
                )
            )

    try:
        return _status
    finally:
        del licfResponseConversion[logprefix], _status, _api
        del licfResponse[logprefix], fullMessages[logprefix], name, logprefix
        gc.collect()


# send events data to wavefront
def _send_events_data_to_wavefront():
    global allEvents, threads
    threads['events'] = []
    _thread = None
    _eventName = None
    _threadName = None

    for _eventName in allEvents.keys():
        _start_time_of_event_data_to_be_populated = EVENT_POPULATE_TIME_GREATER
        _end_time_of_event_data_to_be_populated = EVENT_POPULATE_TIME_GREATER + 1000
        eventData[_eventName] = {}
        while _end_time_of_event_data_to_be_populated <= EVENT_POPULATE_TIME_LESS:
            log.info("[_send_events_data_to_wavefront]: Start and End time being used "
                "are, StartTime '{starttime}' and EndTime '{endtime}' which are less "
                "than the time till which we need to send metrics, '{time}'".format(
                    starttime=_start_time_of_event_data_to_be_populated,
                    endtime=_end_time_of_event_data_to_be_populated,
                    time=EVENT_POPULATE_TIME_LESS
                )
            )
            _threadName = _eventName + "-" + \
                str(_start_time_of_event_data_to_be_populated)
            eventData[_eventName][_threadName] = []
            newThread = ProcessClassToPushMetrics(
                data={
                    "name": "events",
                    "eventName": _eventName,
                    "threadName": _threadName,
                    "startTime": _start_time_of_event_data_to_be_populated,
                    "endTime": _end_time_of_event_data_to_be_populated
                }
            )
            newThread.start()
            log.info("[_send_events_data_to_wavefront]: Started new thread for event "
                "{eventName}".format(eventName=_eventName))
            threads['events'].append(newThread)
            _start_time_of_event_data_to_be_populated = \
                _end_time_of_event_data_to_be_populated
            _end_time_of_event_data_to_be_populated += 1000

    del _thread, _eventName, _threadName
    gc.collect()
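
# Optimization sketch (illustrative, not from the original script): the loop
# above starts one process per 1000 ms window per event, so a 30-second
# backlog already means ~30 processes for a single event. Widening the
# window (e.g. += 30000) or handing the (startTime, endTime) windows to a
# fixed-size pool keeps the process count bounded.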


# populate user defined events from file
def _populate_userdefined_events():
    global allEvents, event_details

    _status = False
    log.info("[_populate_userdefined_events]: User defined event details - \n"
        "{userEventDetails}\n\n".format(
            userEventDetails=event_details.userSpecifiedEvents
        )
    )

    for event in event_details.userSpecifiedEvents:
        allEvents[event['name']] = {}
        allEvents[event['name']]['filters'] = event['filters']
        allEvents[event['name']]['pattern'] = event['pattern']
        allEvents[event['name']]['metricValue'] = event['metricValue']
        if 'licfFieldsAsLabels' in event.keys():
            allEvents[event['name']]['licfFieldsAsLabels'] = event['licfFieldsAsLabels']
        if 'customLabels' in event.keys():
            allEvents[event['name']]['customLabels'] = event['customLabels']
        _status = True

    try:
        return _status
    finally:
        del _status, event


# check if file contents has changed
def _check_file_change(name):
    global checkFileChange
    fileContents = None

    try:
        log.info("[_check_file_change]: Load file '{file}' contents.".format(
            file=checkFileChange[name]['filePath'])
        )
        with open(checkFileChange[name]['filePath'], 'r') as inputfile:
            fileContents = inputfile.read()

        if fileContents == checkFileChange[name]['fileContents']:
            log.info("[_check_file_change]: File '{file}' contents have not "
                "changed.".format(file=checkFileChange[name]['filePath']))
            return False
        else:
            log.info("[_check_file_change]: File '{file}' contents have changed."
                "Hence, details will be reloaded.".format(
                    file=checkFileChange[name]['filePath']
                )
            )
            # Saving latest filecontents to check the change next time.
            checkFileChange[name]['fileContents'] = fileContents
            return True
    except Exception as exception:
        log.error("[_check_file_change]: Exception occured while checking if file, "
            "'{file}' has changed,\n Exception is, '{exception}'".format(
                file=name,
                exception=exception
            )
        )
    finally:
        del fileContents
        gc.collect()
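
# Optimization sketch (illustrative, not from the original script): checking
# os.path.getmtime() before re-reading the whole file makes this check
# nearly free when nothing has changed:
#
#     _mtime = os.path.getmtime(checkFileChange[name]['filePath'])
#     if _mtime == checkFileChange[name].get('mtime'):
#         return False
#     checkFileChange[name]['mtime'] = _mtime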


# Integrate LICF and Wavefront
def _loginsight_wavefront_integration():
    global epoch_time, user_details, alert_details, event_details, Semaphore, \
        threads, EVENT_LAST_POPULATED_EPOCH_TIME, EVENT_POPULATE_TIME_GREATER, \
        EVENT_POPULATE_TIME_LESS, eventData
    _active = True
    _sendEventMetrics = True
    _sendAlertMetrics = True

    while True:
        try:
            log.info("[_loginsight_wavefront_integration]: "
                "Start of a cycle in While 1 indefinite loop.\n\n")
            if _check_file_change(name="users"):
                # Reload the module to get latest values.
                user_details = importlib.reload(user_details)
                _log_licf_server_details()
                _log_wf_server_details()
                _connect_to_loginsight_server()
                #Semaphore = multiprocessing.BoundedSemaphore(
                #    user_details.NoOfThreadsExecutionAtOnce
                #)
                Semaphore = multiprocessing.BoundedSemaphore(9999)

            epoch_time = int(datetime.datetime.now().strftime("%s"))

            if _verify_metrics_in_wavefront():
                log.info("[_loginsight_wavefront_integration]: As other statefulset "
                    "pods are sending metrics, this instance will be in inactive mode")
                _send_integration_status_to_wavefront(active=0)
                time.sleep(5)
                _active = False
                continue
            else:
                # Reset the active flag so the joins in 'finally' resume.
                _active = True
                log.info("[_loginsight_wavefront_integration]: This instance should "
                    "send metrics, hence sending status as active.")
                _send_integration_status_to_wavefront(active=1)

            if sendMetricsOf == "events" or sendMetricsOf == "all":
                if _check_file_change(name="events"):
                    # Reload the module to get latest values.
                    event_details = importlib.reload(event_details)
                    _log_event_details()

                    if not _populate_userdefined_events():
                        log.error("[_loginsight_wavefront_integration]: There are no "
                            "user defined events that exists in "
                            "loginsight_event_details.py for which metrics needs to "
                            "be sent to wavefront.")
                        _sendEventMetrics = False
                    else:
                        _sendEventMetrics = True

                if _sendEventMetrics:
                    log.info("[_loginsight_wavefront_integration]: There are user "
                        "defined events specified in loginsight_event_details.py "
                        "for which metrics needs to be sent to wavefront.")
                    if not EVENT_LAST_POPULATED_EPOCH_TIME:
                        EVENT_LAST_POPULATED_EPOCH_TIME = (epoch_time - 90) * 1000
                    EVENT_POPULATE_TIME_GREATER = EVENT_LAST_POPULATED_EPOCH_TIME
                    EVENT_POPULATE_TIME_LESS = (epoch_time - 60) * 1000
                    log.info("[_loginsight_wavefront_integration]: epoch time is "
                        "'{etime}'.\nTimes between which events needs to be populated, "
                        "'{gtime} - {ltime}'".format(
                            etime=epoch_time,
                            gtime=EVENT_POPULATE_TIME_GREATER,
                            ltime=EVENT_POPULATE_TIME_LESS
                        )
                    )
                    if EVENT_POPULATE_TIME_LESS > EVENT_POPULATE_TIME_GREATER:
                        try:
                            _send_events_data_to_wavefront()
                        finally:
                            EVENT_LAST_POPULATED_EPOCH_TIME = EVENT_POPULATE_TIME_LESS

            if sendMetricsOf == "alerts" or sendMetricsOf == "all":
                if _check_file_change(name="alerts"):
                    # Reload the module to get latest values.
                    alert_details = importlib.reload(alert_details)
                    _log_alert_details()

                    if not _check_alerts_exists():
                        log.error("[_loginsight_wavefront_integration]: There are no "
                            "alert IDs that exists/visible and enabled for which "
                            "metrics needs to be sent to wavefront.")
                        _sendAlertMetrics = False
                    else:
                        _sendAlertMetrics = True

                if _sendAlertMetrics:
                    log.info("[_loginsight_wavefront_integration]: There are user "
                        "defined alert IDs that exists/visible and enabled for which "
                        "metrics needs to be sent to wavefront.")
                    _send_alerts_data_to_wavefront()

        except Exception as exception:
            log.error("[_loginsight_wavefront_integration]: Exception occured - "
                "{exception}".format(exception=exception))
        finally:
            if _active and threads.get('alerts'):
                # Wait for all alert threads to complete
                log.info("[_loginsight_wavefront_integration]: "
                    "Wait for all alert threads to complete.")
                for _thread in threads['alerts']:
                    _thread.join()

                log.info("[_loginsight_wavefront_integration]: "
                    "All threads sending alerts data have completed.\n\n")
                threads['alerts'] = []

            if _active and threads.get('events'):
                #  Wait for all event threads to complete
                log.info("[_loginsight_wavefront_integration]: "
                    "Wait for all event threads to complete.")
                for _thread in threads['events']:
                    _thread.join()

                log.info("[_loginsight_wavefront_integration]: "
                    "All threads sending events data have completed.\n\n")
                threads['events'] = []

            log.info("[_loginsight_wavefront_integration]: "
                "End of a cycle in While 1 indefinite loop.\n\n\n\n\n")
            eventData = {}
            gc.collect()


if __name__ == "__main__":
    try:
        if os.path.isfile(checkFileChange['users']['filePath']):
            log.info("[Main]: User Details file '{file}' exists. "
                "Importing the module and log its content.".format(
                    file=checkFileChange['users']['filePath']
                )
            )

            if os.path.dirname(checkFileChange['users']['filePath']) not in \
                    sys.path:
                sys.path.append(os.path.dirname(
                    checkFileChange['users']['filePath']))
                log.info("[Main]: Added following path '{path}' to sys.path\n"
                    "'{syspath}'".format(
                    path=os.path.dirname(checkFileChange['users']['filePath']),
                    syspath=sys.path)
                )
            user_details = importlib.import_module(
                os.path.splitext(
                    os.path.basename(checkFileChange['users']['filePath']))[0])
        else:
            raise Exception("User Details file doesn't exist, '{file}'".format(
                file=checkFileChange['users']['filePath'])
            )

        if sendMetricsOf == "all" or sendMetricsOf == "alerts":
            if os.path.isfile(checkFileChange['alerts']['filePath']):
                log.info("[Main]: Alert Details file '{file}' exists. "
                    "Importing the module and log its content".format(
                        file=checkFileChange['alerts']['filePath']
                    )
                )

                if os.path.dirname(checkFileChange['alerts']['filePath']) not in \
                        sys.path:
                    log.info("[Main]: Adding following path to sys, '{path}'".format(
                        path=checkFileChange['alerts']['filePath'])
                    )
                    sys.path.append(os.path.dirname(
                        checkFileChange['alerts']['filePath']))
                alert_details = importlib.import_module(
                    os.path.splitext(
                        os.path.basename(checkFileChange['alerts']['filePath']))[0])
        else:
            raise("Alert Details file doesn't exists, '{file}'".format(
                file=checkFileChange['alerts']['filePath'])
            )

        if sendMetricsOf == "all" or sendMetricsOf == "events":
            if os.path.isfile(checkFileChange['events']['filePath']):
                log.info("[Main]: Event Details file '{file}' exists. "
                    "Importing the module and log its content".format(
                        file=checkFileChange['events']['filePath']
                    )
                )

                if os.path.dirname(checkFileChange['events']['filePath']) not in \
                        sys.path:
                    log.info("[Main]: Adding following path to sys, '{path}'".format(
                        path=checkFileChange['events']['filePath'])
                    )
                    sys.path.append(os.path.dirname(
                        checkFileChange['events']['filePath']))
                event_details = importlib.import_module(
                    os.path.splitext(
                        os.path.basename(checkFileChange['events']['filePath']))[0])
        else:
            raise Exception("Alert Details file doesn't exists, '{file}'".format(
                file=checkFileChange['alerts']['filePath'])
            )

        _loginsight_wavefront_integration()
    except Exception as exception:
        log.error(
            "[Main]: LogInsight Wavefront Integration failed with exception,\n "
            "'{exp}'".format(exp=exception)
        )
Reply
#4
Thanks, Larz60+. I have posted the same under the Jobs section too.
Reply
#5
I've merged the threads, in the job section for the moment. Please don't double-post.

Are you looking for help learning how to optimize your code, or are you looking for it to be done for you?
Reply
#6
Looking for tips if it's only small mistakes I have made.
If not, I am fine with an expert looking into it and rewriting it if necessary (paid help too).
Reply
#7
As this is CPU intensive, I have increased the CPU count and used a multiprocessing pool in a couple of places to bring the script's performance up to the desired level.
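
A minimal sketch of the pool approach (the worker and data below are illustrative, not taken from the script above):

import multiprocessing
import re

# Hypothetical worker: parse one log line and return its match groups.
def parse_line(args):
    pattern, line = args
    match = re.match(pattern, line)
    return match.groupdict() if match else None

if __name__ == "__main__":
    pattern = r"(?P<REQUEST>GET|POST)\s+(?P<URI>\S+)"
    lines = ["GET /api/v1/items", "POST /login"]
    # A fixed-size pool keeps the number of worker processes bounded.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(parse_line, [(pattern, line) for line in lines])
    print(results)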
Reply

