# Source code for gordon_gcp.plugins.service.enricher

# -*- coding: utf-8 -*-
#
# Copyright 2017-2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client module to enrich an event message with any missing information
(such as IP addresses to a new hostname) and to generate the desired
record(s) (e.g. ``A`` or ``CNAME`` records). Once an event message is
done (either successfully enriched or met with errors along the way),
it will be placed into the appropriate channel, either the
``success_channel`` or ``error_channel`` to be further handled by the
``gordon`` core system.

.. attention::
    The enricher client is an internal module for the core gordon
    logic. No other use cases are expected.

"""

import asyncio
import logging

import zope.interface
from gordon import interfaces

from gordon_gcp import exceptions
from gordon_gcp.clients import auth
from gordon_gcp.clients import gdns
from gordon_gcp.clients import http
from gordon_gcp.plugins import _utils


__all__ = ('GCEEnricher', 'GCEEnricherBuilder')


class GCEEnricherBuilder:
    """Build and configure a :class:`GCEEnricher` object.

    The configuration is validated during construction, after which the
    HTTP and DNS clients handed to the enricher are created.

    Args:
        config (dict): Google Compute Engine API related configuration.
        metrics (obj): :interface:`IMetricRelay` implementation.
        kwargs (dict): Additional keyword arguments to pass to the
            enricher.

    Raises:
        GCPConfigError: if the configuration fails validation.
    """
    def __init__(self, config, metrics, **kwargs):
        self.config = config
        self.metrics = metrics
        self.kwargs = kwargs
        # Validate before building any client so that a bad config
        # fails early with a configuration error.
        self._validate_config()
        self.http_client = self._init_http_client()
        self.dns_client = self._init_dns_client()

    def _validate_dns_zone(self):
        """Check that ``dns_zone`` is set and ends with the root zone.

        Returns:
            list(str): Error messages; empty when the zone is valid.
        """
        msg = []
        # ``or ''`` also covers an explicit ``dns_zone: None``, which
        # ``dict.get`` with a default would hand back unchanged.
        dns_zone = self.config.get('dns_zone') or ''
        if not dns_zone:
            msg.append('A dns zone is required to build correct A records.')
        # Non-string values are reported as invalid instead of raising
        # AttributeError on ``.endswith``.
        if not (isinstance(dns_zone, str) and dns_zone.endswith('.')):
            msg.append('A dns zone must be an FQDN and end with the root '
                       'zone (".").')
        return msg

    def _validate_retries(self):
        """Default ``retries`` to 5 when it is missing or falsy.

        Note that this writes the default back into ``self.config``.

        Returns:
            list(str): Always empty; this validator only fills a default.
        """
        if not self.config.get('retries'):
            self.config['retries'] = 5
        return []

    def _validate_project(self):
        """Check that the GCP ``project`` is configured.

        Returns:
            list(str): Error messages; empty when the project is set.
        """
        msg = []
        if not self.config.get('project'):
            msg.append('The GCP project that contains the Google Cloud DNS '
                       'managed zone is required to correctly delete A records '
                       'for deleted instances.')
        return msg

    def _call_validators(self):
        """Actually run all the validations.

        Returns:
            list(str): Error messages from the validators.
        """
        msg = []
        msg.extend(self._validate_dns_zone())
        msg.extend(self._validate_retries())
        msg.extend(self._validate_project())
        return msg

    def _validate_config(self):
        """Run every validator and fail if any reported a problem.

        Raises:
            GCPConfigError: if one or more validators returned an error
                message. The messages are logged before raising.
        """
        errors = self._call_validators()
        if errors:
            error_msgs = '\n'.join(errors)
            exp_msg = f'Invalid configuration:\n{error_msgs}'
            logging.error(error_msgs)
            raise exceptions.GCPConfigError(exp_msg)

    def _init_auth(self):
        """Create an auth client from the configured keyfile and scopes."""
        scopes = self.config.get('scopes')
        return auth.GAuthClient(keyfile=self.config.get('keyfile'),
                                scopes=scopes)

    def _init_http_client(self):
        """Create the HTTP client, backed by its own auth client."""
        return http.AIOConnection(auth_client=self._init_auth())

    def _init_dns_client(self):
        """Create the DNS client for the configured project.

        ``default_zone_prefix`` falls back to an empty string when it is
        not configured.
        """
        return gdns.GDNSClient(
            self.config['project'], self._init_auth(),
            default_zone_prefix=self.config.get('default_zone_prefix', ''))

    def build_enricher(self):
        """Return a :class:`GCEEnricher` wired with the built clients."""
        return GCEEnricher(self.config, self.metrics, self.http_client,
                           self.dns_client, **self.kwargs)


@zope.interface.implementer(interfaces.IMessageHandler)
class GCEEnricher:
    """Get needed instance information from Google Compute Engine.

    Args:
        config (dict): configuration relevant to Compute Engine.
        metrics (obj): :interface:`IMetricRelay` implementation.
        http_client (.AIOConnection): client for interacting with the
            GCE API.
        dns_client (.GDNSClient): client for interacting with the
            Google Cloud DNS API.
        kwargs (dict): additional keyword arguments; accepted but not
            used by this class.
    """
    phase = 'enrich'

    def __init__(self, config, metrics, http_client, dns_client, **kwargs):
        self.config = config
        self.metrics = metrics
        self._http_client = http_client
        self._dns_client = dns_client
        self._logger = logging.getLogger('')

    def _check_instance_data(self, instance_data):
        """Verify that instance data carries an external IP.

        Args:
            instance_data (dict): instance resource returned by the GCE
                API.

        Raises:
            KeyError: if a key on the path to the external IP is
                missing.
            IndexError: if the interface or access config list is empty.
            AssertionError: if the external IP is present but empty.
        """
        # An explicit check instead of ``assert``: running Python with
        # ``-O`` strips assert statements, which would also skip the
        # lookup that the polling loop relies on to detect incomplete
        # data.
        if not self._get_external_ip(instance_data):
            raise AssertionError('Instance data has an empty external IP.')

    async def _poll_for_instance_data(self, resource_name, msg_logger):
        """Fetch instance data, retrying until it is complete.

        Args:
            resource_name (str): resource path appended to the Compute
                Engine v1 API base URL.
            msg_logger: logger used for per-message debug output.

        Returns:
            dict: instance data that passed :meth:`_check_instance_data`.

        Raises:
            GCPGordonError: if the HTTP request fails, or if the data is
                still incomplete after the configured number of
                retries.
        """
        exception = None
        backoff = 2
        base_url = f'https://www.googleapis.com/compute/v1/{resource_name}'
        retries = self.config['retries']

        # Poll until instance data contains all necessary information.
        for attempt in range(1, retries + 1):
            try:
                msg_logger.debug(f'Attempt {attempt}: fetching {base_url}')
                instance_data = await self._http_client.get_json(base_url)

                self._check_instance_data(instance_data)

                return instance_data
            except exceptions.GCPHTTPError as e:
                # HTTP failures are not retried.
                exception = e
                break
            except (KeyError, IndexError) as e:
                # Incomplete data: wait and try again, unless this was
                # the last attempt, in which case there is no point in
                # sleeping.
                exception = e
                if attempt == retries:
                    continue
                await asyncio.sleep(backoff)
                # NOTE(review): this squares the delay (2, 4, 16,
                # 256, ...) rather than doubling it -- confirm that is
                # intended.
                backoff = backoff ** 2

        msg = (f'Could not get necessary information for {resource_name}: '
               f'{exception.__class__.__name__}: {exception}')
        # Chain the underlying error so it stays visible in tracebacks.
        raise exceptions.GCPGordonError(msg) from exception

    def _create_A_rrecord(self, fqdn, external_ip, default_ttl, msg_logger):
        """Build an ``A`` record dict mapping ``fqdn`` to ``external_ip``."""
        msg_logger.debug(f'Creating A record: {fqdn} -> {external_ip}')
        return {
            'name': fqdn,
            'rrdatas': [external_ip],
            'type': 'A',
            'ttl': default_ttl
        }

    def _get_fqdn(self, hostname):
        """Join ``hostname`` and the configured ``dns_zone`` with a dot."""
        dns_zone = self.config['dns_zone']
        return '.'.join([hostname, dns_zone])

    def _get_external_ip(self, instance_data):
        """Return ``natIP`` of the first access config of the first NIC.

        Raises:
            KeyError: if an expected key is missing.
            IndexError: if the interface or access config list is empty.
        """
        # Named to avoid shadowing the imported ``interfaces`` module.
        network_interfaces = instance_data['networkInterfaces']
        return network_interfaces[0]['accessConfigs'][0]['natIP']

    def _get_internal_ip(self, instance_data):
        """Return ``networkIP`` of the first network interface.

        Raises:
            KeyError: if an expected key is missing.
            IndexError: if the interface list is empty.
        """
        network_interfaces = instance_data['networkInterfaces']
        return network_interfaces[0]['networkIP']

    async def _create_rrecords(self, event_data, instance_data, msg_logger):
        """Build the record(s) to create for a new instance.

        Args:
            event_data (dict): data of the event message; not used by
                this implementation.
            instance_data (dict): instance resource from the GCE API.
            msg_logger: logger used for per-message debug output.

        Returns:
            list(dict): a single ``A`` record for the instance.
        """
        # async in case anything gets added in a subclass that needs to be async
        fqdn = self._get_fqdn(instance_data['name'])
        external_ip = self._get_external_ip(instance_data)
        default_ttl = self.config['default_ttl']
        return [
            self._create_A_rrecord(
                fqdn, external_ip, default_ttl, msg_logger)
        ]

    # The types of records that this returns for deletion should match the
    # types of records that are created above in the _create_rrecords method
    async def _get_matching_records_for_deletion(self, instance_resource_url):
        """Look up the existing ``A`` records of a deleted instance.

        Args:
            instance_resource_url (str): resource URL whose last path
                segment is the instance name.

        Returns:
            the ``rrsets`` entry of the DNS client's response.
        """
        # Get the fqdn of the instance
        instance_name = instance_resource_url.split('/')[-1]
        fqdn = self._get_fqdn(instance_name)

        # Get the A records with name matching the instance FQDN
        search_params = {'name': fqdn, 'type': 'A'}
        response = await self._dns_client.get_records_for_zone(
            self.config['dns_zone'], params=search_params)
        return response['rrsets']

    async def handle_message(self, event_message):
        """Enrich a message in place with its resource record(s).

        For ``additions`` the instance is looked up in Compute Engine
        and an ``A`` record is generated; for ``deletions`` the
        matching existing records are fetched from Cloud DNS. The
        records are appended to ``event_message.data['resourceRecords']``
        and an entry is added to the message history. This method does
        not route the message anywhere itself.

        Args:
            event_message (.GEventMessage): message requiring
                additional information.

        Raises:
            GCPGordonError: if the instance data could not be fetched,
                or if the message's action is not ``additions`` or
                ``deletions``.
        """
        # if the message has resource records, assume it has all info
        # it needs to be published, and therefore is already enriched
        if event_message.data['resourceRecords']:
            msg = 'Message already enriched, skipping phase.'
            event_message.append_to_history(msg, self.phase)
            return

        msg_logger = _utils.GEventMessageLogger(
            self._logger, {'msg_id': event_message.msg_id})

        action = event_message.data['action']
        if action == 'additions':
            instance_data = await self._poll_for_instance_data(
                event_message.data['resourceName'], msg_logger)
            records = await self._create_rrecords(
                event_message.data, instance_data, msg_logger)
        elif action == 'deletions':
            instance_resource_url = event_message.data['resourceName']
            records = await self._get_matching_records_for_deletion(
                instance_resource_url)
        else:
            # Without this branch ``records`` would be unbound below and
            # surface as an unrelated UnboundLocalError.
            msg = f'Cannot enrich message with unknown action "{action}".'
            raise exceptions.GCPGordonError(msg)

        msg_logger.debug(f'Enriched with resource record(s): {records}')
        event_message.data['resourceRecords'].extend(records)
        msg = (f"Enriched msg with {len(event_message.data['resourceRecords'])}"
               " resource record(s).")
        event_message.append_to_history(msg, self.phase)