#!/usr/bin/python
# -*- coding: utf-8 -*-
"""The Celery tasks module."""
from datetime import datetime
from json import dumps, loads
import requests
from requests.exceptions import RequestException
from celery import Celery
from .orm import EventMatch, EventLog, EventLogMatch
from .jsonpath import parse, find
from .config import get_config
CELERY_APP = Celery(
'notifications',
broker=get_config().get('celery', 'broker_url'),
backend=get_config().get('celery', 'backend_url')
)
@CELERY_APP.task
def dispatch_event(event_obj):
"""Get all the events and see which match."""
EventLog.database_connect()
orm_event = EventLog.create(
jsondata=dumps(event_obj)
)
orm_event.save()
EventLog.database_close()
dispatch_orm_event(orm_event)
[docs]def dispatch_orm_event(orm_event):
"""Dispatch the event from an existing orm obj."""
results = []
event_obj = loads(orm_event.jsondata)
EventMatch.database_connect()
eventmatch_objs = EventMatch.select().where(
(EventMatch.deleted >> None) &
(EventMatch.disabled >> None)
)
EventMatch.database_close()
for eventmatch in eventmatch_objs:
jsonpath_expr = parse(eventmatch.jsonpath)
if find(jsonpath_expr, event_obj):
results.append(query_policy.delay(eventmatch.to_hash(), event_obj, orm_event.uuid))
return results
[docs]def disable_eventmatch(eventmatch_uuid, error):
"""Disable the eventmatch obj."""
EventMatch.database_connect()
with EventMatch.atomic():
eventmatch = EventMatch.get(EventMatch.uuid == eventmatch_uuid)
eventmatch.disabled = datetime.now()
eventmatch.error = error
eventmatch.save()
EventMatch.database_close()
[docs]def create_log_match(eventmatch, event_log_uuid, policy_resp):
"""Create the EventLogMatch object."""
EventLogMatch.database_connect()
orm_elm = EventLogMatch.create(
event_log=event_log_uuid,
event_match=eventmatch['uuid'],
policy_status_code=policy_resp.status_code,
policy_resp_body=policy_resp.text
)
orm_elm.save()
EventLogMatch.database_close()
return orm_elm.uuid
[docs]def update_log_match(elm_uuid, target_resp):
"""Update the EventLogMatch object with the target resp."""
EventLogMatch.database_connect()
orm_elm = EventLogMatch.get_by_id(elm_uuid)
orm_elm.target_status_code = target_resp.status_code
orm_elm.target_resp_body = target_resp.text
orm_elm.save()
EventLogMatch.database_close()
@CELERY_APP.task
def query_policy(eventmatch, event_obj, event_log_uuid):
"""Query policy server to see if the event should be routed."""
resp = requests.post(
'{}/events/{}'.format(
get_config().get('notifications', 'policy_url'),
eventmatch['user']
),
data=dumps(event_obj),
headers={'Content-Type': 'application/json'}
)
resp_major = int(int(resp.status_code)/100)
if resp_major == 5:
create_log_match(eventmatch, event_log_uuid, resp)
disable_eventmatch(eventmatch['uuid'], resp.text)
if resp_major == 4:
return
if resp_major == 2:
elm_uuid = create_log_match(eventmatch, event_log_uuid, resp)
route_event.delay(eventmatch, event_obj, elm_uuid)
[docs]def event_auth_to_requests(eventmatch, headers):
"""Convert the eventmatch authentication to requests arguments."""
requests_kwargs = {}
if eventmatch.get('auth').get('type', None) == 'basic':
auth_obj = eventmatch.get('auth').get('basic', {})
requests_kwargs['auth'] = (auth_obj.get('username', ''), auth_obj.get('password', ''))
elif eventmatch.get('auth').get('type', None) == 'header':
auth_obj = eventmatch.get('auth').get('header', {})
headers['Authorization'] = '{} {}'.format(
auth_obj.get('type', ''),
auth_obj.get('credentials', '')
)
return requests_kwargs
@CELERY_APP.task
def route_event(eventmatch, event_obj, elm_uuid):
"""Route the event to the target url."""
try:
new_extensions = event_obj.get('extensions', {})
new_extensions.update(eventmatch.get('extensions', {}))
event_obj['extensions'] = new_extensions
headers = {'Content-Type': 'application/json'}
extra_args = event_auth_to_requests(eventmatch, headers)
resp = requests.post(
eventmatch['target_url'],
data=dumps(event_obj),
headers=headers,
**extra_args
)
except RequestException as ex:
disable_eventmatch(eventmatch['uuid'], str(ex))
return
resp_major = int(int(resp.status_code)/100)
if resp_major in (5, 4):
disable_eventmatch(eventmatch['uuid'], resp.text)
update_log_match(elm_uuid, resp)