Source code for gordon_gcp.plugins.janitor.gpubsub_publisher

# -*- coding: utf-8 -*-
#
# Copyright 2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client module to publish any required DNS changes initiated from
:class:`.GDNSReconciler` to `Google Cloud Pub/Sub <https://cloud.
google.com/pubsub/docs/overview>`_. The consumer of these messages is
the `Gordon service <https://github.com/spotify/gordon>`_.

This client wraps around `google-cloud-pubsub <https://pypi.python.org
/pypi/google-cloud-pubsub>`_ using `grpc <https://github.com/googleapis/
googleapis/blob/master/google/pubsub/v1/pubsub.proto>`_ rather than
inheriting from :class:`.AIOConnection`.

.. attention::

    This publisher client is an internal module for the core janitor
    logic. No other use cases are expected.


To use:

.. code-block:: python

    import asyncio
    import gordon_gcp

    config = {
        'keyfile': '/path/to/keyfile.json',
        'project': 'a-dns-project',
        'topic': 'a-topic',
    }
    changes_channel = asyncio.Queue()

    publisher = gordon_gcp.get_gpubsub_publisher(
        config, changes_channel)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(publisher.run())
    finally:
        loop.close()


The keyfile is optional.
If not provided the default service account will be used.
"""

import asyncio
import datetime
import functools
import json
import logging
import os

import zope.interface
from asyncio_extras import threads
from google.api_core import exceptions as google_exceptions
from google.cloud import pubsub
from gordon_janitor import interfaces

from gordon_gcp import exceptions
from gordon_gcp.clients import auth


# Names exported by ``from <module> import *``; only the publisher class
# is listed here.
__all__ = ('GPubsubPublisher',)


class GPubsubPublisherBuilder:
    """Assemble a fully configured :class:`GPubsubPublisher`.

    The builder validates the given configuration, sets up credentials
    (unless the Pub/Sub emulator is in use), creates the Pub/Sub client
    along with the target topic, and finally hands everything over to a
    new :class:`GPubsubPublisher`.

    Args:
        config (dict): Google Cloud Pub/Sub-related configuration.
        metrics (obj): :interface:`IMetricRelay` implementation.
        changes_channel (asyncio.Queue): queue to publish message to
            make corrections to Cloud DNS.
        **kwargs: extra keyword arguments forwarded untouched to
            :class:`GPubsubPublisher`.
    """
    def __init__(self, config, metrics, changes_channel, **kwargs):
        self.config = config
        self.metrics = metrics
        self.changes_channel = changes_channel
        self.kwargs = kwargs

    def _validate_config(self):
        """Check the required keys and normalize the topic name.

        ``project`` and ``topic`` must both be present and truthy. A
        topic that does not already start with
        ``projects/<project>/topics/`` gets that prefix prepended; the
        result is written back into ``self.config``.

        Raises:
            exceptions.GCPConfigError: if a required key is missing.
        """
        # (required key, message used when the key is absent), checked
        # in this order
        missing_messages = (
            ('project',
             'The GCP project where Cloud Pub/Sub is located is required.'),
            ('topic',
             'A topic for the client to publish to in Cloud Pub/Sub is '
             'required.'),
        )
        for required_key, msg in missing_messages:
            if self.config.get(required_key):
                continue
            logging.error(msg)
            raise exceptions.GCPConfigError(msg)

        project = self.config['project']
        prefix = f'projects/{project}/topics/'
        topic = self.config.get('topic')
        if not topic.startswith(prefix):
            self.config['topic'] = f'{prefix}{topic}'

    def _init_auth(self):
        """Create the auth client, or ``None`` when using the emulator.

        Returns:
            auth.GAuthClient or None: ``None`` whenever the
            ``PUBSUB_EMULATOR_HOST`` environment variable is set.
        """
        # With the emulator running a channel is already made/provided,
        # and a publisher client can't be created with credentials on
        # top of that. See (https://github.com/GoogleCloudPlatform/
        #                   google-cloud-python/pull/4839)
        if os.environ.get('PUBSUB_EMULATOR_HOST'):
            return None

        # `session` is a dummy value: pubsub.PublisherClient never uses
        # it, but leaving it out makes aiohttp complain about an
        # unclosed client session that would be created by default.
        return auth.GAuthClient(
            keyfile=self.config.get('keyfile'),
            scopes=self.config.get('scopes'),
            session='noop')

    def _init_client(self, auth_client):
        """Create the Pub/Sub client and make sure the topic exists.

        Args:
            auth_client: object exposing a ``creds`` attribute, or
                ``None`` (emulator case) in which case no credentials
                are passed to the client.

        Returns:
            pubsub.PublisherClient: the client used for publishing.

        Raises:
            exceptions.GCPGordonJanitorError: if creating the topic
                fails for any reason other than it already existing.
        """
        # `auth_client` is None under the emulator, hence the getattr
        credentials = getattr(auth_client, 'creds', None)
        publisher_client = pubsub.PublisherClient(credentials=credentials)

        topic = self.config['topic']
        try:
            publisher_client.create_topic(topic)
        except google_exceptions.AlreadyExists:
            # nothing to do, the topic is already there
            pass
        except Exception as e:
            msg = f'Error trying to create topic "{topic}": {e}'
            logging.error(msg, exc_info=e)
            raise exceptions.GCPGordonJanitorError(msg)

        return publisher_client

    def build_publisher(self):
        """Validate, authenticate, and return a ready publisher.

        Returns:
            GPubsubPublisher: publisher wired to the configured topic.
        """
        self._validate_config()
        pubsub_client = self._init_client(self._init_auth())
        return GPubsubPublisher(
            self.config, self.metrics, pubsub_client, self.changes_channel,
            **self.kwargs)


@zope.interface.implementer(interfaces.IPublisher)
class GPubsubPublisher:
    """Client to publish change messages to Google Pub/Sub.

    Args:
        config (dict): Google Cloud Pub/Sub-related configuration. The
            ``topic`` key is required (ex.
            ``'projects/test-example/topics/a-topic'``);
            ``cleanup_timeout`` is optional and defaults to 60 seconds.
        metrics (obj): :interface:`IMetricRelay` implementation.
        publisher (google.cloud.pubsub_v1.publisher.client.Client):
            client to interface with Google Pub/Sub API.
        changes_channel (asyncio.Queue): queue to publish message to
            make corrections to Cloud DNS.
    """
    def __init__(self, config, metrics, publisher, changes_channel=None, **kw):
        self.topic = config['topic']
        self.metrics = metrics
        self.publisher = publisher
        self.changes_channel = changes_channel
        # how long `cleanup` waits for outstanding publishes, in seconds
        self.cleanup_timeout = config.get('cleanup_timeout', 60)
        self.sleep_for = 0.5  # half a second
        # futures returned by `publisher.publish` that are still tracked
        self._messages = set()

    async def cleanup(self):
        """Clean up outstanding tasks and emit final logs + metrics.

        This method collects all tasks that this particular class
        initiated, and will cancel them if they don't complete within
        the configured timeout period.
        """
        # Iterate over a snapshot: `publish` runs in a thread pool and
        # the done-callbacks mutate the set, presumably from another
        # thread as well.
        tasks_to_clear = [t for t in tuple(self._messages) if not t.done()]

        # The quotient is a float. Compare against zero instead of
        # relying on truthiness, otherwise a timeout that is not a
        # multiple of `sleep_for` skips past 0 and the loop never ends
        # on its own.
        iterations = self.cleanup_timeout / self.sleep_for
        while iterations > 0:
            tasks_to_clear = [t for t in tasks_to_clear if not t.done()]
            if not tasks_to_clear:
                break
            await asyncio.sleep(self.sleep_for)
            iterations -= 1

        # Tasks may have completed during the last sleep; don't report
        # or cancel those.
        tasks_to_clear = [t for t in tasks_to_clear if not t.done()]

        # give up on waiting for tasks to complete
        if tasks_to_clear:
            msg = (f'The following tasks did not complete in time and are '
                   f'being cancelled: {tasks_to_clear}')
            logging.warning(msg)
            for task in tasks_to_clear:
                task.cancel()

        msg = ('Finished sending reconciliation messages to Google Pub/Sub.')
        logging.info(msg)

    def _message_publish_callback(self, message, future):
        """Stop tracking a finished publish and log its outcome.

        Args:
            message (dict): the change message that was published; must
                contain ``action`` and ``resourceRecords.name``.
            future: the completed future returned by the publisher
                client for this message.
        """
        # Untrack first so that a failed publish (or a malformed
        # message) doesn't leave the future in the set forever.
        self._messages.discard(future)

        action = message['action']
        name = message['resourceRecords']['name']
        try:
            msg_id = future.result()
        except Exception as e:
            # `result()` re-raises the publish error; report it rather
            # than letting it escape from the callback.
            logging.error(
                f'Failed to publish message for {action}:{name}: {e}',
                exc_info=e)
            return

        logging.debug(f'Message published for {action}:{name} as {msg_id}, '
                      f'currently tracking {len(self._messages)} messages.')

    @threads.threadpool
    def publish(self, message):
        """Publish received change message to Google Pub/Sub.

        The message is stamped in place with a ``timestamp`` key (naive
        UTC, ISO 8601) before being JSON-encoded and sent.

        Args:
            message (dict): change message received from the
                :obj:`changes_channel` to emit.
        """
        message['timestamp'] = datetime.datetime.utcnow().isoformat()
        bytes_message = bytes(json.dumps(message), encoding='utf-8')
        future = self.publisher.publish(self.topic, bytes_message)
        # track before registering the callback, which untracks it
        self._messages.add(future)
        future.add_done_callback(
            functools.partial(self._message_publish_callback, message))

    async def run(self):
        """Start consuming from :obj:`changes_channel`.

        Once ``None`` is received from the channel, finish processing
        records and clean up any outstanding tasks.
        """
        base_context = {'plugin': 'gpubsub-publisher'}
        timer = self.metrics.timer('plugin-runtime', context=base_context)
        await timer.start()

        while True:
            change_message = await self.changes_channel.get()
            if change_message is None:
                # sentinel: no more changes will arrive
                break

            context = base_context.copy()
            context['action'] = change_message['action']
            try:
                await self.publish(change_message)
                context['result'] = 'published'
            except Exception as e:
                # best effort: record the failure and keep consuming
                logging.error('Exception while trying to publish message to '
                              f'pubsub: {e}')
                context['result'] = 'error'

            await self.metrics.incr('change-message', context=context)

        await self.cleanup()
        await timer.stop()