Consumer Expectations
Implementation
How do I develop a consumer?
- Optionally, develop a shell script that will initialize the environment for the consumer (i.e., that will install any requirements that cannot be listed in a Python3 requirements file).
  - The filename of the shell script is ./init.sh.
  - If the shell script file is not present, then the system assumes that it is empty.
- Optionally, develop a Python3 requirements file for the consumer.
  - The filename of the requirements file is ./requirements.txt.
  - If the requirements file is not present, then the system assumes that it is empty.
- Develop a JSONPath to be tested against the JSON object for a given CloudEvents notification (corresponding to a given Pacifica transaction).
  - The filename of the JSONPath file is jsonpath2.txt.
  - The JSONPath is used as follows:
    - If the test returns a non-empty result set, then the consumer will be notified.
    - If the test returns an empty result set, then the consumer will not be notified.
- Develop a top-level Python3 script (viz., the entry-point) that will be executed by the consumer within a virtual environment. The entry-point acts upon a given CloudEvents notification for a given Pacifica transaction (and its associated input data files and key-value pairs) and then generates a new Pacifica transaction (and its associated output data files and key-value pairs).
  - The filename of the entry-point is ./__main__.py.
  - The usage of the entry-point is ./__main__.py SRC DST, where:
    - SRC is the path to the temporary directory for the incoming Pacifica transaction.
    - DST is the path to the temporary directory for the outgoing Pacifica transaction.
  - The input data files (downloaded from Pacifica) are located in the SRC/downloads/ subdirectory.
  - The output data files (uploaded to Pacifica) are located in the DST/uploads/ subdirectory.
  - The JSON object for the CloudEvents notification is SRC/notification.json.
  - The execution of the entry-point terminates with one of the following exit status codes:
    - 0 = Terminated successfully.
    - >0 = Terminated unsuccessfully.
- Compress the entry-point, the requirements file, the JSONPath file, the shell script (if any) and any additional files that are necessary for execution of the entry-point into a zip archive called consumer.zip.
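The SRC/DST contract above can be sketched as the core of an entry-point that uses only the standard library. This is a minimal sketch, not the example consumer: it assumes the input data files are already present in SRC/downloads/ (in the example implementation, the download helper from pacifica-notifications-consumer fetches them), it leaves out the upload step, and its transformation is a plain copy. The function name run is hypothetical.

```python
import json
import os
import shutil
import sys


def run(src: str, dst: str) -> int:
    """Handle one CloudEvents notification; return the exit status (0 = success)."""
    try:
        # the JSON object for the CloudEvents notification
        with open(os.path.join(src, "notification.json")) as f:
            event = json.load(f)
        print(f"handling event {event.get('eventID')}", file=sys.stderr)

        downloads = os.path.join(src, "downloads")  # input data files
        uploads = os.path.join(dst, "uploads")      # output data files
        for root, _dirs, files in os.walk(downloads):
            # keep the same subdirectory layout under uploads/
            rel = os.path.relpath(root, downloads)
            out_dir = os.path.normpath(os.path.join(uploads, rel))
            os.makedirs(out_dir, exist_ok=True)
            for name in files:
                # placeholder transformation: a plain copy
                shutil.copyfile(os.path.join(root, name), os.path.join(out_dir, name))
        return 0
    except Exception as exc:
        # any failure becomes a non-zero exit status
        print(f"consumer failed: {exc}", file=sys.stderr)
        return 1
```

In __main__.py, the function would be called as sys.exit(run(sys.argv[1], sys.argv[2])) inside an if __name__ == '__main__': guard, so that any failure surfaces to the consumer as a non-zero exit status.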
Example implementation of a consumer
In this example, we develop a consumer that copies the plain-text input data files, converting all cased characters to uppercase.
consumer.zip
The directory-tree listing for consumer.zip is as follows:
/
  init.sh
  jsonpath2.txt
  requirements.txt
  __main__.py
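One way to produce an archive with this layout is Python's zipfile module. The sketch below assumes the files sit in a single local directory; the function name build_consumer_zip and its extra parameter (for any additional files the entry-point needs) are hypothetical. Each file is stored under its bare name so that it lands at the root of the archive, and the two optional files are skipped when absent.

```python
import os
import zipfile

REQUIRED = ["jsonpath2.txt", "__main__.py"]
OPTIONAL = ["init.sh", "requirements.txt"]  # treated as empty when absent


def build_consumer_zip(src_dir: str, zip_path: str, extra=()) -> list:
    """Pack the consumer's files into zip_path and return the archived names."""
    missing = [n for n in REQUIRED if not os.path.isfile(os.path.join(src_dir, n))]
    if missing:
        raise FileNotFoundError(f"missing required file(s): {missing}")
    names = [n for n in OPTIONAL if os.path.isfile(os.path.join(src_dir, n))]
    names += REQUIRED + list(extra)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name in names:
            # arcname stores each file at the root of the archive
            zf.write(os.path.join(src_dir, name), arcname=name)
        return zf.namelist()
```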
jsonpath2.txt
The JSONPath returns the set of IDs for input data files whose MIME type is text/plain.
$[?(@["eventID"] and (@["eventType"] = "org.pacifica.metadata.ingest") and (@["source"] = "/pacifica/metadata/ingest"))]["data"][*][?((@["destinationTable"] = "Files") and (@["mimetype"] = "text/plain"))]["_id"]
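Read as plain Python, the expression first checks that the notification is a metadata-ingest event from the expected source, and then collects the _id of every entry in data that describes a plain-text file. The function below is one such reading, written for illustration only: the function names are hypothetical, it assumes that jsonpath2's filter subscript tests the current node (so that $[?(...)] filters the root object itself), and it is not how the consumer evaluates jsonpath2.txt.

```python
def matching_file_ids(event: dict) -> list:
    """Plain-Python reading of the JSONPath in jsonpath2.txt."""
    # outer filter: the root object must be a Pacifica metadata-ingest event
    if not (
        "eventID" in event
        and event.get("eventType") == "org.pacifica.metadata.ingest"
        and event.get("source") == "/pacifica/metadata/ingest"
    ):
        return []
    # inner filter: entries of "data" that describe plain-text files
    return [
        item["_id"]
        for item in event.get("data", [])
        if item.get("destinationTable") == "Files"
        and item.get("mimetype") == "text/plain"
        and "_id" in item
    ]


def should_notify(event: dict) -> bool:
    # the consumer is notified only when the result set is non-empty
    return len(matching_file_ids(event)) > 0
```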
requirements.txt
The requirements file specifies the jsonpath2, pacifica-notifications-consumer and promise packages.
jsonpath2
pacifica-notifications-consumer
promise
__main__.py
The entry-point is as follows:
#!/usr/bin/env python3

import json
import os
import shutil
import sys

from jsonpath2 import Path
from pacifica_notifications_consumer import download, upload
from promise import Promise

# execute only if run as a top-level script
if __name__ == '__main__':
    # path to directory for input CloudEvents notification
    orig_event_path = sys.argv[1]

    # path to directory for output CloudEvents notification
    new_event_path = sys.argv[2]

    def upload_did_fulfill(d):
        """Callback for Pacifica uploader promise's eventual value.

        Args:
            d (dict): CloudEvents notification.

        Returns:
            Promise[bool]: True for success, False otherwise.

        Raises:
            BaseException: If an error occurs.
        """
        # delete entire directory tree for input CloudEvents notification
        shutil.rmtree(orig_event_path, ignore_errors=True)

        # delete entire directory tree for output CloudEvents notification
        shutil.rmtree(new_event_path, ignore_errors=True)

        # return True for success
        return Promise(lambda resolve, reject: resolve(True))

    def download_did_fulfill(d):
        """Callback for Pacifica downloader promise's eventual value.

        Args:
            d (dict): CloudEvents notification.

        Returns:
            Promise[bool]: True for success, False otherwise.

        Raises:
            BaseException: If an error occurs.
        """
        # iterate over plain-text files
        for file_d in [match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Files" and @["mimetype"] = "text/plain")]').match(d)]:
            # create the output subdirectory, since open(..., 'w') does not create missing directories
            os.makedirs(os.path.join(new_event_path, 'uploads', file_d['subdir']), exist_ok=True)

            # open input data file
            with open(os.path.join(orig_event_path, 'downloads', file_d['subdir'], file_d['name']), 'r') as orig_file:
                # open output data file
                with open(os.path.join(new_event_path, 'uploads', file_d['subdir'], file_d['name']), 'w') as new_file:
                    # read input data file, convert cased characters to uppercase, and then write output data file
                    new_file.write(orig_file.read().upper())

        # invoke Pacifica uploader with specified transaction attributes and key-value pairs
        return upload(new_event_path, {
            'Transactions.instrument': [match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.instrument")]["value"]').match(d)][0],
            'Transactions.proposal': [match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.proposal")]["value"]').match(d)][0],
            'Transactions.submitter': [match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.submitter")]["value"]').match(d)][0],
        }, {
            'Transactions._id': [match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions._id")]["value"]').match(d)][0],
        }).then(upload_did_fulfill)

    # invoke Pacifica downloader
    download(orig_event_path).then(download_did_fulfill)
How do I deploy a consumer?
- Download and install the pacifica-notifications-consumer package.
  - When the pacifica-notifications-consumer package is successfully installed, the start-pacifica-notifications-consumer command is available on the PATH.
  - When started, the start-pacifica-notifications-consumer command: (i) extracts the contents of consumer.zip to a temporary location, (ii) creates a new virtual environment, (iii) installs the contents of the requirements file within said virtual environment, (iv) starts a new asynchronous job processing queue, (v) starts a new web service end-point for a new consumer, and (vi) registers said web service end-point for said consumer with the given producer.
  - When stopped, the start-pacifica-notifications-consumer command reverses the side-effects of the start-up behavior (i.e., it cleans up after itself).
- Execute the start-pacifica-notifications-consumer command, specifying as command-line arguments:
  - The location of consumer.zip;
  - The URL and authentication credentials for the producer; and
  - The configuration for the asynchronous job processing queue, web service end-point, etc.
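Steps (i) to (iii) of the start-up behavior can be approximated with the standard library alone, which may help when checking locally that consumer.zip extracts cleanly and that its requirements install. This is a sketch under assumptions, not the implementation of start-pacifica-notifications-consumer: the function names, the .venv location inside the working directory, and skipping pip when the requirements file lists nothing are all hypothetical choices, and steps (iv) to (vi) are out of scope.

```python
import os
import subprocess
import venv
import zipfile


def lists_requirements(path: str) -> bool:
    """True if the requirements file exists and names at least one package."""
    if not os.path.exists(path):
        return False  # a missing requirements file is treated as empty
    with open(path) as f:
        return any(line.strip() and not line.lstrip().startswith("#") for line in f)


def stage_consumer(zip_path: str, workdir: str, with_pip: bool = True) -> str:
    """Approximate steps (i) to (iii); return the virtual environment directory."""
    # (i) extract the contents of consumer.zip
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(workdir)
    # (ii) create a new virtual environment next to the extracted files
    venv_dir = os.path.join(workdir, ".venv")
    venv.create(venv_dir, with_pip=with_pip)
    # (iii) install the requirements file inside that environment
    requirements = os.path.join(workdir, "requirements.txt")
    if lists_requirements(requirements):
        if os.name == "nt":
            python = os.path.join(venv_dir, "Scripts", "python.exe")
        else:
            python = os.path.join(venv_dir, "bin", "python")
        subprocess.run([python, "-m", "pip", "install", "-r", requirements], check=True)
    return venv_dir
```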
Frequently Asked Questions
Are downloaded files persisted after the entry-point for a consumer has terminated?
Yes, unless the entry-point deletes them itself (e.g., using the shutil.rmtree function; see __main__.py in the example).
Are locally-generated files persisted after the entry-point for a consumer has terminated?
Yes, unless the entry-point deletes them itself (e.g., using the shutil.rmtree function; see __main__.py in the example).
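When the entry-point should remove its temporary directories whether or not processing succeeds, a try/finally block is one option. The sketch below assumes a hypothetical process(src, dst) callable that does the actual work. Note that the example __main__.py deletes the directories only after a successful upload, which keeps the files around for inspection when something fails; which behavior to choose is a design decision.

```python
import shutil


def process_then_clean_up(src: str, dst: str, process) -> None:
    """Run process(src, dst), then delete both temporary directories.

    The finally clause runs even if process raises, so downloaded and
    locally-generated files are not left behind; the exception still
    propagates to the caller.
    """
    try:
        process(src, dst)
    finally:
        shutil.rmtree(src, ignore_errors=True)
        shutil.rmtree(dst, ignore_errors=True)
```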
Can I develop the entry-point for a consumer to wait for two-or-more CloudEvents notifications?
Yes. However, this behavior must be implemented by the entry-points themselves and is not provided by the default behavior of the start-pacifica-notifications-consumer command.
For example, to wait for two CloudEvents notifications:
- Develop two consumers with two entry-points.
- The entry-point for the first consumer receives and stores the first CloudEvents notification.
- The entry-point for the second consumer receives the second CloudEvents notification, retrieves the first CloudEvents notification, and then does its work.
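A minimal sketch of the hand-off between the two entry-points, assuming both can reach a shared directory and that both notifications carry some value usable as a correlation key (for example, a proposal identifier). The shared directory, the key and the function names are hypothetical. The first notification is written to a temporary file and then renamed with os.replace, which is atomic on POSIX when both paths are on the same filesystem, so the second consumer never reads a partially written file.

```python
import json
import os


def store_first(event: dict, shared_dir: str, key: str) -> str:
    """First consumer: persist the notification under a correlation key."""
    os.makedirs(shared_dir, exist_ok=True)
    path = os.path.join(shared_dir, f"{key}.json")
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(event, f)
    os.replace(tmp, path)  # rename into place only once fully written
    return path


def retrieve_first(shared_dir: str, key: str):
    """Second consumer: return the stored notification, or None if absent."""
    path = os.path.join(shared_dir, f"{key}.json")
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)
```

If retrieve_first returns None, the first notification has not arrived yet; the second entry-point could then terminate with a non-zero exit status.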
Can I develop the entry-point for a consumer using a non-Python3 programming language?
No. Only Python3 is supported as the programming language for the entry-point file (viz., __main__.py).
Non-Python3 executables can be called from within the entry-point using the subprocess module.
Other programming languages can be called from within the entry-point using the appropriate interface, e.g., the jpy package for the Java programming language and the rpy2 package for the R programming language.
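For example, a non-Python3 executable can transform an input data file by reading it on standard input and writing the output data file on standard output. The helper below is a hypothetical sketch built on subprocess.run; it returns the command's exit status so that the entry-point can pass a failure on as its own non-zero exit status.

```python
import subprocess


def transform_with_command(cmd: list, src_file: str, dst_file: str) -> int:
    """Pipe src_file through an external command into dst_file.

    Returns the command's exit status (0 means success).
    """
    with open(src_file, "rb") as fin, open(dst_file, "wb") as fout:
        result = subprocess.run(cmd, stdin=fin, stdout=fout)
    return result.returncode
```

On systems where tr is available, calling it with ["tr", "a-z", "A-Z"] would give an ASCII-only version of the uppercase example.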
How do I authenticate with the CloudEvents notification provider?
Authentication credentials are specified in the configuration for the pacifica-notifications package (see https://pacifica-notifications.readthedocs.io/en/latest/configuration.html for more information).
Authentication credentials are included in all HTTP requests that are issued by the consumer, e.g., the username is specified via the Http-Remote-User header (see https://pacifica-notifications.readthedocs.io/en/latest/exampleusage.html for more information).
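A sketch of an HTTP request that carries the username in the Http-Remote-User header, built with urllib.request from the standard library. The URL, the JSON payload and the function name are placeholders rather than documented pacifica-notifications endpoints; consult the pages linked above for the actual API.

```python
import json
import urllib.request


def build_request(url: str, username: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request that identifies the user."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # the username travels in the Http-Remote-User header
            "Http-Remote-User": username,
        },
        method="POST",
    )
```

The request would be sent with urllib.request.urlopen(req). urllib normalizes header names with str.capitalize, so Request.get_header expects "Http-remote-user"; HTTP header names are case-insensitive, so the producer receives the same header.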
Glossary of Terms
- Consumer: A software system that receives CloudEvents notifications (corresponding to Pacifica transactions) from producers. CloudEvents notifications are filtered by testing against a JSONPath. If the test for a given CloudEvents notification is successful, then the consumer routes said CloudEvents notification to a processor.
- Processor: A software system that downloads the input data files and metadata for a given Pacifica transaction, processes said input data files and associated metadata, generates output data files and associated metadata, and then creates a new Pacifica transaction.
- Producer: A software system that sends CloudEvents notifications (corresponding to Pacifica transactions) to consumers.