Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
History
=======

Unreleased
----------
* New ``zocalo.shutdown`` command to shutdown Zocalo services

0.9.1 (2021-08-18)
------------------
* Expand ~ in paths in configuration files

0.9.0 (2021-08-18)
------------------
* Removed --live/--test command line arguments, use -e/--environment instead
Expand Down
18 changes: 12 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Zocalo as a wider whole is made up of two repositories (plus some private intern

As mentioned, Zocalo is currently built on top of ActiveMQ. ActiveMQ is an apache project that provides a `message broker <https://en.wikipedia.org/wiki/Message_broker>`_ server, acting as a central dispatch that allows various services to communicate. Messages are plaintext, but from the Zocalo point of view it's passing aroung python objects (json dictionaries). Every message sent has a destination to help the message broker route. Messages may either be sent to a specific queue or broadcast to multiple queues. These queues are subscribed to by the services that run in Zocalo. In developing with Zocalo, you may have to interact with ActiveMQ or RabbitMQ, but it is unlikely that you will have to configure it.

Zocalo allows for the monitoring of jobs executing ``python-workflows`` services or recipe wrappers. The ``python-workflows`` package contains most of the infrastructure required for the jobs themselves and more detailed documentation of its components can be found in the ``python-workflows`` `GitHub repository <https://github.com/DiamondLightSource/python-workflows/>`_ and `the Zocalo documentation <https://zocalo.readthedocs.io>`_.
Zocalo allows for the monitoring of jobs executing ``python-workflows`` services or recipe wrappers. The ``python-workflows`` package contains most of the infrastructure required for the jobs themselves and more detailed documentation of its components can be found in the ``python-workflows`` `GitHub repository <https://github.com/DiamondLightSource/python-workflows/>`_ and `the Zocalo documentation <https://zocalo.readthedocs.io>`_.

.. _ActiveMQ: http://activemq.apache.org/
.. _STOMP: https://stomp.github.io/
Expand Down Expand Up @@ -94,29 +94,35 @@ The only public Zocalo service at present is ``Schlockmeister``, a garbage colle
Working with Zocalo
-------------------

`Graylog <https://www.graylog.org/>`_ is used to manage the logs produced by Zocalo. Once Graylog and the message broker server are running then services and wrappers can be launched with Zocalo.
`Graylog <https://www.graylog.org/>`_ is used to manage the logs produced by Zocalo. Once Graylog and the message broker server are running then services and wrappers can be launched with Zocalo.

Zocalo provides some command line tools. These tools are ``zocalo.go``, ``zocalo.wrap`` and ``zocalo.service``: the first triggers the processing of a recipe and the second runs a command while exposing its status to Zocalo so that it can be tracked. Services are available through ``zocalo.service`` if they are linked through the ``workflows.services`` entry point in ``setup.py``. For example, to start a Schlockmeister service:
Zocalo provides the following command line tools::
* ``zocalo.go``: trigger the processing of a recipe
* ``zocalo.wrap``: run a command while exposing its status to Zocalo so that it can be tracked
* ``zocalo.service``: start a new instance of a service
* ``zocalo.shutdown``: shutdown either specific instances of Zocalo services or all instances for a given type of service

Services are available through ``zocalo.service`` if they are linked through the ``workflows.services`` entry point in ``setup.py``. For example, to start a Schlockmeister service:

.. code:: bash

$ zocalo.service -s Schlockmeister

.. list-table::
.. list-table::
:widths: 100
:header-rows: 1

* - Q: How are services started?
* - A: Zocalo itself is agnostic on this point. Some of the services are self-propagating and employ simple scaling behaviour - in particular the per-image-analysis services. The services in general all run on cluster nodes, although this means that they can not be long lived - beyond a couple of hours there is a high risk of the service cluster jobs being terminated or pre-empted. This also helps encourage programming more robust services if they could be killed.

.. list-table::
.. list-table::
:widths: 100
:header-rows: 1

* - Q: So if a service is terminated in the middle of processing it will still get processed?
* - A: Yes, messages are handled in transactions - while a service is processing a message, it's marked as "in-progress" but isn't completely dropped. If the service doesn't process the message, or it's connection to ActiveMQ gets dropped, then it get's requeued so that another instance of the service can pick it up.

Repeat Message Failure
Repeat Message Failure
----------------------

How are repeat errors handled? This is a problem with the system - if e.g. an image or malformed message kills a service then it will get requeued, and will eventually kill all instances of the service running (which will get re-spawned, and then die, and so forth).
Expand Down
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PyYAML==5.4.1
graypy==2.1.0
marshmallow==3.13.0
pytest-cov==2.12.1
pytest-mock
pytest==6.2.5
setuptools==58.1.0
workflows==2.12
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ zip_safe = False
console_scripts =
zocalo.go = zocalo.cli.go:run
zocalo.service = zocalo.service:start_service
zocalo.shutdown = zocalo.cli.shutdown:run
zocalo.wrap = zocalo.cli.wrap:run
libtbx.dispatcher.script =
zocalo.go = zocalo.go
zocalo.service = zocalo.service
zocalo.shutdown = zocalo.shutdown
zocalo.wrap = zocalo.wrap
libtbx.precommit =
zocalo = zocalo
Expand Down
89 changes: 89 additions & 0 deletions src/zocalo/cli/shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# zocalo.shutdown
# Stop a zocalo service
#

import argparse
import socket
import sys

import workflows
import workflows.services
import workflows.transport

import zocalo.configuration


def run(args=None):
parser = argparse.ArgumentParser()

# Load configuration
zc = zocalo.configuration.from_file()
zc.activate()

default_transport = workflows.transport.default_transport
if (
zc.storage
and zc.storage.get("zocalo.default_transport")
in workflows.transport.get_known_transports()
):
default_transport = zc.storage["zocalo.default_transport"]

known_services = workflows.services.get_known_services()

parser.add_argument("-?", action="help", help=argparse.SUPPRESS)
parser.add_argument(
"HOSTS",
nargs="*",
type=str,
help="Specific service instances specified as hostname.pid",
)
parser.add_argument(
"-s",
"--service",
dest="services",
metavar="SVC",
action="append",
default=[],
help="Stop all instances of a service. Use 'none' for instances without "
"loaded service. Known services: " + ", ".join(known_services),
)
parser.add_argument(
"-t",
"--transport",
dest="transport",
metavar="TRN",
default=default_transport,
help="Transport mechanism. Known mechanisms: "
+ ", ".join(workflows.transport.get_known_transports())
+ f" (default: {default_transport})",
)
zc.add_command_line_options(parser)
workflows.transport.add_command_line_options(parser)
args = parser.parse_args(args)

if not args.services and not len(args.HOSTS):
print("Need to specify one or more services to shut down.")
print("Either specify service groups with -s or specify specific instances")
print("as: hostname.pid")
sys.exit(1)

transport = workflows.transport.lookup(args.transport)()
transport.connect()

for host in args.HOSTS:
if host.count(".") == 1:
# See also workflows.util.generate_unique_host_id()
host = ".".join(reversed(socket.gethostname().split(".")[1:])) + "." + host

message = {"command": "shutdown", "host": host}
transport.broadcast("command", message)
print("Shutting down", host)

for service in args.services:
if service.lower() == "none":
# Special case for placeholder instances
service = None
message = {"command": "shutdown", "service": service}
transport.broadcast("command", message)
print("Stopping all instances of", service)
45 changes: 45 additions & 0 deletions tests/cli/test_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import itertools
from unittest import mock

import workflows.transport
from workflows.transport.common_transport import CommonTransport
from workflows.util import generate_unique_host_id

from zocalo.cli.shutdown import run


def test_shutdown_host(mocker):
mocked_transport = mocker.MagicMock(CommonTransport)
mocked_lookup = mocker.patch.object(
workflows.transport, "lookup", return_value=mocked_transport
)
host_prefix = ".".join(generate_unique_host_id().split(".")[:-2])
hosts = ["uk.ac.diamond.ws123.4567", "ws987.6543"]
expected_hosts = ["uk.ac.diamond.ws123.4567", f"{host_prefix}.ws987.6543"]
run(hosts)
mocked_lookup.assert_called_with("StompTransport")
mocked_transport().broadcast.assert_has_calls(
[
mock.call("command", {"command": "shutdown", "host": host})
for host in expected_hosts
]
)


def test_shutdown_services(mocker):
mocked_transport = mocker.MagicMock(CommonTransport)
mocked_lookup = mocker.patch.object(
workflows.transport, "lookup", return_value=mocked_transport
)
services = ["Foo", "Bar"]
run(
list(itertools.chain.from_iterable([["-s", service] for service in services]))
+ ["-t", "PikaTransport"]
)
mocked_lookup.assert_called_with("PikaTransport")
mocked_transport().broadcast.assert_has_calls(
[
mock.call("command", {"command": "shutdown", "service": service})
for service in services
]
)