Skip to content

Commit ed1ca62

Browse files
hrefgaudenz
authored andcommitted
Add bucket notifications test
1 parent 60fbd00 commit ed1ca62

6 files changed

Lines changed: 204 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ These tests are run regularly against our public infrastructure as well as our i
4646
| **Nested Virtualization** | [test_virtualization_support](./test_nested_virtualization.py#L14) | default |
4747
| | [test_run_nested_vm](./test_nested_virtualization.py#L40) | default |
4848
| **Objects** | [test_bucket_urls](./test_objects.py#L23) | default |
49+
| | [test_notifications](./test_objects.py#L69) | default |
4950
| **Private Network** | [test_private_ip_address_on_all_images](./test_private_network.py#L14) | all |
5051
| | [test_private_network_connectivity_on_all_images](./test_private_network.py#L35) | all |
5152
| | [test_multiple_private_network_interfaces](./test_private_network.py#L88) | default |

api.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,4 +279,17 @@ def delete_objects_users(api, url):
279279
bucket.objects.all().delete()
280280
bucket.delete()
281281

282+
sns = boto3.client(
283+
'sns',
284+
endpoint_url=objects_endpoint,
285+
aws_access_key_id=user.keys[0]['access_key'],
286+
aws_secret_access_key=user.keys[0]['secret_key'],
287+
region_name='default',
288+
)
289+
290+
for topic in sns.list_topics().get('Topics', ()):
291+
arn = topic["TopicArn"]
292+
assert re.match(r'arn:aws:sns:(rma|lpg)::at-.+', arn)
293+
sns.delete_topic(TopicArn=arn)
294+
282295
api.request("DELETE", url)

resources.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,11 @@ def put_file_content(self, remote_filename, content, sudo=False):
980980

981981
self.put_file(f.name, remote_filename, sudo)
982982

983+
def get_file_handle(self, remote_filename):
984+
985+
sftp = self.host.backend.client.open_sftp()
986+
return sftp.open(remote_filename)
987+
983988
def enable_dhcp_in_networkd(self, interface):
984989
""" Additional private network interfaces have to be explicitly
985990
configured to use DHCP, to get an IP address.

scripts/sns-endpoint-test-server

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#!/usr/bin/env -S python3 -u
2+
# Note: It's important to use unbuffered output, otherwise log
3+
# lines won't appear in the journal immediately.
4+
""" HTTP test server for object storage notifications tests
5+
6+
This server acts as the endpoint for the Simple Notification Service (SNS).
7+
8+
This is based on the HTTP server included in the http.server standard library
9+
module. It has the following features:
10+
* Only responds to POST requests. All other requests are not handled. No access
11+
to the filesystem is possible.
12+
* The body of the request is logged to a separate log file.
13+
"""
14+
15+
import http.server
16+
import ssl
17+
18+
from argparse import ArgumentParser
19+
from http import HTTPStatus
20+
from pathlib import Path
21+
22+
23+
class SNSEndpointRequestHandler(http.server.BaseHTTPRequestHandler):
24+
25+
server_version = f'SNSEndpointTestHTTP/{http.server.__version__}'
26+
protocol_version = 'HTTP/1.1'
27+
28+
def do_POST(self):
29+
30+
# Log some details about the notification record
31+
length = int(self.headers.get('content-length', 0))
32+
if length:
33+
body = self.rfile.read(length)
34+
35+
with body_log_file.open('ab') as body_log:
36+
body_log.write(body + b'\n')
37+
38+
self.send_response(HTTPStatus.NO_CONTENT)
39+
self.end_headers()
40+
41+
42+
parser = ArgumentParser()
43+
parser.add_argument('--ssl', action='store_true')
44+
parser.add_argument('--port', type=int, default=8000)
45+
parser.add_argument('--body-log', default='notification-body.log')
46+
47+
args = parser.parse_args()
48+
49+
with http.server.ThreadingHTTPServer(
50+
('', args.port),
51+
SNSEndpointRequestHandler
52+
) as server:
53+
print(f'SNS Endpoint test HTTP server running on '
54+
f'{":".join(map(str, server.server_address))}')
55+
if args.ssl:
56+
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
57+
context.load_cert_chain('./server.pem')
58+
server.socket = context.wrap_socket(server.socket, server_side=True)
59+
60+
body_log_file = Path(args.body_log)
61+
62+
# Create an empty log file
63+
if body_log_file.exists():
64+
body_log_file.unlink()
65+
body_log_file.touch()
66+
67+
server.serve_forever()

test_objects.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,13 @@
99
"""
1010

1111
import boto3
12+
import json
1213
import requests
1314
import secrets
15+
import time
16+
17+
from util import flatten
18+
from util import setup_notification_endpoint
1419

1520
from urllib.parse import urlparse
1621

@@ -59,3 +64,90 @@ def test_bucket_urls(objects_endpoint, access_key, secret_key):
5964
response = requests.get(url)
6065
assert response.status_code == 200
6166
assert response.text == "test"
67+
68+
69+
def test_notifications(
70+
bucket,
71+
access_key,
72+
secret_key,
73+
objects_endpoint,
74+
server,
75+
region,
76+
):
77+
""" Using S3 SNS (Simple Notification Service) we can be informed via
78+
webhooks, when something changes on a bucket.
79+
80+
"""
81+
82+
# Run a service that can act as a webhook endpoint
83+
setup_notification_endpoint(server)
84+
85+
# Get an SNS client (Simple Notification Service)
86+
sns = boto3.client(
87+
'sns',
88+
endpoint_url=objects_endpoint,
89+
aws_access_key_id=access_key,
90+
aws_secret_access_key=secret_key,
91+
region_name='default',
92+
)
93+
94+
# Create a test topic
95+
name = f"at-{secrets.token_hex(8)}"
96+
97+
topic = sns.create_topic(Name=name, Attributes={
98+
"push-endpoint": f"http://{server.ip('public', 4)}:8000",
99+
})
100+
101+
# Get notified whenever an object is created
102+
bucket.Notification().put(NotificationConfiguration={
103+
"TopicConfigurations": [
104+
{
105+
"Id": name,
106+
"TopicArn": topic['TopicArn'],
107+
"Events": ["s3:ObjectCreated:*"]
108+
}
109+
]
110+
})
111+
112+
# We have to wait a moment for the configuration to propagate
113+
timeout = time.monotonic() + 30
114+
115+
while time.monotonic() < timeout:
116+
bucket.put_object(Key='pre-check', Body=b'')
117+
118+
found = False
119+
with server.get_file_handle('notification-body.log') as notifications:
120+
for line in notifications:
121+
n = json.loads(line)
122+
for r in n['Records']:
123+
if r['s3']['object']['key'] == 'pre-check':
124+
found = True
125+
126+
if found:
127+
break
128+
129+
time.sleep(1)
130+
131+
# Generate multiple notifications
132+
for i in range(3):
133+
bucket.put_object(Key=f'count-{i}', Body=b'')
134+
135+
# Ensure they were received (excluding the pre-check objects from above)
136+
with server.get_file_handle('notification-body.log') as notification_log:
137+
notifications = [json.loads(line) for line in notification_log
138+
if 'pre-check' not in line]
139+
140+
# A single message may contain multiple records
141+
records = flatten(m['Records'] for m in notifications)
142+
assert len(records) == 3
143+
144+
# The records are sent in order
145+
assert records[0]['s3']['object']['key'] == 'count-0'
146+
assert records[1]['s3']['object']['key'] == 'count-1'
147+
assert records[2]['s3']['object']['key'] == 'count-2'
148+
149+
# They all share some properties
150+
for record in records:
151+
assert record['eventName'] == 'ObjectCreated:Put'
152+
assert record['awsRegion'] == region
153+
assert record['s3']['bucket']['name'] == bucket.name

util.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from hashlib import blake2b
2727
from ipaddress import ip_address
2828
from ipaddress import ip_network
29+
from itertools import chain
2930
from paramiko import SSHClient, AutoAddPolicy
3031
from paramiko.ssh_exception import ChannelException
3132
from paramiko.ssh_exception import NoValidConnectionsError
@@ -733,6 +734,25 @@ def setup_lbaas_backend(backend, backend_network, ssl=False, protocol='tcp'):
733734
raise AssertionError(f'Unknown protocol: {protocol}')
734735

735736

737+
def setup_notification_endpoint(endpoint):
738+
""" Configures a server to work as a S3 SNS notification endpoint.
739+
740+
A simple HTTP test server is started to serve a webhook URL for a S3 SNS
741+
notification endpoint.
742+
"""
743+
744+
# Copy test server Python script to backend server
745+
endpoint.put_file('scripts/sns-endpoint-test-server')
746+
endpoint.run('chmod +x sns-endpoint-test-server')
747+
748+
endpoint.run(oneliner(f'''
749+
systemd-run
750+
--user
751+
--unit sns-endpoint
752+
./sns-endpoint-test-server
753+
'''))
754+
755+
736756
def wait_for_url_ready(url, prober, content=None, timeout=90):
737757
""" Waits for an URL to return an OK status code or specific content. """
738758

@@ -856,3 +876,9 @@ def skip_test_when(match, reason=None):
856876
pytest.skip(reason)
857877

858878
raise
879+
880+
881+
def flatten(list_of_lists):
882+
""" Flatten one level of nesting. """
883+
884+
return list(chain.from_iterable(list_of_lists))

0 commit comments

Comments
 (0)