Skip to content

fix(pubsub): split large (mod)ACK requests into smaller ones#9594

Merged
plamut merged 3 commits into googleapis:master from
plamut:iss-9103
Nov 22, 2019
Merged

fix(pubsub): split large (mod)ACK requests into smaller ones#9594
plamut merged 3 commits into googleapis:master from
plamut:iss-9103

Conversation

@plamut
Copy link
Contributor

@plamut plamut commented Nov 4, 2019

Fixes #9103.

There is a server-side limit on the maximum size of ACK and modACK requests, which can be hit if the leaser tries to manage too many messages in a single request. This PR avoids the problem by splitting such large requests into multiple smaller requests.

How to test

Steps to reproduce:

  • publish a lot of messages to a topic (significantly more than 3000)
  • subscribe to that topic using a streaming pull and a callback that processes messages for a non-negligible amount of time (e.g. 1 second)
  • observe the DEBUG log

Actual result (before the fix):
The following error can be observed in the log output:

google.api_core.exceptions.InvalidArgument: 400 Request payload size exceeds the limit: 524288 bytes.

Expected result (after the fix):
There is no error, mass-acknowledging leased messages works just fine.

The following script might be helpful:

reproduce_9103.py
import itertools
import logging
import os
import threading
import time
from argparse import ArgumentParser

from google import api_core
from google.cloud import pubsub_v1

# Detailed format showing thread, source location, and function for each record.
log_format = (
    "%(levelname)-8s [%(asctime)s] %(threadName)-33s "
    "[%(name)s] [%(filename)s:%(lineno)d][%(funcName)s] %(message)s"
)

# Configure root logging exactly once. ``logging.basicConfig`` is a no-op on
# any subsequent call (unless ``force=True``), so the original script's second
# call never applied ``log_format`` — the first, simpler format won. A single
# call with the intended format and DEBUG level fixes that.
logging.basicConfig(level=logging.DEBUG, format=log_format)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


def msg_handler(message, sleep_for=1):
    """Simulate non-trivial processing of a Pub/Sub message, then ACK it.

    Args:
        message: The received message; must expose ``data``, ``message_id``,
            ``ack_id`` attributes and an ``ack()`` method.
        sleep_for (float): Seconds to sleep to simulate processing work.
            Added with a default so the one-argument subscriber callback
            signature is unchanged (generalizes the old hard-coded constant).
    """
    logging.info(
        f"\x1b[1m[{message.message_id}] got message with content: "
        f"{str(message.data)} (ack_id: {message.ack_id})\x1b[0m"
    )
    logging.info(f"\x1b[1m[{message.message_id}] sleeping for {sleep_for} seconds\x1b[0m")

    time.sleep(sleep_for)
    # Fixed: the original emitted a stray double bracket ("[[") here.
    logging.info(f"\x1b[1m[{message.message_id}] done sleeping, sending ack()\x1b[0m")

    message.ack()


def _publish_messages(publisher, topic_path, batch_sizes):
    """Publish messages in batches of the given sizes and block until done.

    Args:
        publisher: A Pub/Sub publisher client.
        topic_path (str): Fully-qualified topic path to publish to.
        batch_sizes (Iterable[int]): Number of messages in each batch.
    """
    pending = []
    seq_numbers = itertools.count(start=1)

    for size in batch_sizes:
        for payload in _make_messages(count=size):
            future = publisher.publish(
                topic_path, payload, seq_num=str(next(seq_numbers))
            )
            pending.append(future)
        # Brief pause between batches.
        time.sleep(0.1)

    # Block until every message has been accepted by the server.
    for publish_future in pending:
        publish_future.result(timeout=30)

def _make_messages(count):
    messages = [
        u"message {}/{}".format(i, count).encode("utf-8")
        for i in range(1, count + 1)
    ]
    return messages


def main():
    """Publish a large batch of messages, then stream-pull them back.

    Reproduces googleapis/google-cloud-python#9103: leasing ~4000 messages at
    once makes the leaser issue a (mod)ACK request exceeding the server's
    512 KB payload limit.
    """
    PROJECT_ID = "TODO: set"
    SUBSCRIPTION_NAME = "TODO: set"
    TOPIC_NAME = "TODO: set"

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)
    subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_NAME)

    try:
        publisher.create_topic(topic_path)
        subscriber.create_subscription(subscription_path, topic_path)
    except api_core.exceptions.AlreadyExists:
        # Re-running the script against existing resources is fine.
        pass

    # Significantly more than 3000 messages so the (mod)ACK request is too big.
    batch_sizes = (4000,)
    _publish_messages(publisher, topic_path, batch_sizes=batch_sizes)

    # Allow all published messages to be leased at once, maximizing the size
    # of the ACK / modACK requests the leaser has to send.
    total_messages = sum(batch_sizes)
    flow_control = pubsub_v1.types.FlowControl(max_messages=total_messages)

    logging.info("Starting streaming pull...")
    # Bug fix: the original called subscriber.subscribe() twice, opening two
    # concurrent streaming pulls for the same subscription; one is enough.
    future = subscriber.subscribe(
        subscription_path, msg_handler, flow_control=flow_control
    )
    logging.info("Streaming pull started")

    time.sleep(5)

    try:
        future.result()
    except KeyboardInterrupt:
        logging.info("Interrupted, cancelling")
        future.cancel()


if __name__ == '__main__':
    main()

Loading
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API. cla: yes This human has signed the Contributor License Agreement.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Pub/Sub: Send modify_ack_deadline requests in chunks

3 participants