Source code for gordon_gcp.plugins.janitor.gpubsub_publisher

# -*- coding: utf-8 -*-
# Copyright 2018 Spotify AB
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client module to publish any required DNS changes initiated from
:class:`.GDNSReconciler` to `Google Cloud Pub/Sub
<https://cloud.google.com/pubsub/>`_. The consumer of these messages is
the `Gordon service <https://github.com/spotify/gordon>`_.

This client wraps around `google-cloud-pubsub
<https://pypi.org/project/google-cloud-pubsub/>`_ using `grpc
<https://github.com/googleapis/googleapis/blob/master/google/pubsub/v1/pubsub.proto>`_
rather than inheriting from :class:`.AIOConnection`.

.. attention::

    This publisher client is an internal module for the core janitor
    logic. No other use cases are expected.

To use:

.. code-block:: python

    import asyncio
    import gordon_gcp

    config = {
        'keyfile': '/path/to/keyfile.json',
        'project': 'a-dns-project',
        'topic': 'a-topic',
    }
    changes_channel = asyncio.Queue()

    publisher = gordon_gcp.get_gpubsub_publisher(
        config, changes_channel)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(publisher.run())

The keyfile is optional.
If it is not provided, the default service account will be used.
"""

import asyncio
import datetime
import functools
import json
import logging
import os

import zope.interface
from asyncio_extras import threads
from google.api_core import exceptions as google_exceptions
from google.cloud import pubsub
from gordon_janitor import interfaces

from gordon_gcp import exceptions
from gordon_gcp.clients import auth

__all__ = ('GPubsubPublisher',)

class GPubsubPublisherBuilder:
    """Build and configure a :class:`GPubsubPublisher` object.

    Args:
        config (dict): Google Cloud Pub/Sub-related configuration.
            ``project`` and ``topic`` are required; ``keyfile`` and
            ``scopes`` are optional and are forwarded to the auth client.
        metrics (obj): :interface:`IMetricRelay` implementation.
        changes_channel (asyncio.Queue): queue to publish message to
            make corrections to Cloud DNS.
        **kwargs: extra keyword arguments passed through to
            :class:`GPubsubPublisher`.
    """
    def __init__(self, config, metrics, changes_channel, **kwargs):
        self.config = config
        self.metrics = metrics
        self.changes_channel = changes_channel
        self.kwargs = kwargs

    def _validate_config(self):
        """Check required config keys and fully qualify the topic name.

        A bare topic name (ex. ``'a-topic'``) is rewritten in place to
        ``'projects/<project>/topics/a-topic'``.

        Raises:
            exceptions.GCPConfigError: if ``project`` or ``topic`` is
                missing or empty.
        """
        # req keys: project, topic
        if not self.config.get('project'):
            msg = 'The GCP project where Cloud Pub/Sub is located is required.'
            raise exceptions.GCPConfigError(msg)
        if not self.config.get('topic'):
            msg = ('A topic for the client to publish to in Cloud Pub/Sub is '
                   'required.')
            raise exceptions.GCPConfigError(msg)

        topic_prefix = f'projects/{self.config["project"]}/topics/'
        if not self.config['topic'].startswith(topic_prefix):
            self.config['topic'] = f'{topic_prefix}{self.config["topic"]}'

    def _init_auth(self):
        """Create the auth client, or return ``None`` under the emulator.

        Returns:
            auth.GAuthClient or None: ``None`` when the
            ``PUBSUB_EMULATOR_HOST`` environment variable is set.
        """
        # a publisher client can't be made with credentials if a channel is
        # already made/provided, which is what happens when the emulator is
        # running ಠ_ಠ
        # See google-cloud-python PR #4839
        # (github.com/googleapis/google-cloud-python/pull/4839)
        auth_client = None
        if not os.environ.get('PUBSUB_EMULATOR_HOST'):
            scopes = self.config.get('scopes')
            # creating a dummy `session` as the pubsub.PublisherClient never
            # uses it but without it aiohttp will complain about an unclosed
            # client session that would otherwise be made by default
            auth_client = auth.GAuthClient(
                keyfile=self.config.get('keyfile'), scopes=scopes,
                session='noop')
        return auth_client

    def _init_client(self, auth_client):
        """Create the Pub/Sub publisher client and make sure the topic exists.

        Args:
            auth_client (auth.GAuthClient or None): source of credentials;
                with ``None`` no explicit credentials are passed to the
                publisher client.

        Returns:
            pubsub.PublisherClient: the configured client.

        Raises:
            exceptions.GCPGordonJanitorError: if creating the topic fails
                for any reason other than the topic already existing.
        """
        # Silly emulator constraints: no auth client means no creds
        creds = getattr(auth_client, 'creds', None)
        _client = pubsub.PublisherClient(credentials=creds)

        topic = self.config['topic']
        try:
            # keyword form works with both older and newer client versions
            _client.create_topic(name=topic)
        except google_exceptions.AlreadyExists:
            # already created; nothing to do
            pass
        except Exception as e:
            msg = f'Error trying to create topic "{topic}": {e}'
            logging.error(msg, exc_info=e)
            raise exceptions.GCPGordonJanitorError(msg) from e

        return _client

    def build_publisher(self):
        """Validate config, set up auth and client, and return a publisher.

        Returns:
            GPubsubPublisher: publisher wired to the configured topic.

        Raises:
            exceptions.GCPConfigError: on invalid configuration.
            exceptions.GCPGordonJanitorError: if the topic can't be created.
        """
        # Validation also fully qualifies the topic name, which both topic
        # creation and the publisher depend on, so it must run first.
        self._validate_config()
        auth_client = self._init_auth()
        pubsub_client = self._init_client(auth_client)
        return GPubsubPublisher(
            self.config, self.metrics, pubsub_client, self.changes_channel,
            **self.kwargs)

@zope.interface.implementer(interfaces.IPublisher)
class GPubsubPublisher:
    """Client to publish change messages to Google Pub/Sub.

    Args:
        config (dict): Google Cloud Pub/Sub-related configuration. Must
            contain a fully-qualified ``topic``, ex.
            'projects/test-example/topics/a-topic'; may contain
            ``cleanup_timeout`` in seconds (default 60).
        metrics (obj): :interface:`IMetricRelay` implementation.
        publisher (google.cloud.pubsub.PublisherClient): client to
            interface with Google Pub/Sub API.
        changes_channel (asyncio.Queue): queue to publish message to
            make corrections to Cloud DNS.
    """
    def __init__(self, config, metrics, publisher, changes_channel=None,
                 **kw):
        self.topic = config['topic']
        self.metrics = metrics
        self.publisher = publisher
        self.changes_channel = changes_channel
        self.cleanup_timeout = config.get('cleanup_timeout', 60)
        self.sleep_for = 0.5  # half a second
        # publish futures that have not yet reported completion
        self._messages = set()

    async def cleanup(self):
        """Clean up outstanding tasks and emit final logs + metrics.

        This method collects all tasks that this particular class
        initiated, and will cancel them if they don't complete within the
        configured timeout period.
        """
        # Snapshot the set first: publish callbacks remove entries from it,
        # possibly from another thread, while we would be iterating.
        tasks_to_clear = [t for t in list(self._messages) if not t.done()]
        iterations = self.cleanup_timeout / self.sleep_for
        # `iterations` may be fractional (e.g. 1.2 / 0.5 == 2.4); comparing
        # against 0 keeps the loop bounded where `while iterations:` isn't.
        while iterations > 0:
            tasks_to_clear = [t for t in tasks_to_clear if not t.done()]
            if not tasks_to_clear:
                break
            await asyncio.sleep(self.sleep_for)
            iterations -= 1

        # give up on waiting for tasks to complete; some may have finished
        # during the final sleep, so only report the ones still pending
        tasks_to_clear = [t for t in tasks_to_clear if not t.done()]
        if tasks_to_clear:
            msg = (f'The following tasks did not complete in time and are '
                   f'being cancelled: {tasks_to_clear}')
            logging.warning(msg)
            for task in tasks_to_clear:
                task.cancel()

        msg = ('Finished sending reconciliation messages to Google Pub/Sub.')
        logging.info(msg)

    def _message_publish_callback(self, message, future):
        """Stop tracking ``future`` and log the outcome of publishing it.

        Args:
            message (dict): the change message that was published.
            future: publish future returned by the Pub/Sub client.
        """
        # Stop tracking first so a failed publish doesn't leave a stale
        # entry behind in `_messages`.
        self._messages.discard(future)
        action = message['action']
        name = message['resourceRecords']['name']
        try:
            msg_id = future.result()
        except Exception as e:
            logging.error(f'Failed to publish message for {action}:{name}: '
                          f'{e}', exc_info=e)
            return
        logging.debug(f'Message published for {action}:{name} as {msg_id}, '
                      f'currently tracking {len(self._messages)} messages.')

    @threads.threadpool
    def publish(self, message):
        """Publish received change message to Google Pub/Sub.

        Adds a ``timestamp`` key to ``message`` (mutating it), hands the
        JSON-encoded message to the publisher client, and tracks the
        resulting future until its done-callback fires.

        Args:
            message (dict): change message received from the
                :obj:`changes_channel` to emit.
        """
        message['timestamp'] = datetime.datetime.utcnow().isoformat()
        bytes_message = bytes(json.dumps(message), encoding='utf-8')
        future = self.publisher.publish(self.topic, bytes_message)
        self._messages.add(future)
        future.add_done_callback(
            functools.partial(self._message_publish_callback, message))

    async def run(self):
        """Start consuming from :obj:`changes_channel`.

        Once ``None`` is received from the channel, finish processing
        records and clean up any outstanding tasks.
        """
        base_context = {'plugin': 'gpubsub-publisher'}
        timer = self.metrics.timer('plugin-runtime', context=base_context)
        await timer.start()
        while True:
            change_message = await self.changes_channel.get()
            if change_message is None:
                break
            context = base_context.copy()
            context['action'] = change_message['action']
            try:
                await self.publish(change_message)
                context['result'] = 'published'
            except Exception as e:
                # record the failure and keep consuming remaining messages
                logging.error('Exception while trying to publish message to '
                              f'pubsub: {e}', exc_info=e)
                context['result'] = 'error'
            await self.metrics.incr('change-message', context=context)

        await self.cleanup()
        await timer.stop()