mirror of
https://github.com/mtan93/cachet-url-monitor.git
synced 2026-05-29 22:07:45 +01:00
* #10 - Creating auxiliary client class to generate configuration class based on cachet's component list. * Updating the python version in codacy to reduce false positives * Moving some of the cachet operations to the client class to clean up the configuration class and making better constants * Refactoring status to have proper classes and adding more tests. Refactoring the requests tests to use requests-mock. * Removing unused imports from test_scheduler * Adding more tests and the ability to run the client from command line * Updating README and client arg parsing * Fixing broken unit tests
This commit is contained in:
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python
|
||||
from typing import Dict
|
||||
|
||||
import click
|
||||
import requests
|
||||
from yaml import dump
|
||||
from cachet_url_monitor import latency_unit, status, exceptions
|
||||
|
||||
|
||||
def normalize_url(url: str) -> str:
|
||||
"""If passed url doesn't include schema return it with default one - http."""
|
||||
if not url.lower().startswith('http'):
|
||||
return f'http://{url}'
|
||||
return url
|
||||
|
||||
|
||||
def save_config(config_map, filename: str):
|
||||
with open(filename, 'w') as file:
|
||||
dump(config_map, file)
|
||||
|
||||
|
||||
class CachetClient(object):
|
||||
"""Utility class to interact with CahetHQ server."""
|
||||
url: str
|
||||
token: str
|
||||
headers: Dict[str, str]
|
||||
|
||||
def __init__(self, url: str, token: str):
|
||||
self.url = normalize_url(url)
|
||||
self.token = token
|
||||
self.headers = {'X-Cachet-Token': token}
|
||||
|
||||
def get_components(self):
|
||||
"""Retrieves all components registered in cachet-hq"""
|
||||
return requests.get(f"{self.url}/components", headers=self.headers).json()['data']
|
||||
|
||||
def get_metrics(self):
|
||||
"""Retrieves all metrics registered in cachet-hq"""
|
||||
return requests.get(f"{self.url}/metrics", headers=self.headers).json()['data']
|
||||
|
||||
def generate_config(self):
|
||||
components = self.get_components()
|
||||
generated_endpoints = [
|
||||
{
|
||||
'name': component['name'],
|
||||
'url': component['link'],
|
||||
'method': 'GET',
|
||||
'timeout': 1,
|
||||
'expectation': [
|
||||
{
|
||||
'type': 'HTTP_STATUS',
|
||||
'status_range': '200-300',
|
||||
'incident': 'MAJOR'
|
||||
}
|
||||
],
|
||||
'allowed_fails': 0,
|
||||
'frequency': 30,
|
||||
'component_id': component['id'],
|
||||
'action': [
|
||||
'CREATE_INCIDENT',
|
||||
'UPDATE_STATUS',
|
||||
],
|
||||
'public_incidents': True,
|
||||
} for component in components if component['enabled']
|
||||
]
|
||||
generated_config = {
|
||||
'cachet': {
|
||||
'api_url': self.url,
|
||||
'token': self.token,
|
||||
},
|
||||
'endpoints': generated_endpoints
|
||||
}
|
||||
return generated_config
|
||||
|
||||
def get_default_metric_value(self, metric_id):
|
||||
"""Returns default value for configured metric."""
|
||||
get_metric_request = requests.get(f"{self.url}/metrics/{metric_id}", headers=self.headers)
|
||||
|
||||
if get_metric_request.ok:
|
||||
return get_metric_request.json()['data']['default_value']
|
||||
else:
|
||||
raise exceptions.MetricNonexistentError(metric_id)
|
||||
|
||||
def get_component_status(self, component_id):
|
||||
"""Retrieves the current status of the given component. It will fail if the component does
|
||||
not exist or doesn't respond with the expected data.
|
||||
:return component status.
|
||||
"""
|
||||
get_status_request = requests.get(f'{self.url}/components/{component_id}', headers=self.headers)
|
||||
|
||||
if get_status_request.ok:
|
||||
# The component exists.
|
||||
return status.ComponentStatus(int(get_status_request.json()['data']['status']))
|
||||
else:
|
||||
raise exceptions.ComponentNonexistentError(component_id)
|
||||
|
||||
def push_status(self, component_id, component_status):
|
||||
"""Pushes the status of the component to the cachet server.
|
||||
"""
|
||||
params = {'id': component_id, 'status': component_status}
|
||||
return requests.put(f"{self.url}/components/{component_id}", params=params, headers=self.headers)
|
||||
|
||||
def push_metrics(self, metric_id, latency_time_unit, elapsed_time_in_seconds, timestamp):
|
||||
"""Pushes the total amount of seconds the request took to get a response from the URL.
|
||||
"""
|
||||
value = latency_unit.convert_to_unit(latency_time_unit, elapsed_time_in_seconds)
|
||||
params = {'id': metric_id, 'value': value, 'timestamp': timestamp}
|
||||
return requests.post(f"{self.url}/metrics/{metric_id}/points", params=params, headers=self.headers)
|
||||
|
||||
def push_incident(self, status_value: status.ComponentStatus, is_public_incident: bool, component_id: int,
|
||||
previous_incident_id=None, message=None):
|
||||
"""If the component status has changed, we create a new incident (if this is the first time it becomes unstable)
|
||||
or updates the existing incident once it becomes healthy again.
|
||||
"""
|
||||
if previous_incident_id and status_value == status.ComponentStatus.OPERATIONAL:
|
||||
# If the incident already exists, it means it was unhealthy but now it's healthy again.
|
||||
params = {'status': status.IncidentStatus.FIXED.value, 'visible': is_public_incident,
|
||||
'component_id': component_id, 'component_status': status_value.value, 'notify': True}
|
||||
|
||||
return requests.put(f'{self.url}/incidents/{previous_incident_id}', params=params, headers=self.headers)
|
||||
elif not previous_incident_id and status_value != status.ComponentStatus.OPERATIONAL:
|
||||
# This is the first time the incident is being created.
|
||||
params = {'name': 'URL unavailable', 'message': message,
|
||||
'status': status.IncidentStatus.INVESTIGATING.value,
|
||||
'visible': is_public_incident, 'component_id': component_id, 'component_status': status_value,
|
||||
'notify': True}
|
||||
return requests.post(f'{self.url}/incidents', params=params, headers=self.headers)
|
||||
|
||||
@click.group()
|
||||
def run_client():
|
||||
pass
|
||||
|
||||
@click.command()
|
||||
@click.argument('url')
|
||||
@click.argument('token')
|
||||
@click.argument('output')
|
||||
def run_client(url, token, output):
|
||||
client = CachetClient(url, token)
|
||||
config = client.generate_config()
|
||||
save_config(config, output)
|
||||
|
||||
if __name__ == '__main__':
|
||||
run_client()
|
||||
Reference in New Issue
Block a user