Source code for pacifica.notifications.tasks

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""The Celery tasks module."""
from datetime import datetime
from json import dumps
import requests
from requests.exceptions import RequestException
from celery import Celery
from .orm import EventMatch
from .jsonpath import parse, find
from .config import get_config

CELERY_APP = Celery(
    'notifications',
    broker=get_config().get('celery', 'broker_url'),
    backend=get_config().get('celery', 'backend_url')
)


@CELERY_APP.task
def dispatch_event(event_obj):
    """Get all the events and see which match."""
    EventMatch.database_connect()
    eventmatch_objs = EventMatch.select().where(
        (EventMatch.deleted >> None) &
        (EventMatch.disabled >> None)
    )
    EventMatch.database_close()
    for eventmatch in eventmatch_objs:
        jsonpath_expr = parse(eventmatch.jsonpath)
        if find(jsonpath_expr, event_obj):
            query_policy.delay(eventmatch.to_hash(), event_obj)


[docs]def disable_eventmatch(eventmatch_uuid, error): """Disable the eventmatch obj.""" EventMatch.database_connect() with EventMatch.atomic(): eventmatch = EventMatch.get(EventMatch.uuid == eventmatch_uuid) eventmatch.disabled = datetime.now() eventmatch.error = error eventmatch.save() EventMatch.database_close()
@CELERY_APP.task def query_policy(eventmatch, event_obj): """Query policy server to see if the event should be routed.""" resp = requests.post( '{}/events/{}'.format( get_config().get('notifications', 'policy_url'), eventmatch['user'] ), data=dumps(event_obj), headers={'Content-Type': 'application/json'} ) resp_major = int(int(resp.status_code)/100) if resp_major == 5: disable_eventmatch(eventmatch['uuid'], resp.text) if resp_major == 4: return if resp_major == 2: route_event.delay(eventmatch, event_obj)
[docs]def event_auth_to_requests(eventmatch, headers): """Convert the eventmatch authentication to requests arguments.""" requests_kwargs = {} if eventmatch.get('auth').get('type', None) == 'basic': auth_obj = eventmatch.get('auth').get('basic', {}) requests_kwargs['auth'] = (auth_obj.get('username', ''), auth_obj.get('password', '')) elif eventmatch.get('auth').get('type', None) == 'header': auth_obj = eventmatch.get('auth').get('header', {}) headers['Authorization'] = '{} {}'.format( auth_obj.get('type', ''), auth_obj.get('credentials', '') ) return requests_kwargs
@CELERY_APP.task def route_event(eventmatch, event_obj): """Route the event to the target url.""" try: new_extensions = event_obj.get('extensions', {}) new_extensions.update(eventmatch.get('extensions', {})) event_obj['extensions'] = new_extensions headers = {'Content-Type': 'application/json'} extra_args = event_auth_to_requests(eventmatch, headers) resp = requests.post( eventmatch['target_url'], data=dumps(event_obj), headers=headers, **extra_args ) except RequestException as ex: disable_eventmatch(eventmatch['uuid'], str(ex)) return resp_major = int(int(resp.status_code)/100) if resp_major == 5 or resp_major == 4: disable_eventmatch(eventmatch['uuid'], resp.text)