Doozer¶
Doozer is a library for building services that are driven by consumers. Doozer applications read from objects that implement the Consumer Interface and provide the message received to a callback for processing. The messsage can be processed before handing it off to the callback, and the callback’s results can be processed after they are returned to the application.
Installation¶
You can install Doozer using Pip:
$ python -m pip install doozer
You can also install it from source:
$ python setup.py install
Quickstart¶
from __future__ import annotations
from doozer import Abort, Application
class FileConsumer:
"""Read lines from a file."""
def __init__(self, filename):
self.filename = filename
self._file = None
def __iter__(self):
"""FileConsumer objects are iterators."""
return self
def __next__(self):
"""Return the next line of the file, if available."""
if not self._file:
self._file = open(self.filename)
try:
return next(self._file)
except StopIteration:
self._file.close()
raise Abort("Reached end of file", None)
async def read(self):
"""Return the next line in the file."""
return next(self)
async def callback(app, message):
"""Print the message retrieved from the file consumer."""
print(app.name, "received:", message)
return message
app = Application(
__name__,
callback=callback,
consumer=FileConsumer(__file__),
)
@app.startup
async def print_header(app):
"""Print a header for the file being processed."""
print("# Begin processing", app.consumer.filename)
@app.teardown
async def print_footer(app):
"""Print a footer for the file being processed."""
print("# Done processing", app.consumer.filename)
@app.message_preprocessor
async def remove_comments(app, line):
"""Abort processing of comments (lines that start with #)."""
if line.strip().startswith("#"):
raise Abort("Line is a comment", line)
return line
Running Applications¶
Doozer provides a doozer
command to run your applications from the command
line. To run the application defined in the quickstart above, cd
to the
directory containing the module and run:
$ doozer run file_printer
Doozer’s CLI can also be invoked by running the installed package as a script. To avoid confusion and prevent different installations of Doozer from interfering with one another, this is the recommended way to run Doozer applications:
$ python -m doozer run file_printer
If a module contains only one instance of a Doozer
Application
, python -m doozer run
will automatically
detect and run it. If more than one instance exists, the desired application’s
name must be specified:
$ python -m doozer run file_printer:app
This form always takes precedence over the former, and the doozer
command
won’t attempt to auto-detect an instance even if there is a problem with the
name specified. If the attribute specified by the name after :
is callable,
python -m doozer run
will call it and use the returned value as the
application. Any callable specified this way should require no arguments and
return an instance of Application
. Autodiscovery of
callables that return applications is not currently supported.
More detailed information about Doozer’s command line interface can be found in Command Line Interface.
Logging¶
Doozer applications provide a default logger. The logger returned by calling
logging.getLogger()
will be used. The name of the logger is the name
given to the application. Any configuration needed (e.g.,
logging.basicConfig()
, logging.config.dictConfig()
, etc.) should be
done before the application is started.
Debug Mode¶
Debugging with asyncio can be tricky. Doozer provides a debug mode enables asyncio’s debug mode as well as debugging information through Doozer’s logger.
Debug mode can be enabled through a configuration setting:
app.settings['DEBUG'] = True
or by providing a truthy value for debug
when calling
run_forever()
:
app.run_forever(debug=True)
Contents:
Consumer Interface¶
To work with Doozer, a consumer must conform to the Consumer Interface. To
conform to the interface, the object must expose a coroutine()
function named read
.
Below is a sample implementation.
from __future__ import annotations
from doozer import Abort, Application
class FileConsumer:
"""Read lines from a file."""
def __init__(self, filename):
self.filename = filename
self._file = None
def __iter__(self):
"""FileConsumer objects are iterators."""
return self
def __next__(self):
"""Return the next line of the file, if available."""
if not self._file:
self._file = open(self.filename)
try:
return next(self._file)
except StopIteration:
self._file.close()
raise Abort("Reached end of file", None)
async def read(self):
"""Return the next line in the file."""
return next(self)
async def callback(app, message):
"""Print the message retrieved from the file consumer."""
print(app.name, "received:", message)
return message
app = Application(
__name__,
callback=callback,
consumer=FileConsumer(__file__),
)
@app.startup
async def print_header(app):
"""Print a header for the file being processed."""
print("# Begin processing", app.consumer.filename)
@app.teardown
async def print_footer(app):
"""Print a footer for the file being processed."""
print("# Done processing", app.consumer.filename)
@app.message_preprocessor
async def remove_comments(app, line):
"""Abort processing of comments (lines that start with #)."""
if line.strip().startswith("#"):
raise Abort("Line is a comment", line)
return line
Callbacks¶
Doozer operates on messages through a series of asyncio.coroutine()
callback functions. Each callback type serves a unique purpose.
callback
¶
This is the only one of the callback settings that is required. Its purpose is to process the incoming message. If desired, it should return the result(s) of processing the message as an iterable.
async def callback(application, message):
return ['spam']
Application('name', callback=callback)
Note
There can only be one function registered as callback
.
error
¶
These callbacks are called when an exception is raised while processing a message.
app = Application('name')
@app.error
async def log_error(application, message, exception):
logger.error('spam')
Note
Exceptions raised while postprocessing a result will not be processed through these callbacks.
message_acknowledgement
¶
These callbacks are intended to acknowledge that a message has been received and should not be made available to other consumers. They run after a message and its result(s) have been fully processed.
app = Application('name')
@app.message_acknowledgement
async def acknowledge_message(application, original_message):
await original_message.acknowledge()
message_preprocessor
¶
These callbacks are called as each message is first received. Any modifications
they make to the message will be reflected in what is passed to callback
for processing.
app = Application('name')
@app.message_preprocessor
async def add_process_id(application, message):
message['pid'] = os.getpid()
return message
result_postprocessor
¶
These callbacks will operate on the result(s) of callback
. Each callback is
applied to each result.
app = Application('name')
@app.result_postprocessor
async def store_result(application, result):
with open('/tmp/result', 'w') as f:
f.write(result)
startup
¶
These callbacks will run as an application is starting.
app = Application('name')
@app.startup
async def connect_to_database(application):
await db.connect(application.settings['DB_HOST'])
teardown
¶
These callbacks will run as an application is shutting down.
app = Application('name')
@app.teardown
async def disconnect_from_database(application):
await db.close()
Command Line Interface¶
Doozer provides the following command line interface.
doozer¶
usage: doozer [-h] [--version] [-a APP] {run} ...
-
-h
,
--help
¶
show this help message and exit
-
--version
¶
show program’s version number and exit
-
-a
,
--app
¶
the path to the application to run
doozer run¶
Import and run an application.
usage: doozer run [-h] [--verbose | --quiet] [-r] [-w WORKERS] [-d]
application-path
-
application-path
¶
‘the path to the application to run’
-
-h
,
--help
¶
show this help message and exit
-
--verbose
,
-v
¶
verbose mode
-
--quiet
,
-q
¶
quiet mode
-
-r
,
--reloader
¶
‘reload the application on changes’
-
-w
<workers>
,
--workers
<workers>
¶ ‘the number of asynchronous tasks to run’
-
-d
,
--debug
¶
‘enable debug mode’
Further Details¶
When developing locally, applications often need to be restarted as changes are
made. To make this easier, Doozer provides a --reloader
option to the
run
command. With this option enabled, Doozer will watch an application’s
root directory and restart the application automatically when changes are
detected:
$ python -m doozer run file_printer --reloader
Note
The --reloader
option is not recommended for production use.
It’s also possible to enable Doozer’s Debug Mode through the --debug
option:
$ python -m doozer run file_printer --debug
Note
The --debug
option is not recommended for production use.
This will also enable the reloader.
Extending the Command Line¶
For information about how to extension Doozer’s command line interface, see Extending the Command Line.
Extensions¶
Extensions provide additional functionality to applications. Configuration management is shared between applications and extensions in a central location.
Using Extensions¶
from doozer import Application
from doozer_sqlite import SQLite
app = Application(__name__)
db = SQLite(app)
db.connection.execute('SELECT 1;')
Developing Extensions¶
Doozer provides an Extension
base class to make
extension development easier.
from doozer import Extension
class SQLite(Extension):
DEFAULT_SETTINGS = {'SQLITE_CONNECTION_STRING': ':memory:'}
def __init__(self, app=None):
self._connection = None
super().__init__(app)
@property
def connection(self):
if not self._connection:
conn_string = self.app.settings['SQLITE_CONNECTION_STRING']
self._connection = sqlite3.connect(conn_string)
return self._connection
The Extension
class provides two special attributes
that are meant to be overridden:
DEFAULT_SETTINGS
provides default values for an extension’s settings during theinit_app()
step. When a value is used by an extension and has a sensible default, it should be stored here (e.g., a database hostname).REQUIRED_SETTINGS
provides a list of keys that are checked for existence during theinit_app()
step. If one or more required settings are not set on the application instance assigned to the extension, aKeyError
is raised. Extensions should set this when a value is required but has no default (e.g., a database password).
Extending the Command Line¶
Doozer offers an extensible command line interface. To register your own
commands, use register_commands()
. Any function passed to it
will have its usage created directly from its signature. During the course of
initializing the application for use with the extension (i.e.,
init_app()
), Doozer will check for a method
on the extension’s instance named register_cli
and call it. If you place
any calls to register_commands()
inside it, the command line
interface will be extended automatically.
In order to access the new commands, the doozer
command line utility must
be given a reference to an Application
. This is done
through the --app
argument:
$ doozer --app APP_PATH
Note
For details about the syntax to use when passing a reference to an
Application
, see Running Applications.
A positional argument in the Python function will result in a required positional argument in the command:
def trash(heap):
pass
$ doozer --app APP_PATH NAMESPACE trash HEAP
A keyword argument in the Python function will result in a positional argument in the command with a default value to be used when the argument is omitted:
def trash(heap='marjory'):
pass
$ doozer --app APP_PATH NAMESPACE trash [HEAP]
A keyword-only argument in the Python function will result in an optional argument in the command:
def trash(*, heap='marjory'):
pass
$ doozer --app APP_PATH NAMESPACE trash [--heap HEAP]
By default, all optional arguments will have a flag that matches the function argument’s name. When no other optional arguments start with the same character, a single-character abbreviated flag can also be used.
$ doozer --app APP_PATH NAMESPACE trash [-g HEAP]
The trash
function can then be registered with the CLI:
register_commands('fraggle', [trash])
$ doozer --app APP_PATH fraggle trash --help
Additionally, if a command includes a quiet
or verbose
argument, it
will automatically receive the count of the number of times it was specified
(e.g., -v
will have the value 1
, -vv
will have the value 2
).
When both arguments are included, they will be added as a mutually exclusive
group.
Note
Due to how argparse
handles argument counts, quiet
and verbose
will be set to None
rather than 0
when the flag isn’t specified when the command is
invoked.
$ doozer --app APP_PATH fraggle trash -vvvv
$ doozer --app APP_PATH fraggle trash --quiet
Available Extensions¶
Several extensions are available for use:
contrib
Packages¶
While it is possible to build your own plugins, the Doozer contrib package contains those that we think will most enhance your application.
Retry¶
Retry is a plugin to add the ability for Doozer applications to automatically retry messages that fail to process.
Warning
Retry registers itself as an error callback on the
Application
instance. When doing so, it inserts itself
at the beginning of the list of error callbacks. It does this so that it can
prevent other callbacks from running.
If you have an error callback that you want to run even when retrying a message, you will need to manually inject it into the list of error callbacks after initializing Retry.
Configuration¶
Retry provides a couple of settings to control how many times a message will be
retried. RETRY_THESHOLD
and RETRY_TIMEOUT
work in tandem. If values are
specified for both, whichever limit is reached first will cause Doozer to stop
retrying the message. By default, Doozer will try forever (yes, this is
literally insane).
|
A number that, if provided, will be used in conjunction with the number of retry attempts already made to calculate the total delay for the current retry. Defaults to 1. |
|
A coroutine that encapsulates the functionality
needed to retry the message. |
|
The number of seconds to wait before scheduling a
retry. If |
|
An exception or tuple of exceptions that will cause
Doozer to retry the message. Defaults to
|
|
The maximum number of times that a Doozer
application will try to process a message before
marking it as a failure. if set to 0, the message
will not be retried. If set to None, the limit will
be controlled by |
|
The maximum number of seconds during which a message
can be retried. If set to None, the limit will be
controlled by |
Usage¶
Application definition:
from doozer import Application
from doozer.contrib.retry import Retry
async def print_message(app, message):
print(message)
app = Application('retryable-application', callback=my_callback)
app.settings['RETRY_CALLBACK'] = print_message
Retry(app)
Somwhere inside the application:
from doozer.contrib.retry import RetryableException
async def my_callback(app, message):
raise RetryableException
API¶
-
class
doozer.contrib.retry.
Retry
(app=None)[source]¶ A class that adds retries to an application.
-
init_app
(app)[source]¶ Initialize an
Application
instance.- Parameters
app (
Application
) – Application instance to be initialized.- Raises
TypeError – If the callback isn’t a coroutine.
ValueError – If the delay or backoff is negative.
- Return type
-
Sphinx¶
The Sphinx contrib plugin adds a directive that can be used to document extensions to the Doozer command line interface.
-
class
doozer.contrib.sphinx.
DoozerCLIDirective
(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)[source]¶ A Sphinx directive that can be used to document a CLI extension.
This class wraps around autoprogram to generate Sphinx documentation for extensions that extend the Doozer CLI.
.. doozercli:: doozer_database:Database :start_command: db
Changed in version 1.2.0: The
prog
option will default to the proper way to invoke command line extensions.
For full details of the options support by the doozercli
directive, please
refer to the
sphinxcontrib-autoprogram documentation.
API¶
Here’s the public API for Doozer.
Application¶
-
class
doozer.base.
Application
(name, settings=None, *, consumer=None, callback=None)[source]¶ A service application.
Each message received from the consumer will be passed to the callback.
- Parameters
name (
str
) – The name of the application.settings (
Optional
[Any
]) – An object with attributed-based settings.consumer (
Optional
[Consumer
]) – Any object that is an iterator or an iterable and yields instances of any type that is supported bycallback
. While this isn’t required, it must be provided before the application can be run.callback (
Optional
[Callable
[…,Awaitable
]]) – A callable object that takes two arguments, an instance ofdoozer.base.Application
and the (possibly) preprocessed incoming message. While this isn’t required, it must be provided before the application can be run.
-
error
(callback)[source]¶ Register an error callback.
- Parameters
callback (
Callable
[…,Awaitable
]) – A callable object that takes three arguments: an instance ofdoozer.base.Application
, the incoming message, and the exception that was raised. It will be called any time there is an exception while reading a message from the queue.- Return type
- Returns
The callback.
- Raises
TypeError – If the callback isn’t a coroutine.
-
message_acknowledgement
(callback)[source]¶ Register a message acknowledgement callback.
- Parameters
callback (
Callable
[…,Awaitable
]) – A callable object that takes two arguments: an instance ofdoozer.base.Application
and the original incoming message as its only argument. It will be called once a message has been fully processed.- Return type
- Returns
The callback.
- Raises
TypeError – If the callback isn’t a coroutine.
-
message_preprocessor
(callback)[source]¶ Register a message preprocessing callback.
- Parameters
callback (
Callable
[…,Awaitable
]) – A callable object that takes two arguments: an instance ofdoozer.base.Application
and the incoming message. It will be called for each incoming message with its result being passed tocallback
.- Return type
- Returns
The callback.
- Raises
TypeError – If the callback isn’t a coroutine.
-
result_postprocessor
(callback)[source]¶ Register a result postprocessing callback.
- Parameters
callback (
Callable
[…,Awaitable
]) – A callable object that takes two arguments: an instance ofdoozer.base.Application
and a result of processing the incoming message. It will be called for each result returned fromcallback
.- Return type
- Returns
The callback.
- Raises
TypeError – If the callback isn’t a coroutine.
-
run_forever
(num_workers=1, loop=None, debug=False)[source]¶ Consume from the consumer until interrupted.
- Parameters
num_workers (
int
) – The number of asynchronous tasks to use to process messages received through the consumer. Defaults to 1.loop (
Optional
[AbstractEventLoop
]) – An event loop that, if provided, will be used for running the application. If none is provided, the default event loop will be used.debug (
bool
) – Whether or not to run with debug mode enabled. Defaults to True.
- Raises
TypeError – If the consumer is None or the callback isn’t a coroutine.
Changed in version 1.2: Unhandled exceptions resulting from processing a message while the consumer is still active will stop cause the application to shut down gracefully.
- Return type
Command Line Interface¶
Collection of Doozer CLI tasks.
-
doozer.cli.
register_commands
(namespace, functions, namespace_kwargs=None, func_kwargs=None)[source]¶ Register commands with the doozer CLI.
The signature of each function provided through
functions
will be mapped to its command’s interface. Any positional arguments in the function’s signature will become required positional arguments to the command. Keyword arguments in the signature will also become positional arguments, although they will use the default value from the signature when not specified on the command line. Keyword-only arguments in the signature will become optional arguments on the command line.- Parameters
namespace (
str
) – A name representing the group of commands. The namespace is required to access the commands being added.functions (
Sequence
[Callable
]) – A list of callables that are used to create subcommands. More details can be found in the documentation foradd_commands()
.
Note
This function is a wrapper around
add_commands()
. Please refer to its documentation for any arguments not explained here.- Return type
Configuration¶
Exceptions¶
Custom exceptions used by Doozer.
-
exception
doozer.exceptions.
Abort
(reason, message)[source]¶ An exception that signals to Doozer to stop processing a message.
When this exception is caught by Doozer it will immediately stop processing the message. None of the remaining callbacks will be called.
If the exception is caught while processing a result, that result will no longer be processed. Any other results generated by the same message will still be processed.
Extensions¶
-
class
doozer.extensions.
Extension
(app=None)[source]¶ A base class for Hension extensions.
- Parameters
app (Optional[doozer.base.Application]) – An application instance that has an attribute named settings that contains a mapping of settings to interact with a database.
-
property
DEFAULT_SETTINGS
¶ A
dict
of default settings for the extension.When a setting is not specified by the application instance and has a default specified, the default value will be used. Extensions should define this where appropriate. Defaults to
{}
.- Return type
-
property
REQUIRED_SETTINGS
¶ An
iterable
of required settings for the extension.When an extension has required settings that do not have default values, their keys may be specified here. Upon extension initialization, an exception will be raised if a value is not set for each key specified in this list. Extensions should define this where appropriate. Defaults to
()
.- Return type
-
property
app
¶ Return the registered app.
- Return type
-
init_app
(app)[source]¶ Initialize the application.
In addition to associating the extension’s default settings with the application, this method will also check for the extension’s required settings.
- Parameters
app (
Application
) – An application instance that will be initialized.- Return type
Changelog¶
Version 2.0.0¶
Release TBD
Drop support for Python 3.4, 3.5, and 3.6 (backwards incompatible)
Add type annotations
Version 1.2.0¶
Released 2017-12-03
Unhandled exceptions raised while processing a message will stop the application
Set the event loop when running with the reloader
Renamed to Doozer
Relicensed under MIT License
Version 1.1.0¶
Released 2016-11-11
Add
henson.cli.register_commands
to extend the command line interfaceMessages are logged using
logging.DEBUG
instead oflogging.INFO
Calls to
print
inhenson.cli.run
are updated toapp.logger.info
References to objects used by
henson.Application
are removed once they are no longer needed to allow the memory to be freed up before the next message is received.uvloop will be used for the event loop if it’s installed.
Automatically register extensions to a registry on the application
Add
hensoncli
Sphinx directive to document extensions to the command line interfacehenson.cli.run
and any command line extensions that request it supportquiet
andverbose
flags to set verbosity