Jul-07-2018, 06:59 PM
I need help in optimizing below code. I am a beginner in Python and used multiprocessing for the first time. Below script execution is taking a long time and causing issues for me to keep up with real-time representation.
We have a logs server and I am trying to convert lines and alerts (we can create alerts in logs server) into metrics. But, as there are like 2000 lines every 30 seconds, the conversion of logs to metrics is taking more than 5 minutes but I want it under 30 seconds itself to keep up with real time.
Example:
log pattern - ".*(?P<REQUEST>GET|POST)\s+(?P<URI>/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*)[a-zA-Z0-9/_-]*\s+HTTP/\d+.\d+'\s+'[^:/ ]+.?[0-9]*'\s+(?P<RESPONSE>\d+)(.*\s+)(?P<RESPONSETIME>\d+.\d+)\s+"
I convert above one to metric with value as response time and other match group items as labels to that metric
We call each line as an event, hence code has the naming convention 'event'
We have a logs server and I am trying to convert lines and alerts (we can create alerts in logs server) into metrics. But, as there are like 2000 lines every 30 seconds, the conversion of logs to metrics is taking more than 5 minutes but I want it under 30 seconds itself to keep up with real time.
Example:
log pattern - ".*(?P<REQUEST>GET|POST)\s+(?P<URI>/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*/*[a-zA-Z0-9_-]*)[a-zA-Z0-9/_-]*\s+HTTP/\d+.\d+'\s+'[^:/ ]+.?[0-9]*'\s+(?P<RESPONSE>\d+)(.*\s+)(?P<RESPONSETIME>\d+.\d+)\s+"
I convert above one to metric with value as response time and other match group items as labels to that metric
We call each line as an event, hence code has the naming convention 'event'
#!/usr/bin/env python import datetime import socket import subprocess import time import shlex import logging.handlers import os import sys import yaml import base64 import re import importlib import json import math import gc import threading import multiprocessing import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # Logging related variables LOG_FILE_NAME = "/var/log/loginsightWavefrontIntegration.log" LOGGING_LEVEL = logging.DEBUG formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") handler = logging.handlers.RotatingFileHandler( LOG_FILE_NAME, mode="a", maxBytes=10000000, backupCount=5 ) handler.setFormatter(formatter) log = logging.getLogger("LogInsightWavefrontIntegration") log.setLevel(LOGGING_LEVEL) log.addHandler(handler) stdoutAndstderr = logging.StreamHandler() stdoutAndstderr.setLevel(LOGGING_LEVEL) stdoutAndstderr.setFormatter(formatter) log.addHandler(stdoutAndstderr) # system level variables hostname = socket.gethostname() wavefrontProxyServiceName = os.environ.get("WAVEFRONT_PROXY_SERVICE_NAME") # SEND_METRICS_OF supported values - all, events, alerts sendMetricsOf = os.environ.get("SEND_METRICS_OF") # Files related variables checkFileChange = { "users": { "filePath": os.environ.get("USER_DETAILS_FILE"), "fileContents": None }, "alerts": { "filePath": os.environ.get("LOGINSIGHT_ALERT_DETAILS_FILE"), "fileContents": None }, "events": { "filePath": os.environ.get("LOGINSIGHT_EVENT_DETAILS_FILE"), "fileContents": None } } sessionID = None HTTP_RESPONSE_LOGIN_TIMEOUT_CODE = 440 POST = 'POST' GET = 'GET' ZERO = 0 confirmedAlertIDsToMonitor = [] WF_METRIC_ALERTS = {} licfResponse = {} licfResponseConversion = {} match = {} fullMessages = {} alertResponse = {} status = {} allEvents = {} eventData = {} populateEventData = {} threads = {} setLastAlertEpochTime = {} Semaphore = None activeProcesses = [] epoch_time = int(datetime.datetime.now().strftime("%s")) EVENT_LAST_POPULATED_EPOCH_TIME = 0 EVENT_POPULATE_TIME_GREATER = 0 EVENT_POPULATE_TIME_LESS = 0 # Time backwards from current epoch time to verify metric sent to wavefront TIME_MINUS = 120 # Multithreading class # A thread gets initialized when the class is invoked with thread name class ProcessClassToPushMetrics(multiprocessing.Process): def __init__(self, data): log.info("[ProcessClassToPushMetrics,__init__]: data variable content, " "'{data}'".format(data=data)) self.dataName = data['name'] if self.dataName == "alerts": multiprocessing.Process.__init__(self, name="Thread-Alert-" + data['alertID']) self.alertID = data['alertID'] elif self.dataName == "events": multiprocessing.Process.__init__(self, name="Thread-Event-" + data['threadName']) self.threadName = data['threadName'] self.eventName = data['eventName'] self.startTime = data['startTime'] self.endTime = data['endTime'] del data gc.collect() # run event related threads def _run_event_process(self, process_name): global allEvents, eventData, status, \ EVENT_POPULATE_TIME_GREATER, EVENT_POPULATE_TIME_LESS self.metricName = "events.{name}".format(name=self.eventName) log.info("[{logprefix}]: Metric name to be used is '{metricName}'".format( logprefix=process_name, metricName=self.metricName) ) status[self.threadName] = _populate_event_data( name=self.eventName, threadName = self.threadName, logprefix=process_name, greaterthan=self.startTime, lessthan=self.endTime ) if status[self.threadName]: if eventData[self.eventName][self.threadName]: for self.event in eventData[self.eventName][self.threadName]: _send_to_wavefront( metricName=self.metricName, time=self.event['epoch_time'], value=self.event['value'], logprefix=process_name, labelsAsString=self.event['labelsAsString'] ) del process_name, status[self.threadName] gc.collect() # run alert related threads def _run_alert_process(self, process_name): global WF_METRIC_ALERTS, alertResponse, status, setLastAlertEpochTime _metricName = "alerts.{name}".format( name=WF_METRIC_ALERTS[self.alertID]['metricName'] ) log.info("[{logprefix}]: Metric name to be used is '{metricName}'".format( logprefix=process_name, metricName=_metricName) ) status[self.alertID] = _populate_alert_history(alertID=self.alertID, logprefix=process_name) alertResponse[self.alertID]['labelsAsString'] = "" if 'labels' in WF_METRIC_ALERTS[self.alertID]: for key in WF_METRIC_ALERTS[self.alertID]['labels']: keyPlusValue = '\\"{key}\\"=\\"{value}\\"'.format( key=key, value=WF_METRIC_ALERTS[self.alertID]['labels'][key] ) alertResponse[self.alertID]['labelsAsString'] += " " + keyPlusValue if status[self.alertID]: for timestamp in alertResponse[self.alertID].keys(): _send_to_wavefront( metricName=_metricName, time=timestamp, value=alertResponse[self.alertID][timestamp], logprefix=process_name, labelsAsString=alertResponse[self.alertID]['labelsAsString'] ) else: _send_to_wavefront( metricName=_metricName, time=epoch_time, value=ZERO, logprefix=process_name, labelsAsString=alertResponse[self.alertID]['labelsAsString'] ) alertResponse[self.alertID] = {} WF_METRIC_ALERTS[self.alertID]['lastAlertEpochTime'] = \ setLastAlertEpochTime[self.alertID] del _metricName, status[self.alertID], process_name gc.collect() # run function will be executed when we use process_name.start() cmd def run(self): global user_details, Semaphore, activeProcesses self.processName = "" Semaphore.acquire() self.processName = multiprocessing.current_process().name activeProcesses.append(self.processName) log.info("[Semaphore,Acquire]: Running: {activeProcessesCount} Processes" ".".format(activeProcessesCount=len(activeProcesses))) try: if self.dataName == "alerts": self._run_alert_process(process_name=self.processName) elif self.dataName == "events": self._run_event_process(process_name=self.processName) finally: activeProcesses.remove(self.processName) log.info("[Semaphore,Release]: Running: {activeProcessesCount} Processes" ".".format(activeProcessesCount=len(activeProcesses))) Semaphore.release() gc.collect() # send integration active status to wavefront def _send_integration_status_to_wavefront(active): _send_to_wavefront( metricName="integration.symphony.status.podactive", time=epoch_time, value=active, logprefix="_send_integration_status_to_wavefront" ) # verify metric points of particular hostname def _verify_metric_of(number, name): _status = False _epoch_minus = epoch_time - TIME_MINUS _epoch_minus_in_millisec = _epoch_minus * 1000 _last_update = 0 _last_update_trim = 0 _url = "https://{wfinstance}/api/v2/chart/raw?host={hostname}&" \ "metric=loginsight.integration.symphony.status.podactive&startTime={starttime}".format( wfinstance=user_details.wavefront['WAVEFRONT_INSTANCE'], hostname=name + '-' + str(number), starttime=str(_epoch_minus_in_millisec) ) _headers={"Authorization": "Bearer {apikey}".format( apikey=base64.b64decode( user_details.wavefront['WAVEFRONT_INSTANCE_API_KEY']).decode()) } response = requests.get(_url, headers=_headers) log.info("[_verify_metric_of]: Metric Chart Raw api '{url}' response content " "'{content}'".format( url=_url, content=response.content ) ) response_json = response.json() if response_json: _points = response_json[0]['points'] for _point in _points: if _last_update < _point['timestamp']: _last_update = _point['timestamp'] # last update time is in milli seconds. so rounding it to seconds. _last_update_trim = int(str(_last_update)[0:len(str(_epoch_minus))]) log.info("[_verify_metric_of]: Last update trimmed value is {value}".format( value=_last_update_trim)) log.info("[_verify_metric_of]: Current Epoch Time - {epoch_time} and Epoch " "Time with minus {time_minus} seconds {epoch_minus}".format( epoch_time=epoch_time, time_minus=TIME_MINUS, epoch_minus=_epoch_minus ) ) # metric last update comparison if _last_update_trim < _epoch_minus: log.warning("Last update is more than {sec} seconds, " "hence returning false".format(sec=TIME_MINUS)) else: log.info("Last update is less than {sec} seconds, " "hence returning true.".format(sec=TIME_MINUS)) _status = True try: return _status finally: del _status, _url, _headers, _epoch_minus, _last_update, _last_update_trim, \ response_json, response gc.collect() # Verify metrics getting pushed to wavefront def _verify_metrics_in_wavefront(): _status = False _statefulset_number = None _statefulset_name = None match = re.match("(?P<STATEFULSETNAME>.*)-(?P<STATEFULSETNUMBER>\d+)", hostname) if match and int(match.groupdict()['STATEFULSETNUMBER']): _statefulset_number = int(match.groupdict()['STATEFULSETNUMBER']) _statefulset_name = match.groupdict()['STATEFULSETNAME'] for replica in range(0, _statefulset_number): if _verify_metric_of(number=replica, name=_statefulset_name): _status = True break try: return _status finally: del _status, _statefulset_number, _statefulset_name gc.collect() # python requests command execution def _execute_licf_api_cmd(cmd, api, logprefix, data={}, headers={}): global sessionID _responseConversion = None _response = None _url = "https://{srvName}:{srvPort}/{api}".format( srvName=user_details.loginsight['LOGINSIGHT_SERVER_NAME'], srvPort=user_details.loginsight['LOGINSIGHT_SERVER_PORT'], api=api ) try: if cmd == POST: if data: _response = requests.post(url=_url, json=data, verify=False) else: raise Exception("data variable passed to this function was empty for " "POST requests: '{url}'".format(url=_url) ) elif cmd == GET: headers['Authorization'] = "Bearer {sessionID}".format(sessionID=sessionID) _response = requests.get(url=_url, headers=headers, verify=False) log.info("[_execute_licf_api_cmd,{logprefix}]: Executed LICF API command " "with,\n url: '{url}'\ncmd: '{cmd}'\ndata: {data}\n" "headers: {headers}\n\n".format( url=_url, cmd=cmd, data=data, headers=headers, logprefix=logprefix ) ) if _response.status_code == requests.codes.ok: _responseConversion = _response.content if type(_responseConversion) == bytes: log.info("[_execute_licf_api_cmd,{logprefix}]: Since response of licf " "api is of type 'bytes', try decoding it...".format( logprefix=logprefix ) ) try: _responseConversion = _responseConversion.decode('utf-8') except Exception as exception: log.info("[_execute_licf_api_cmd,{logprefix}]: Can't decode since " "'{ex}'".format(logprefix=logprefix, ex=exception)) try: _responseConversion = _responseConversion.replace("\r", '') except Exception as exception: log.info("[_execute_licf_api_cmd,{logprefix}]: Can't replace '\\r' " "since '{ex}'".format(logprefix=logprefix, ex=exception)) _responseConversion = _responseConversion.replace('\\"', '\'') _responseConversion = json.dumps(_responseConversion) _responseConversion = yaml.safe_load(_responseConversion) if type(_responseConversion) in [bytes, bytearray, str]: _responseConversion = json.loads(_responseConversion) log.info("[_execute_licf_api_cmd,{logprefix}]: Exiting the function by " "returning response\n\n\n.".format(logprefix=logprefix)) return _responseConversion elif _response.status_code == HTTP_RESPONSE_LOGIN_TIMEOUT_CODE: log.warning("[_execute_licf_api_cmd,{logprefix}]: LICF Rest API login " "session has timed out. Reconnecting....".format(logprefix=logprefix)) _connect_to_loginsight_server() log.warning("[_execute_licf_api_cmd,{logprefix}]: Executing licf api " "command again...".format(logprefix=logprefix)) return _execute_licf_api_cmd(cmd=cmd, api=api, logprefix=logprefix, data=data, headers=headers) else: log.error("[_execute_licf_api_cmd,{logprefix}]: Execution of licf api cmd " "has failed: {status} \nresponse was: {resp}\n\n\n".format( status=_response.status_code, resp=_response, logprefix=logprefix ) ) except Exception as exception: log.warning("[_execute_licf_api_cmd,{logprefix}]: Excpetion occured, " "'{ex}'".format(logprefix=logprefix, ex=exception)) raise Exception("[_execute_licf_api_cmd,{logprefix}]: Excpetion occured, " "'{ex}'".format(logprefix=logprefix, ex=exception)) finally: del _response, _responseConversion, _url, cmd, api, logprefix, data, headers gc.collect() # log alert details being used def _log_alert_details(): log.info("[_log_alert_details]: Array of dicts to be used as alert details for " "sending alert metrics to wavefront instance are,\n'{details}'".format( details=alert_details.userSpecifiedAlerts ) ) # log event details being used def _log_event_details(): log.info("[_log_event_details]: Array of dicts to be used as event details for " "sending event metrics to wavefront instance are,\n'{details}'".format( details=event_details.userSpecifiedEvents ) ) # Log Wavefront Server details being used def _log_wf_server_details(): _status = True _logText = "" if 'WAVEFRONT_INSTANCE' in user_details.wavefront.keys(): _logText += "Wavefront Instance Name - {srvName}\n".format( srvName=user_details.wavefront['WAVEFRONT_INSTANCE'] ) else: _logText += "WAVEFRONT_INSTANCE key is not specified in " \ "user details module dict wavefront.\n" _status = False if 'WAVEFRONT_INSTANCE_API_KEY' in user_details.wavefront.keys(): _logText += "Wavefront Instance API Key - {srvPort}\n".format( srvPort=user_details.wavefront['WAVEFRONT_INSTANCE_API_KEY'] ) else: _logText += "WAVEFRONT_INSTANCE_API_KEY key is not specified in " \ "user details module dict wavefront.\n" _status = False try: if _status: log.info("[_log_wf_server_details]: All required wavefront variables are " "specified in user details module.\n Values provided to connect to " "Wavefront Instance are,\n{logText}".format(logText=_logText)) else: raise Exception("[_log_wf_server_details]: Not all required wavefront " "variables are specified in user details module.\n{logText}".format( logText=_logText ) ) except Exception as exception: log.error(exception) raise Exception(exception) finally: del _logText, _status gc.collect() # Log LICF Server details being used def _log_licf_server_details(): _status = True _logText = "" if 'LOGINSIGHT_SERVER_NAME' in user_details.loginsight.keys(): _logText += "LogInsight Server Name - {srvName}\n".format( srvName=user_details.loginsight['LOGINSIGHT_SERVER_NAME'] ) else: _logText += "LOGINSIGHT_SERVER_NAME key is not specified in " \ "user details module dict loginsight.\n" _status = False if 'LOGINSIGHT_SERVER_PORT' in user_details.loginsight.keys(): _logText += "LogInsight Server Port - {srvPort}\n".format( srvPort=user_details.loginsight['LOGINSIGHT_SERVER_PORT'] ) else: _logText += "LOGINSIGHT_SERVER_PORT key is not specified in " \ "user details module dict loginsight.\n" _status = False if 'LOGINSIGHT_SERVER_USER_NAME' in user_details.loginsight.keys(): _logText += "LogInsight Server User Name - {userName}\n".format( userName=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME'] ) else: _logText += "LOGINSIGHT_SERVER_USER_NAME key is not specified in " \ "user details module dict loginsight.\n" _status = False if 'LOGINSIGHT_SERVER_USER_PASSWORD' in user_details.loginsight.keys(): _logText += "LogInsight Server User Pwd - {userPwd}\n".format( userPwd=user_details.loginsight['LOGINSIGHT_SERVER_USER_PASSWORD'] ) else: _logText += "LOGINSIGHT_SERVER_USER_PASSWORD key is not specified in " \ "user details module dict loginsight.\n" _status = False if 'LOGINSIGHT_SERVER_USER_PROVIDER' in user_details.loginsight.keys(): _logText += "LogInsight Server User Provider - {provider}\n".format( provider=user_details.loginsight['LOGINSIGHT_SERVER_USER_PROVIDER'] ) else: _logText += "LOGINSIGHT_SERVER_USER_PROVIDER key is not specified in " \ "user details module dict loginsight.\n" _status = False try: if _status: log.info("[_log_licf_server_details]: All required variables are specified " "in user details module.\n Values provided to connect to LogInsight " "Server are,\n{logText}".format(logText=_logText)) else: raise Exception("[_log_licf_server_details]: Not all required variables " "are specified in user details module.\n{logText}".format( logText=_logText ) ) except Exception as exception: log.error(exception) raise Exception(exception) finally: del _logText, _status gc.collect() # get session ID for LICF server def _connect_to_loginsight_server(): global sessionID, user_details _response = None _data = { "username": base64.b64decode( user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME']).decode(), "password": base64.b64decode( user_details.loginsight['LOGINSIGHT_SERVER_USER_PASSWORD']).decode(), "provider": user_details.loginsight['LOGINSIGHT_SERVER_USER_PROVIDER'] } _response = _execute_licf_api_cmd(cmd=POST, api='api/v1/sessions', logprefix='_connect_to_loginsight_server', data=_data) sessionID = _response["sessionId"] log.info("[_connect_to_loginsight_server]: Session ID to connect to LICF server " "is, '{sessionID}'\nfor user ID: {userID}\n\n\n".format( sessionID=sessionID, userID=_response['userId'] ) ) del _response, _data gc.collect() # send metric data to wavefront def _send_to_wavefront(metricName, time, value, logprefix, labelsAsString=""): # Formaing text to be sent to wavefront proxy. _sendText = ( "loginsight.{metricName} {value} {epochTime} " 'source=\\"{hostname}\\" {labels}'.format( metricName=metricName, value=value, epochTime=time, hostname=hostname, labels=labelsAsString ) ) log.info( "[_send_to_wavefront,{logprefix}]: Text to be sent is, {sendText}".format( logprefix=logprefix, sendText=_sendText ) ) # First we echo the text to be sent to pipe in 'p1' and then send # it to 'p2' as input which uses netcat 'nc' to write to network # connection which in this case is wavefront proxy. _p1 = subprocess.Popen( shlex.split("echo -e {sendText}".format(sendText=_sendText)), stdout=subprocess.PIPE, ) _p2 = subprocess.Popen( shlex.split("nc -q -v {proxy} 2878".format(proxy=wavefrontProxyServiceName)), stdin=_p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) _process_output, _ = _p2.communicate() log.info( "[_send_to_wavefront,{logprefix}]: Output of nc command <EMPTY means success> " "is, {process_output}".format(logprefix=logprefix, process_output=_process_output) ) del metricName, time, value, logprefix, labelsAsString, _sendText del _p1, _p2, _process_output # check alerts specified in loginsight_alerts_details.py exists/visible to user # user here is user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME'] def _check_alerts_exists(): global confirmedAlertIDsToMonitor, alert_details, WF_METRIC_ALERTS confirmedAlertIDsToMonitor = [] _response = None _alertIDs = [] _alertEnabled = {} _status = False log.info("[_check_alerts_exists]: Get details of all alerts.\n") _response = _execute_licf_api_cmd(cmd=GET, api='api/v1/alerts', logprefix='_check_alerts_exists') log.info("[_check_alerts_exists]: Received details of all alerts\n\n") for alert in _response: _alertIDs.append(alert['id']) _alertEnabled[alert['id']] = alert['enabled'] log.info("[_check_alerts_exists]: Append id {id} to all alerts with its " "enabled status - {status}".format(id=alert['id'], status=alert['enabled'])) log.info("[_check_alerts_exists]: User alert details - {userAlertDetails}\n\n" "All alerts IDs - {allAlertIDS}\n\n".format( userAlertDetails=alert_details.userSpecifiedAlerts, allAlertIDS=_alertIDs ) ) for alert in alert_details.userSpecifiedAlerts: log.info("[_check_alerts_exists]: Alert from user specified is - " "{alert}".format(alert=alert)) if alert['id'] in _alertIDs and \ str(_alertEnabled[alert['id']]).lower() == "true": log.info("[_check_alerts_exists]: Alert ID '{id}' specified in " "loginsight_alerts_details.py exists/visible for user {user} and is " "enabled.".format( id=alert['id'], user=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME'] ) ) confirmedAlertIDsToMonitor.append(alert['id']) if alert['id'] not in WF_METRIC_ALERTS.keys(): WF_METRIC_ALERTS[alert['id']] = {} WF_METRIC_ALERTS[alert['id']]['lastAlertEpochTime'] = 0 # metricName assignment is not done in above, if condition, # to get value updated if ID already exists WF_METRIC_ALERTS[alert['id']]['metricName'] = alert['name'] if 'labels' in alert.keys(): if alert['labels']: WF_METRIC_ALERTS[alert['id']]['labels'] = alert['labels'] _status = True else: log.error("[_check_alerts_exists]: Alert ID '{id}' specified in " "loginsight_alerts_details.py either doesn't exist/visible for user " "{user} (OR) not enabled. Enabled key value for alert is " "'{enabled}'".format( id=alert['id'], user=user_details.loginsight['LOGINSIGHT_SERVER_USER_NAME'], enabled=_alertEnabled[alert['id']] ) ) # Remove alerts that are not in userSpecifiedAlerts for alertID in WF_METRIC_ALERTS.keys(): if alertID not in confirmedAlertIDsToMonitor: log.warning("[_check_alerts_exists]: Deleting alertID {alertID} from " "monitoring".format(alertID=alertID)) del WF_METRIC_ALERTS[alertID] log.info("[_check_alerts_exists]: Populating alert details that needs monitoring " "is completed.\n\n\n") try: return _status finally: del _status, _response, _alertIDs, _alertEnabled gc.collect() # populate alert data def _populate_alertID_data(alertID, logprefix, timestamp): global alertResponse, setLastAlertEpochTime, alert_details _status = False _ndigits = int(math.log10(timestamp))+1 try: timestamp = timestamp//int(10**(_ndigits-10)) if timestamp > setLastAlertEpochTime[alertID]: log.info("[_populate_alertID_data,{logprefix}]: epochTime for alert " "{alertID} is {utime} which is greater than the highest epoch time in " "this iteration of results so far i.e., {savedTime}. Saving this as " "current highest in the iteration.".format( logprefix=logprefix, alertID=alertID, utime=timestamp, savedTime=setLastAlertEpochTime[alertID] ) ) setLastAlertEpochTime[alertID] = timestamp if timestamp > WF_METRIC_ALERTS[alertID]['lastAlertEpochTime']: _status = True # Check if there are more than 1 alert at same timestamp. if timestamp in alertResponse[alertID].keys(): alertResponse[alertID][timestamp] += 1 else: alertResponse[alertID][timestamp] = 1 else: log.info("[_populate_alertID_data,{logprefix}]: alert at timestamp " "{timestamp} was sent to wavefront already".format( logprefix=logprefix, timestamp=timestamp ) ) return _status except ZeroDivisionError: log.info("[_populate_alertID_data,{logprefix}]: When removing milli seconds " "from timestamp to get epochtime in seconds (10 digits), " "'ZeroDivisionError' exception occured.") return _status finally: del _status, _ndigits, alertID, logprefix, timestamp gc.collect() # get alert history and populate def _populate_alert_history(alertID, logprefix): global alertResponse, setLastAlertEpochTime, licfResponse, \ licfResponseConversion, fullMessages _status = False alertResponse[alertID] = {} licfResponse[alertID] = None licfResponseConversion[alertID] = {} fullMessages[alertID] = None setLastAlertEpochTime[alertID] = WF_METRIC_ALERTS[alertID]['lastAlertEpochTime'] _api = "api/v1/alerts/{alertID}/history".format(alertID=alertID) log.info("[_populate_alert_history,{logprefix}]: Collect history of alert with " "ID, {alertID}".format(logprefix=logprefix, alertID=alertID)) licfResponse[alertID] = _execute_licf_api_cmd(cmd=GET, api=_api, logprefix=logprefix) if not licfResponse[alertID]: log.warning("[_populate_alert_history,{logprefix}]: There is no history for " "alert ID '{alertID}'\n\n\n".format(logprefix=logprefix, alertID=alertID) ) else: licfResponseConversion[alertID]['converted'] = None if isinstance(licfResponse[alertID], str): # Removing '\', double quotes before and after '[', ']' respectively licfResponse[alertID] = licfResponse[alertID].replace('\\','') licfResponse[alertID] = \ str(licfResponse[alertID])[licfResponse[alertID].index('['):] licfResponse[alertID] = \ str(licfResponse[alertID])[:licfResponse[alertID].rfind(']')] licfResponseConversion[alertID]['converted'] = \ json.loads(licfResponse[alertID]) else: licfResponseConversion[alertID]['converted'] = licfResponse[alertID] if isinstance(licfResponseConversion[alertID]['converted'][0]['messages'], str): fullMessages[alertID] = \ json.loads(licfResponseConversion[alertID]['converted'][0]['messages']) else: fullMessages[alertID] = \ licfResponseConversion[alertID]['converted'][0]['messages'] log.info("[_populate_alert_history,{logprefix}]: Type of fullMessages variable " "is, '{typeresp}'. fullMessages content is,\n '{message}'\n\n\n\n".format( logprefix=logprefix, typeresp=type(fullMessages), message=fullMessages[alertID] ) ) if isinstance(fullMessages[alertID], list): for message in fullMessages[alertID]: if _populate_alertID_data(alertID=alertID, logprefix=logprefix, timestamp=message['timestamp']): _status = True elif isinstance(fullMessages[alertID], dict): _status = _populate_alertID_data( alertID=alertID, logprefix=logprefix, timestamp=fullMessages[alertID]['timestamp'] ) else: log.error("[_populate_alert_history,{logprefix}]: messages key in LICF " "response is of type {typeresp} which is not 'list' or 'dict' to get " "timestamp of alert.".format( logprefix=logprefix, typeresp=type(fullMessages[alertID]) ) ) try: return _status finally: del fullMessages[alertID], licfResponseConversion[alertID] del licfResponse[alertID], _status, _api, alertID, logprefix gc.collect() # prepare alerts data to send to wavefront def _send_alerts_data_to_wavefront(): global confirmedAlertIDsToMonitor, threads threads['alerts'] = [] _alertID = None _thread = None for _alertID in confirmedAlertIDsToMonitor: newThread = ProcessClassToPushMetrics( data={"name": "alerts", "alertID": _alertID } ) newThread.start() log.info("[_send_alerts_data_to_wavefront]: Started new thread for alert " "{alertID}".format( alertID=_alertID ) ) threads['alerts'].append(newThread) del _alertID, _thread gc.collect() # populate final event data def _populate_eventname_data(name, threadName, text, timestamp, logprefix): global eventData, allEvents, licfResponseConversion, populateEventData, match populateEventData[logprefix] = {} _ndigits = int(math.log10(timestamp))+1 try: timestamp = timestamp//int(10**(_ndigits-10)) log.info("[_populate_eventname_data,{logprefix}]: Generating event data with,\n" "text '{text}'\nname '{name}'\npattern '{pattern}'".format( logprefix=logprefix, text=text, name=name, pattern=allEvents[name]['pattern'] ) ) populateEventData[logprefix]['epoch_time'] = timestamp populateEventData[logprefix]['labels'] = {} populateEventData[logprefix]['labels'].update( licfResponseConversion[logprefix]['labels']) match[logprefix] = re.match(allEvents[name]['pattern'], text) if match[logprefix]: log.info("[_populate_eventname_data,{logprefix}]: matched groups, " "'{mgrp}'".format( logprefix=logprefix, mgrp=match[logprefix].groupdict() ) ) populateEventData[logprefix]['labels'].update(match[logprefix].groupdict()) log.info("[_populate_eventname_data,{logprefix}]: label keys " "'{keys}'".format( logprefix=logprefix, keys=populateEventData[logprefix]['labels'].keys()) ) if allEvents[name]['metricValue'] in \ populateEventData[logprefix]['labels'].keys(): populateEventData[logprefix]['value'] = \ populateEventData[logprefix]['labels'][allEvents[name]['metricValue']] else: return False del populateEventData[logprefix]['labels'][allEvents[name]['metricValue']] populateEventData[logprefix]['labelsAsString'] = "" for key in populateEventData[logprefix]['labels']: keyPlusValue = '\\"{key}\\"=\\"{value}\\"'.format( key=key, value=populateEventData[logprefix]['labels'][key] ) populateEventData[logprefix]['labelsAsString'] += " " + keyPlusValue eventData[name][threadName].append(populateEventData[logprefix]) return True except ZeroDivisionError: log.info("[_populate_eventname_data,{logprefix}]: When removing milli seconds " "from timestamp to get epochtime in seconds (10 digits), " "'ZeroDivisionError' exception occured.".format(logprefix=logprefix)) return False finally: del populateEventData[logprefix], _ndigits, name, text, timestamp, \ match[logprefix], logprefix gc.collect() # populate event data def _populate_event_data(name, threadName, logprefix, greaterthan, lessthan): global allEvents, eventData, licfResponse, licfResponseConversion _status = False licfResponse[logprefix] = None licfResponseConversion[logprefix] = {} fullMessages[logprefix] = None _api = "api/v1/events/timestamp/%3E%3D{greaterthan}/timestamp/%3C%3D{lessthan}" \ "{filters}?limit=9999".format( filters=allEvents[name]['filters'], greaterthan=greaterthan, lessthan=lessthan, ) log.info("[_populate_event_data,{logprefix}]: Collect data of event with " "api '{api}'".format( logprefix=logprefix, api=_api ) ) licfResponse[logprefix] = _execute_licf_api_cmd(cmd=GET, api=_api, logprefix=logprefix) if not licfResponse[logprefix]: log.warning("[_populate_event_data,{logprefix}]: There is no event with " "filters '{filters}'\n\n\n".format( logprefix=logprefix, filters=allEvents[name]['filters'] ) ) else: licfResponseConversion[logprefix]['converted'] = None if isinstance(licfResponse[logprefix], str): # Removing '\', double quotes before and after '[', ']' respectively licfResponse[logprefix] = licfResponse[logprefix].replace('\\','') licfResponse[logprefix] = \ str(licfResponse[logprefix])[licfResponse[logprefix].index('['):] licfResponse[logprefix] = \ str(licfResponse[logprefix])[:licfResponse[logprefix].rfind(']')] licfResponseConversion[logprefix]['converted'] = \ json.loads(licfResponse[logprefix]) else: licfResponseConversion[logprefix]['converted'] = licfResponse[logprefix] if isinstance(licfResponseConversion[logprefix]['converted']['events'], str): fullMessages[logprefix] = \ json.loads(licfResponseConversion[logprefix]['converted']['events']) else: fullMessages[logprefix] = \ licfResponseConversion[logprefix]['converted']['events'] licfResponseConversion[logprefix]['labels'] = {} if 'customLabels'in allEvents[name]: licfResponseConversion[logprefix]['labels'].update( allEvents[name]['customLabels'] ) if isinstance(fullMessages[logprefix], list): for message in fullMessages[logprefix]: if isinstance(message['fields'], str): licfResponseConversion[logprefix]['fields'] = \ json.loads(message['fields']) else: licfResponseConversion[logprefix]['fields'] = \ message['fields'] for field in licfResponseConversion[logprefix]['fields']: if field['name'] in allEvents[name]['licfFieldsAsLabels']: licfResponseConversion[logprefix]['labels'][field['name']] = \ field['content'] if _populate_eventname_data(name=name, threadName=threadName, text=message['text'], timestamp=message['timestamp'], logprefix=logprefix): _status = True elif isinstance(fullMessages[name], dict): if isinstance(fullMessages[name]['fields'], str): licfResponseConversion[name]['fields'] = \ json.loads(fullMessages[name]['fields']) else: licfResponseConversion[name]['fields'] = \ fullMessages[name]['fields'] for field in licfResponseConversion[name]['fields']: if field['name'] in allEvents[name]['licfFieldsAsLabels']: licfResponseConversion[name]['labels'][field['name']] = \ field['content'] _status = _populate_eventname_data(name=name, threadName=threadName, text=fullMessages['text'], timestamp=fullMessages['timestamp'], logprefix=logprefix) else: log.error("[_populate_event_data,{logprefix}]: events key in LICF response" "is of type {typeresp} which is not 'list' or 'dict' to get events " "timestamp, text etc.".format( logprefix=logprefix, typeresp=type(fullMessages[logprefix]) ) ) try: return _status finally: del licfResponseConversion[logprefix], _status, _api del licfResponse[logprefix], fullMessages[logprefix], name, logprefix gc.collect() # send events data to wavefront def _send_events_data_to_wavefront(): global allEvents, threads threads['events'] = [] _thread = None _eventName = None _threadName = None for _eventName in allEvents.keys(): _start_time_of_event_data_to_be_populated = EVENT_POPULATE_TIME_GREATER _end_time_of_event_data_to_be_populated = EVENT_POPULATE_TIME_GREATER + 1000 eventData[_eventName] = {} while _end_time_of_event_data_to_be_populated <= EVENT_POPULATE_TIME_LESS: log.info("[_send_events_data_to_wavefront]: Start and End time being used " "are, StartTime '{starttime}' and EndTime '{endtime}' which are less " "than the time till which we need to send metrics, '{time}'".format( starttime=_start_time_of_event_data_to_be_populated, endtime=_end_time_of_event_data_to_be_populated, time=EVENT_POPULATE_TIME_LESS ) ) _threadName = _eventName + "-" + \ str(_start_time_of_event_data_to_be_populated) eventData[_eventName][_threadName] = [] newThread = ProcessClassToPushMetrics( data={ "name": "events", "eventName": _eventName, "threadName": _threadName, "startTime": _start_time_of_event_data_to_be_populated, "endTime": _end_time_of_event_data_to_be_populated } ) newThread.start() log.info("[_send_events_data_to_wavefront]: Started new thread for event " "{eventName}".format(eventName=_eventName)) threads['events'].append(newThread) _start_time_of_event_data_to_be_populated = \ _end_time_of_event_data_to_be_populated _end_time_of_event_data_to_be_populated += 1000 del _thread, _eventName, _threadName gc.collect() # populate user defined events from file def _populate_userdefined_events(): global allEvents, event_details _status = False log.info("[_populate_userdefined_events]: User defined event details - \n" "{userEventDetails}\n\n".format( userEventDetails=event_details.userSpecifiedEvents ) ) for event in event_details.userSpecifiedEvents: allEvents[event['name']] = {} allEvents[event['name']]['filters'] = event['filters'] allEvents[event['name']]['pattern'] = event['pattern'] allEvents[event['name']]['metricValue'] = event['metricValue'] if 'licfFieldsAsLabels' in event.keys(): allEvents[event['name']]['licfFieldsAsLabels'] = event['licfFieldsAsLabels'] if 'customLabels' in event.keys(): allEvents[event['name']]['customLabels'] = event['customLabels'] _status = True try: return _status finally: del _status, event # check if file contents has changed def _check_file_change(name): global checkFileChange fileContents = None try: log.info("[_check_file_change]: Load file '{file}' contents.".format( file=checkFileChange[name]['filePath']) ) with open(checkFileChange[name]['filePath'], 'r') as inputfile: fileContents = inputfile.read() if fileContents == checkFileChange[name]['fileContents']: log.info("[_check_file_change]: File '{file}' contents have not " "changed.".format(file=checkFileChange[name]['filePath'])) return False else: log.info("[_check_file_change]: File '{file}' contents have changed." "Hence, details will be reloaded.".format( file=checkFileChange[name]['filePath'] ) ) # Saving latest filecontents to check the change next time. checkFileChange[name]['fileContents'] = fileContents return True except Exception as exception: log.error("[_check_file_change]: Exception occured while checking if file, " "'{file}' has changed,\n Exception is, '{exception}'".format( file=name, exception=exception ) ) finally: del fileContents gc.collect() # Integrate LICF and Wavefront def _loginsight_wavefront_integration(): global epoch_time, user_details, alert_details, event_details, Semaphore, \ threads, EVENT_LAST_POPULATED_EPOCH_TIME, EVENT_POPULATE_TIME_GREATER, \ EVENT_POPULATE_TIME_LESS, eventData _active = True _sendEventMetrics = True _sendAlertMetrics = True while 1: try: log.info("[_loginsight_wavefront_integration]: " "Start of a cycle in While 1 indefinite loop.\n\n") if _check_file_change(name="users"): # Reload the module to get latest values. user_details = importlib.reload(user_details) _log_licf_server_details() _log_wf_server_details() _connect_to_loginsight_server() #Semaphore = multiprocessing.BoundedSemaphore( # user_details.NoOfThreadsExecutionAtOnce #) Semaphore = multiprocessing.BoundedSemaphore(9999) epoch_time = int(datetime.datetime.now().strftime("%s")) if _verify_metrics_in_wavefront(): log.info("[_loginsight_wavefront_integration]: As other statefulset " "pods are sending metrics, this instance will be in inactive mode") _send_integration_status_to_wavefront(active=0) time.sleep(5) _active = False continue else: log.info("[_loginsight_wavefront_integration]: This instance should " "send metrics, hence sending status as active") _send_integration_status_to_wavefront(active=1) if sendMetricsOf == "events" or sendMetricsOf == "all": if _check_file_change(name="events"): # Reload the module to get latest values. event_details = importlib.reload(event_details) _log_event_details() if not _populate_userdefined_events(): log.error("[_loginsight_wavefront_integration]: There are no " "user defined events that exists in " "loginsight_event_details.py for which metrics needs to " "be sent to wavefront.") _sendEventMetrics = False else: _sendEventMetrics = True if _sendEventMetrics: log.info("[_loginsight_wavefront_integration]: There are user " "defined events specified in loginsight_event_details.py " "for which metrics needs to be sent to wavefront.") if not EVENT_LAST_POPULATED_EPOCH_TIME: EVENT_LAST_POPULATED_EPOCH_TIME = (epoch_time - 90) * 1000 EVENT_POPULATE_TIME_GREATER = EVENT_LAST_POPULATED_EPOCH_TIME EVENT_POPULATE_TIME_LESS = (epoch_time - 60) * 1000 log.info("[_loginsight_wavefront_integration]: epoch time is " "'{etime}'.\nTimes between which events needs to be populated, " "'{gtime} - {ltime}'".format( etime=epoch_time, gtime=EVENT_POPULATE_TIME_GREATER, ltime=EVENT_POPULATE_TIME_LESS ) ) if EVENT_POPULATE_TIME_LESS > EVENT_POPULATE_TIME_GREATER: try: _send_events_data_to_wavefront() finally: EVENT_LAST_POPULATED_EPOCH_TIME = EVENT_POPULATE_TIME_LESS if sendMetricsOf == "alerts" or sendMetricsOf == "all": if _check_file_change(name="alerts"): # Reload the module to get latest values. alert_details = importlib.reload(alert_details) _log_alert_details() if not _check_alerts_exists(): log.error("[_loginsight_wavefront_integration]: There are no " "alert IDs that exists/visible and enabled for which " "metrics needs to be sent to wavefront.") _sendAlertMetrics = False else: _sendAlertMetrics = True if _sendAlertMetrics: log.info("[_loginsight_wavefront_integration]: There are user " "defined alert IDs that exists/visible and enabled for which " "metrics needs to be sent to wavefront.") _send_alerts_data_to_wavefront() except Exception as exception: log.error("[_loginsight_wavefront_integration]: Exception occured - " "{exception}".format(exception=exception)) finally: if _active and threads['alerts']: # Wait for all alert threads to complete log.info("[_loginsight_wavefront_integration]: " "Wait for all alert threads to complete.") for _thread in threads['alerts']: _thread.join() log.info("[_loginsight_wavefront_integration]: " "All threads sending alerts data have completed.\n\n") threads['alerts'] = [] if _active and threads['events']: # Wait for all event threads to complete log.info("[_loginsight_wavefront_integration]: " "Wait for all event threads to complete.") for _thread in threads['events']: _thread.join() log.info("[_loginsight_wavefront_integration]: " "All threads sending events data have completed.\n\n") threads['events'] = [] log.info("[_loginsight_wavefront_integration]: " "End of a cycle in While 1 indefinite loop.\n\n\n\n\n") eventData = {} gc.collect() if __name__ == "__main__": try: if os.path.isfile(checkFileChange['users']['filePath']): log.info("[Main]: User Details file '{file}' exists. " "Importing the module and log its content.".format( file=checkFileChange['users']['filePath'] ) ) if os.path.dirname(checkFileChange['users']['filePath']) not in \ sys.path: sys.path.append(os.path.dirname( checkFileChange['users']['filePath'])) log.info("[Main]: Added following path '{path}' to sys.path\n" "'{syspath}'".format( path=os.path.dirname(checkFileChange['users']['filePath']), syspath=sys.path) ) user_details = importlib.import_module( os.path.splitext( os.path.basename(checkFileChange['users']['filePath']))[0]) else: raise("User Details file doesn't exists, '{file}'".format( file=checkFileChange['users']['filePath']) ) if sendMetricsOf == "all" or sendMetricsOf == "alerts": if os.path.isfile(checkFileChange['alerts']['filePath']): log.info("[Main]: Alert Details file '{file}' exists. " "Importing the module and log its content".format( file=checkFileChange['alerts']['filePath'] ) ) if os.path.dirname(checkFileChange['alerts']['filePath']) not in \ sys.path: log.info("[Main]: Adding following path to sys, '{path}'".format( path=checkFileChange['alerts']['filePath']) ) sys.path.append(os.path.dirname( checkFileChange['alerts']['filePath'])) alert_details = importlib.import_module( os.path.splitext( os.path.basename(checkFileChange['alerts']['filePath']))[0]) else: raise("Alert Details file doesn't exists, '{file}'".format( file=checkFileChange['alerts']['filePath']) ) if sendMetricsOf == "all" or sendMetricsOf == "events": if os.path.isfile(checkFileChange['events']['filePath']): log.info("[Main]: Event Details file '{file}' exists. " "Importing the module and log its content".format( file=checkFileChange['events']['filePath'] ) ) if os.path.dirname(checkFileChange['events']['filePath']) not in \ sys.path: log.info("[Main]: Adding following path to sys, '{path}'".format( path=checkFileChange['events']['filePath']) ) sys.path.append(os.path.dirname( checkFileChange['events']['filePath'])) event_details = importlib.import_module( os.path.splitext( os.path.basename(checkFileChange['events']['filePath']))[0]) else: raise Exception("Alert Details file doesn't exists, '{file}'".format( file=checkFileChange['alerts']['filePath']) ) _loginsight_wavefront_integration() except Exception as exception: log.error( "[Main]: LogInsight Wavefront Integration failed with exception,\n " "'{exp}'".format(exp=exception) )