Consumer Expectations

Implementation

How do I develop a consumer?

  1. Optionally, develop a shell script that will initialize the environment for the consumer (i.e., that will install any requirements that cannot be listed in a Python3 requirements file).
     • The filename of the shell script is ./init.sh.
     • If the shell script file is not present, then the system assumes that it is empty.
  2. Optionally, develop a Python3 requirements file for the consumer.
     • The filename of the requirements file is ./requirements.txt.
     • If the requirements file is not present, then the system assumes that it is empty.
  3. Develop a JSONPath to be tested against the JSON object for a given CloudEvents notification (corresponding to a given Pacifica transaction).
     • The filename of the JSONPath file is ./jsonpath2.txt.
     • The JSONPath is used as follows:
       • If the test returns a non-empty result set, then the consumer is notified.
       • If the test returns an empty result set, then the consumer is not notified.
  4. Develop a top-level Python3 script (viz., the entry-point) that will be executed by the consumer within a virtual environment. The entry-point acts upon a given CloudEvents notification for a given Pacifica transaction (and its associated input data files and key-value pairs) and then generates a new Pacifica transaction (and its associated output data files and key-value pairs).
     • The filename of the entry-point is ./__main__.py.
     • The usage of the entry-point is ./__main__.py SRC DST, where:
       • SRC is the path to the temporary directory for the incoming Pacifica transaction.
       • DST is the path to the temporary directory for the outgoing Pacifica transaction.
     • The input data files (downloaded from Pacifica) are located in the SRC/downloads/ subdirectory.
     • The output data files (to be uploaded to Pacifica) must be written to the DST/uploads/ subdirectory.
     • The JSON object for the CloudEvents notification is SRC/notification.json.
     • The entry-point terminates with one of the following exit status codes:
       • 0 = Terminated successfully.
       • >0 = Terminated unsuccessfully.
  5. Compress the shell script, the requirements file, the JSONPath file, the entry-point and any additional files that are necessary for execution of the entry-point into a zip archive called consumer.zip, with the files listed above at the root of the archive.
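The entry-point contract above (two positional arguments, SRC/notification.json, SRC/downloads/, DST/uploads/, and the exit status) can be sketched with the standard library alone. This is a minimal illustration, not the package's own code. The names `main` and `process_file` are hypothetical, `process_file` is a plain copy standing in for real processing, and the Pacifica download and upload steps are deliberately left out.

```python
import json
import os
import sys


def process_file(src_file, dst_file):
    """Hypothetical per-file processing step: here, a plain byte-for-byte copy."""
    with open(src_file, "rb") as fin, open(dst_file, "wb") as fout:
        fout.write(fin.read())


def main(argv):
    """Entry-point body following the SRC/DST contract; returns an exit status."""
    if len(argv) != 3:
        print("usage: ./__main__.py SRC DST", file=sys.stderr)
        return 2  # >0 = terminated unsuccessfully
    src, dst = argv[1], argv[2]

    # JSON object for the incoming CloudEvents notification
    with open(os.path.join(src, "notification.json")) as f:
        notification = json.load(f)
    if not isinstance(notification, dict):
        return 1  # unexpected notification shape

    downloads = os.path.join(src, "downloads")
    uploads = os.path.join(dst, "uploads")
    for dirpath, _dirnames, filenames in os.walk(downloads):
        # mirror the subdirectory layout of SRC/downloads/ under DST/uploads/
        rel = os.path.relpath(dirpath, downloads)
        out_dir = os.path.normpath(os.path.join(uploads, rel))
        os.makedirs(out_dir, exist_ok=True)
        for name in filenames:
            process_file(os.path.join(dirpath, name), os.path.join(out_dir, name))
    return 0  # 0 = terminated successfully
```

In a real ./__main__.py the function would be invoked as `sys.exit(main(sys.argv))`, so the returned integer becomes the process exit status.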

Example implementation of a consumer

In this example, we develop a consumer that copies each plain-text input data file, converting all cased characters to uppercase.

consumer.zip

The directory-tree listing for consumer.zip is as follows:

  • /
    • init.sh
    • jsonpath2.txt
    • requirements.txt
    • __main__.py
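The listing above places every file at the root (/) of the archive. Zipping the parent directory would nest the files one level too deep. The sketch below uses the standard-library `zipfile` module and assumes the files sit together in one source directory. `build_consumer_zip` is a hypothetical helper name. Treating only init.sh and requirements.txt as optional follows the implementation steps above.

```python
import os
import zipfile

# Files expected at the root of consumer.zip, per the listing above.
CONSUMER_FILES = ["init.sh", "jsonpath2.txt", "requirements.txt", "__main__.py"]
# The shell script and the requirements file may be omitted.
OPTIONAL_FILES = {"init.sh", "requirements.txt"}


def build_consumer_zip(src_dir, zip_path, extra_files=()):
    """Write the consumer files from src_dir into zip_path, at the archive root."""
    names = []
    # check everything up front so a missing file never leaves a partial archive
    for name in CONSUMER_FILES + list(extra_files):
        path = os.path.join(src_dir, name)
        if os.path.exists(path):
            names.append(name)
        elif name not in OPTIONAL_FILES:
            raise FileNotFoundError(path)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name in names:
            # arcname keeps the entry relative to the archive root, not src_dir
            zf.write(os.path.join(src_dir, name), arcname=name)
    return zip_path
```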

init.sh

The shell script does nothing.

#!/usr/bin/env sh

exit 0

jsonpath2.txt

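The JSONPath first checks that the notification has an eventID and is a metadata-ingest event (eventType org.pacifica.metadata.ingest, source /pacifica/metadata/ingest). It then returns the set of IDs for the input data files whose MIME type is text/plain.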

$[?(@["eventID"] and (@["eventType"] = "org.pacifica.metadata.ingest") and (@["source"] = "/pacifica/metadata/ingest"))]["data"][*][?((@["destinationTable"] = "Files") and (@["mimetype"] = "text/plain"))]["_id"]
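To make the filter concrete, the sketch below pairs a sample notification with a plain-Python function that selects the same values as the JSONPath. It illustrates the filter only and is not jsonpath2 itself. The field names come from the JSONPath and the entry-point. The sample values, and the assumption that "data" is a list of records, are hypothetical.

```python
# Hypothetical notification; only the field names are taken from the example.
SAMPLE_NOTIFICATION = {
    "eventID": "hypothetical-event-id",
    "eventType": "org.pacifica.metadata.ingest",
    "source": "/pacifica/metadata/ingest",
    "data": [
        {"destinationTable": "Transactions._id", "value": 1234},
        {"destinationTable": "Files", "_id": 1, "name": "a.txt", "subdir": "", "mimetype": "text/plain"},
        {"destinationTable": "Files", "_id": 2, "name": "b.png", "subdir": "", "mimetype": "image/png"},
    ],
}


def plain_text_file_ids(notification):
    """Plain-Python mirror of the JSONPath in jsonpath2.txt."""
    # outer filter: eventID present, metadata-ingest event type and source
    if not (
        isinstance(notification, dict)
        and "eventID" in notification
        and notification.get("eventType") == "org.pacifica.metadata.ingest"
        and notification.get("source") == "/pacifica/metadata/ingest"
    ):
        return []
    # inner filter: Files records with MIME type text/plain, projected to "_id"
    return [
        item["_id"]
        for item in notification.get("data", [])
        if isinstance(item, dict)
        and item.get("destinationTable") == "Files"
        and item.get("mimetype") == "text/plain"
        and "_id" in item
    ]


def should_notify(notification):
    """A non-empty result set means the consumer is notified."""
    return bool(plain_text_file_ids(notification))
```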

requirements.txt

The requirements file specifies the jsonpath2, pacifica-notifications-consumer and promise packages (the promise package provides the promise module that is imported by the entry-point).

jsonpath2
pacifica-notifications-consumer
promise

__main__.py

The entry-point is as follows:

#!/usr/bin/env python3

import os
import shutil
import sys

from jsonpath2 import Path
from pacifica_notifications_consumer import download, upload
from promise import Promise

# execute only if run as a top-level script
if __name__ == '__main__':
  # path to directory for input CloudEvents notification
  orig_event_path = sys.argv[1]

  # path to directory for output CloudEvents notification
  new_event_path = sys.argv[2]

  def upload_did_fulfill(d):
    """Callback for Pacifica uploader promise's eventual value.

    Args:
        d (dict): CloudEvents notification.

    Returns:
        Promise[bool]: True for success, False otherwise.

    Raises:
        BaseException: If an error occurs.
    """

    # delete entire directory tree for input CloudEvents notification
    shutil.rmtree(orig_event_path, ignore_errors=True)

    # delete entire directory tree for output CloudEvents notification
    shutil.rmtree(new_event_path, ignore_errors=True)

    # return True for success
    return Promise(lambda resolve, reject: resolve(True))

  def download_did_fulfill(d):
    """Callback for Pacifica downloader promise's eventual value.

    Args:
        d (dict): CloudEvents notification.

    Returns:
        Promise[bool]: True for success, False otherwise.

    Raises:
        BaseException: If an error occurs.
    """

    # iterate over plain-text files
    for file_d in [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?((@["destinationTable"] = "Files") and (@["mimetype"] = "text/plain"))]').match(d) ]:
      # paths to input data file and output data file
      orig_file_path = os.path.join(orig_event_path, 'downloads', file_d['subdir'], file_d['name'])
      new_file_path = os.path.join(new_event_path, 'uploads', file_d['subdir'], file_d['name'])

      # create the output subdirectory, which does not necessarily exist yet
      os.makedirs(os.path.dirname(new_file_path), exist_ok=True)

      # read input data file, convert cased characters to uppercase, and then write output data file
      with open(orig_file_path, 'r') as orig_file, open(new_file_path, 'w') as new_file:
        new_file.write(orig_file.read().upper())

    # invoke Pacifica uploader with specified transaction attributes and key-value pairs
    return upload(new_event_path, {
      'Transactions.instrument': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.instrument")]["value"]').match(d) ][0],
      'Transactions.proposal': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.proposal")]["value"]').match(d) ][0],
      'Transactions.submitter': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.submitter")]["value"]').match(d) ][0],
    }, {
      'Transactions._id': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions._id")]["value"]').match(d) ][0],
    }).then(upload_did_fulfill)

  # invoke Pacifica downloader, wait for the promise chain to settle, and map its
  # eventual value to the exit status (0 = success, >0 = failure); an uncaught
  # exception also terminates the script with a non-zero exit status
  sys.exit(0 if download(orig_event_path).then(download_did_fulfill).get() else 1)

How do I deploy a consumer?

  1. Download and install the pacifica-notifications-consumer package.
     • When the pacifica-notifications-consumer package is successfully installed, the start-pacifica-notifications-consumer command is available on the PATH.
     • When started, the start-pacifica-notifications-consumer command: (i) extracts the contents of consumer.zip to a temporary location, (ii) creates a new virtual environment, (iii) installs the contents of the requirements file within said virtual environment, (iv) starts a new asynchronous job processing queue, (v) starts a new web service end-point for a new consumer, and (vi) registers said web service end-point for said consumer with the given producer.
     • When stopped, the start-pacifica-notifications-consumer command reverses the side-effects of its start-up behavior (i.e., it cleans up after itself).
  2. Execute the start-pacifica-notifications-consumer command, specifying as command-line arguments:
     • the location of consumer.zip;
     • the URL and authentication credentials for the producer; and
     • the configuration for the asynchronous job processing queue, web service end-point, etc.
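Start-up steps (i) to (iii) can be approximated with the standard library, as shown below. This is a rough sketch and not the command's actual implementation. `prepare_consumer` is a hypothetical name. The job queue, web service and registration steps (iv) to (vi) are omitted. The `install` flag exists only so that the sketch can be exercised without network access.

```python
import os
import subprocess
import sys
import tempfile
import venv
import zipfile


def prepare_consumer(zip_path, install=True):
    """Extract consumer.zip, create a virtual environment, install requirements."""
    workdir = tempfile.mkdtemp(prefix="consumer-")
    app_dir = os.path.join(workdir, "app")
    env_dir = os.path.join(workdir, "venv")

    # (i) extract the contents of consumer.zip to a temporary location
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(app_dir)

    # (ii) create a new virtual environment (pip is only needed when installing)
    venv.create(env_dir, with_pip=install, symlinks=(os.name != "nt"))
    if sys.platform == "win32":
        python = os.path.join(env_dir, "Scripts", "python.exe")
    else:
        python = os.path.join(env_dir, "bin", "python")

    # (iii) install the requirements file, if present, inside that environment
    requirements = os.path.join(app_dir, "requirements.txt")
    if install and os.path.exists(requirements):
        subprocess.run([python, "-m", "pip", "install", "-r", requirements], check=True)
    return app_dir, python
```

The entry-point would then be run with the returned interpreter, e.g. `[python, os.path.join(app_dir, "__main__.py"), SRC, DST]`.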

Frequently Asked Questions

Are downloaded files persisted after the entry-point for a consumer has terminated?

Yes, unless the entry-point deletes them (e.g., using the shutil.rmtree function; see __main__.py in the example).

Are locally-generated files persisted after the entry-point for a consumer has terminated?

Yes, unless the entry-point deletes them (e.g., using the shutil.rmtree function; see __main__.py in the example).
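In the example, the directory trees are removed only in the upload success callback, so a failed download or upload leaves them behind. A `try`/`finally` block is one way to remove files regardless of the outcome. This is a sketch, and `run_and_clean_up` and `work` are hypothetical names.

```python
import shutil


def run_and_clean_up(work, src, dst):
    """Run work(src, dst), then delete both directory trees whatever happens."""
    try:
        return work(src, dst)
    finally:
        # runs on success and on failure (including exceptions raised by work)
        for path in (src, dst):
            shutil.rmtree(path, ignore_errors=True)
```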

Can I develop the entry-point for a consumer to wait for two-or-more CloudEvents notifications?

Yes. However, this behavior must be implemented by the entry-points themselves and is not provided by the default behavior of the start-pacifica-notifications-consumer command.

For example, to wait for two CloudEvents notifications:

  • Develop two consumers with two entry-points.
  • The entry-point for the first consumer receives and stores the first CloudEvents notification.
  • The entry-point for the second consumer receives the second CloudEvents notification, retrieves the first CloudEvents notification, and then does its work.
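The steps above need somewhere to keep the first notification between the two invocations. The sketch below assumes a directory shared by both consumers and a correlation key that both can compute from their notifications. Both the directory and the key (e.g., a transaction ID) are hypothetical choices, not something the consumer package provides.

```python
import json
import os
import tempfile


def _path_for(state_dir, key):
    # the key becomes a filename, so reject anything containing a path separator
    if not key or os.path.basename(key) != key:
        raise ValueError(f"unsafe key: {key!r}")
    return os.path.join(state_dir, f"{key}.json")


def store_notification(state_dir, key, notification):
    """First consumer: persist the notification under a correlation key."""
    os.makedirs(state_dir, exist_ok=True)
    path = _path_for(state_dir, key)
    # write to a temporary file, then rename, so readers never see partial JSON
    fd, tmp_path = tempfile.mkstemp(dir=state_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(notification, f)
    os.replace(tmp_path, path)
    return path


def load_notification(state_dir, key):
    """Second consumer: return the stored notification, or None if absent."""
    try:
        with open(_path_for(state_dir, key)) as f:
            return json.load(f)
    except FileNotFoundError:
        return None
```

If `load_notification` returns None, the first notification has not arrived yet. The second entry-point can then exit with a non-zero status rather than proceed.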

Can I develop the entry-point for a consumer using a non-Python3 programming language?

No. Only Python3 is supported as the programming language for the entry-point file (viz., __main__.py).

Non-Python3 executables can be called from within the entry-point using the subprocess module.
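A minimal sketch of such a call with `subprocess.run`, where `run_external` is a hypothetical helper. The tool's exit status is returned so that the entry-point can propagate it, e.g. with `sys.exit(status)`.

```python
import subprocess
import sys


def run_external(args, cwd=None):
    """Run an external executable; return its exit status (0 = success)."""
    result = subprocess.run(args, cwd=cwd, capture_output=True, text=True)
    if result.returncode != 0:
        # surface the tool's own diagnostics in the consumer's logs
        sys.stderr.write(result.stderr)
    return result.returncode
```

For instance, `run_external(["gzip", "-k", path])` would compress a file in place, assuming gzip is installed in the consumer's environment.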

Other programming languages can be called from within the entry-point using the appropriate interface, e.g., the jpy package for the Java programming language, and the rpy2 package for the R programming language.

How do I authenticate with the CloudEvents notification producer?

Authentication credentials are specified in the configuration for the pacifica-notifications package (see https://pacifica-notifications.readthedocs.io/en/latest/configuration.html for more information).

Authentication credentials are included in all HTTP requests that are issued by the consumer, e.g., the username is specified via the Http-Remote-User header (see https://pacifica-notifications.readthedocs.io/en/latest/exampleusage.html for more information).
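For illustration, the sketch below builds (but does not send) a request that carries the username in the Http-Remote-User header, using only `urllib.request`. The URL and username are placeholders, and `build_request` is a hypothetical helper. The real end-points are described in the pacifica-notifications documentation linked above.

```python
import urllib.request


def build_request(url, username, body=None):
    """Build an HTTP request that identifies the user via Http-Remote-User."""
    headers = {"Http-Remote-User": username}
    if body is not None:
        headers["Content-Type"] = "application/json"
    method = "POST" if body is not None else "GET"
    return urllib.request.Request(url, data=body, headers=headers, method=method)
```

`urllib` stores header names as `str.capitalize()` gives them (e.g., "Http-remote-user"). This is harmless because HTTP header names are case-insensitive. Sending the request would be `urllib.request.urlopen(build_request(...))`.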

Glossary of Terms

  • Consumer: A software system that receives CloudEvents notifications (corresponding to Pacifica transactions) from producers. CloudEvents notifications are filtered by testing against a JSONPath. If the test for a given CloudEvents notification returns a non-empty result set, then the consumer routes said CloudEvents notification to a processor.
  • Processor: A software system that downloads the input data files and metadata for a given Pacifica transaction, processes said input data files and associated metadata, generates output data files and associated metadata, and then creates a new Pacifica transaction.
  • Producer: A software system that sends CloudEvents notifications (corresponding to Pacifica transactions) to consumers.