Files
mtakaki df2d094dc6 Fixing push status that has been broken since moving to a client (#80)
* Fixing push status that has been broken since moving to a client

* Adding unit test to cover the bug
2020-01-28 01:42:49 -08:00

145 lines
5.8 KiB
Python

#!/usr/bin/env python
from typing import Dict
from typing import Optional
import click
import requests
from yaml import dump
from cachet_url_monitor import latency_unit, status, exceptions
def normalize_url(url: str) -> str:
"""If passed url doesn't include schema return it with default one - http."""
if not url.lower().startswith('http'):
return f'http://{url}'
return url
def save_config(config_map, filename: str):
with open(filename, 'w') as file:
dump(config_map, file)
class CachetClient(object):
"""Utility class to interact with CahetHQ server."""
url: str
token: str
headers: Dict[str, str]
def __init__(self, url: str, token: str):
self.url = normalize_url(url)
self.token = token
self.headers = {'X-Cachet-Token': token}
def get_components(self):
"""Retrieves all components registered in cachet-hq"""
return requests.get(f"{self.url}/components", headers=self.headers).json()['data']
def get_metrics(self):
"""Retrieves all metrics registered in cachet-hq"""
return requests.get(f"{self.url}/metrics", headers=self.headers).json()['data']
def generate_config(self):
components = self.get_components()
generated_endpoints = [
{
'name': component['name'],
'url': component['link'],
'method': 'GET',
'timeout': 1,
'expectation': [
{
'type': 'HTTP_STATUS',
'status_range': '200-300',
'incident': 'MAJOR'
}
],
'allowed_fails': 0,
'frequency': 30,
'component_id': component['id'],
'action': [
'CREATE_INCIDENT',
'UPDATE_STATUS',
],
'public_incidents': True,
} for component in components if component['enabled']
]
generated_config = {
'cachet': {
'api_url': self.url,
'token': self.token,
},
'endpoints': generated_endpoints
}
return generated_config
def get_default_metric_value(self, metric_id):
"""Returns default value for configured metric."""
get_metric_request = requests.get(f"{self.url}/metrics/{metric_id}", headers=self.headers)
if get_metric_request.ok:
return get_metric_request.json()['data']['default_value']
else:
raise exceptions.MetricNonexistentError(metric_id)
def get_component_status(self, component_id: int) -> Optional[status.ComponentStatus]:
"""Retrieves the current status of the given component. It will fail if the component does
not exist or doesn't respond with the expected data.
:return component status.
"""
get_status_request = requests.get(f'{self.url}/components/{component_id}', headers=self.headers)
if get_status_request.ok:
# The component exists.
return status.ComponentStatus(int(get_status_request.json()['data']['status']))
else:
raise exceptions.ComponentNonexistentError(component_id)
def push_status(self, component_id: int, component_status: status.ComponentStatus):
"""Pushes the status of the component to the cachet server.
"""
params = {'id': component_id, 'status': component_status.value}
return requests.put(f"{self.url}/components/{component_id}", params=params, headers=self.headers)
def push_metrics(self, metric_id: int, latency_time_unit: str, elapsed_time_in_seconds: int, timestamp: int):
"""Pushes the total amount of seconds the request took to get a response from the URL.
"""
value = latency_unit.convert_to_unit(latency_time_unit, elapsed_time_in_seconds)
params = {'id': metric_id, 'value': value, 'timestamp': timestamp}
return requests.post(f"{self.url}/metrics/{metric_id}/points", params=params, headers=self.headers)
def push_incident(self, status_value: status.ComponentStatus, is_public_incident: bool, component_id: int,
previous_incident_id=None, message=None):
"""If the component status has changed, we create a new incident (if this is the first time it becomes unstable)
or updates the existing incident once it becomes healthy again.
"""
if previous_incident_id and status_value == status.ComponentStatus.OPERATIONAL:
# If the incident already exists, it means it was unhealthy but now it's healthy again.
params = {'status': status.IncidentStatus.FIXED.value, 'visible': is_public_incident,
'component_id': component_id, 'component_status': status_value.value, 'notify': True}
return requests.put(f'{self.url}/incidents/{previous_incident_id}', params=params, headers=self.headers)
elif not previous_incident_id and status_value != status.ComponentStatus.OPERATIONAL:
# This is the first time the incident is being created.
params = {'name': 'URL unavailable', 'message': message,
'status': status.IncidentStatus.INVESTIGATING.value,
'visible': is_public_incident, 'component_id': component_id, 'component_status': status_value.value,
'notify': True}
return requests.post(f'{self.url}/incidents', params=params, headers=self.headers)
@click.group()
def run_client():
pass
@click.command()
@click.argument('url')
@click.argument('token')
@click.argument('output')
def run_client(url, token, output):
client = CachetClient(url, token)
config = client.generate_config()
save_config(config, output)
if __name__ == '__main__':
run_client()