Source code for gordon_gcp.plugins.janitor.authority

# -*- coding: utf-8 -*-
#
# Copyright 2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A GCEAuthority retrieves a list of all instances in all projects that it has
access to, and which belong to the configured zone. For every project, it will
create a message containing domain record information and put it into the
rrset channel. Projects can be filtered by 'project name'. Instances can be
filtered by tags and metadata.

To use:

.. code-block:: python

    import asyncio

    from gordon_gcp.plugins import janitor

    async def run():
        rrset_channel = asyncio.Queue()
        authority = janitor.get_authority(config, rrset_channel)
        await authority.run()
        msg = await rrset_channel.get()
        print(msg)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    # prints: {'zone': 'example.com', 'rrsets': [...]}
"""

import asyncio
import logging

import aiohttp
import zope.interface
from gordon_janitor import interfaces

from gordon_gcp import exceptions
from gordon_gcp.clients import auth
from gordon_gcp.clients import gce
from gordon_gcp.clients import gcrm


# Names exported by ``from <this module> import *``.
__all__ = ('GCEAuthority', 'GCEAuthorityBuilder')


class GCEAuthorityBuilder:
    """Assemble a fully wired :class:`GCEAuthority` from plugin config.

    Args:
        config (dict): plugin-specific configuration.
        metrics (obj): :interface:`IMetricRelay` implementation.
        rrset_channel (asyncio.Queue): channel that resource record
            messages are sent to.
        **kwargs: extra keyword arguments, forwarded unchanged to
            :class:`GCEAuthority`.
    """
    def __init__(self, config, metrics, rrset_channel, **kwargs):
        # No HTTP session exists until build_authority() creates one.
        self.session = None
        self.kwargs = kwargs
        self.rrset_channel = rrset_channel
        self.metrics = metrics
        self.config = config

    def _get_crm_client(self, keyfile_path, scopes):
        """Create the GCRM client, authenticated via ``keyfile_path``."""
        authenticator = auth.GAuthClient(
            keyfile_path, scopes=scopes, session=self.session)
        client = gcrm.GCRMClient(authenticator, self.session)
        return client

    def _get_gce_client(self, keyfile_path, scopes):
        """Create the GCE client, applying the configured blacklists.

        ``tag_blacklist`` is passed through as-is; every entry of
        ``metadata_blacklist`` (a key/value pair) is turned into a
        single-item dict first.
        """
        blacklisted_tags = self.config.get('tag_blacklist', [])
        raw_pairs = self.config.get('metadata_blacklist', [])
        blacklisted_metadata = [dict((entry,)) for entry in raw_pairs]

        authenticator = auth.GAuthClient(
            keyfile_path, scopes=scopes, session=self.session)
        blacklists = {
            'blacklisted_tags': blacklisted_tags,
            'blacklisted_metadata': blacklisted_metadata,
        }
        return gce.GCEClient(authenticator, self.session, **blacklists)

    def _validate_config(self):
        """Ensure ``dns_zone`` is configured.

        Raises:
            exceptions.GCPConfigError: if ``dns_zone`` is missing or empty.
        """
        if self.config.get('dns_zone'):
            return

        msg = ('The absolute DNS zone, i.e. "example.com.", is required to '
               'identify to which zone generated records should belong.')
        logging.error(msg)
        raise exceptions.GCPConfigError(msg)

    def build_authority(self):
        """Validate the config and return a ready :class:`GCEAuthority`.

        A new :class:`aiohttp.ClientSession` is created and stored on
        ``self.session``; both API clients are built on top of it.

        Raises:
            exceptions.GCPConfigError: if the configuration is invalid.
        """
        self._validate_config()
        credentials = (self.config.get('keyfile'), self.config.get('scopes'))
        self.session = aiohttp.ClientSession()
        crm_client = self._get_crm_client(*credentials)
        gce_client = self._get_gce_client(*credentials)

        return GCEAuthority(
            self.config, self.metrics, crm_client, gce_client,
            self.rrset_channel, **self.kwargs)


@zope.interface.implementer(interfaces.IAuthority)
class GCEAuthority:
    """Gather instance data from GCE.

    Args:
        config (dict): plugin-specific configuration.
        metrics (obj): :interface:`IMetricRelay` implementation.
        crm_client (.GCRMClient): client used to fetch GCE projects.
        gce_client (.GCEClient): client used to fetch instances for a
            project.
        rrset_channel (asyncio.Queue): channel to send resource record
            messages to.
        **kwargs: accepted for call compatibility; not used by this class.
    """
    # HTTP statuses for which a project's instance listing is skipped
    # (with a warning) instead of aborting the whole run.
    PROJECT_SKIP_RESP_CODES = {403, 404, 410}

    def __init__(self, config, metrics, crm_client, gce_client,
                 rrset_channel=None, **kwargs):
        self.config = config
        self.metrics = metrics
        self.crm_client = crm_client
        self.gce_client = gce_client
        self.rrset_channel = rrset_channel

    async def _get_active_project_ids(self):
        """Return the set of ``projectId`` values of all active projects."""
        active_projects = await self.crm_client.list_all_active_projects()
        return {p.get('projectId') for p in active_projects}

    async def _get_projects(self):
        """Return the sorted list of project IDs to scan.

        A non-empty ``project_whitelist`` is returned sorted as-is, without
        querying the CRM client or reporting a metric. Otherwise all
        active projects minus ``project_blacklist`` are returned and their
        count is reported as the ``projects`` metric.
        """
        project_whitelist = self.config.get('project_whitelist')
        if project_whitelist:
            return sorted(project_whitelist)

        projects = await self._get_active_project_ids()
        project_blacklist = set(self.config.get('project_blacklist', []))
        sorted_projects = sorted(projects - project_blacklist)

        context = {'plugin': 'gceauthority'}
        await self.metrics.set(
            'projects', len(sorted_projects), context=context)
        return sorted_projects

    def _filter_results(self, results):
        """Flatten per-project results, skipping tolerated HTTP failures.

        Args:
            results (list): one entry per project; either a list of
                instances or an exception captured by ``asyncio.gather``.

        Returns:
            list: instances of every project that was fetched successfully.

        Raises:
            Exception: any captured exception that is not a
                ``GCPHTTPResponseError`` with a status listed in
                :attr:`PROJECT_SKIP_RESP_CODES`.
        """
        successful_results = []
        for result in results:
            if (isinstance(result, exceptions.GCPHTTPResponseError) and
                    result.status in GCEAuthority.PROJECT_SKIP_RESP_CODES):
                msg = (f'Could not fetch instance list for project, skipping: '
                       f'{result}')
                logging.warning(msg)
            elif isinstance(result, Exception):
                raise result
            else:
                successful_results.extend(result)
        return successful_results

    async def _get_instances(self, projects):
        """Fetch the instances of all ``projects`` concurrently.

        Args:
            projects (list): project IDs to list instances for.

        Returns:
            list: instances of all successfully fetched projects, in the
                order of ``projects``.
        """
        instance_filter = self.config.get('instance_filter')
        # A list (rather than a set) keeps the awaitables, and therefore
        # the gathered results, in the same order as ``projects``.
        coros = [
            self.gce_client.list_instances(
                project, instance_filter=instance_filter)
            for project in projects
        ]
        all_results = await asyncio.gather(*coros, return_exceptions=True)
        return self._filter_results(all_results)

    def _create_instance_rrset(self, instance):
        """Build an ``A`` record set for a single instance.

        The IP is the ``natIP`` of the first access config of the first
        network interface; the record name is the instance name joined
        to the configured ``dns_zone`` with a dot.

        Raises:
            KeyError, IndexError: if the instance lacks any of the
                required fields.
        """
        ip = instance['networkInterfaces'][0]['accessConfigs'][0]['natIP']
        fqdn = f"{instance['name']}.{self.config['dns_zone']}"
        return {
            'name': fqdn,
            'type': 'A',
            'rrdatas': [ip],
            'source': 'gceauthority'
        }

    def _create_msgs(self, instances):
        """Turn instances into channel messages.

        Instances from which no record set can be derived are logged and
        skipped.

        Returns:
            list: empty if no record set could be created, otherwise a
                single message holding ``zone`` and all ``rrsets``.
        """
        msgs = []
        rrsets = []
        for instance in instances:
            try:
                rrsets.append(self._create_instance_rrset(instance))
            except (KeyError, IndexError) as e:
                instance_name = instance.get('name')
                logging.warning(
                    'Could not extract instance information for '
                    f'{instance_name} because of missing key {e}, skipping.')
        if rrsets:
            msgs.append({
                'zone': self.config['dns_zone'],
                'rrsets': rrsets
            })
        return msgs

    async def run(self):
        """Batch instance data and send it to the :obj:`self.rrset_channel`.
        """
        plugin = 'gceauthority'
        timer_context = {'plugin': plugin}
        timer = self.metrics.timer(
            'plugin-runtime', context=timer_context)
        await timer.start()

        # NOTE(review): if fetching projects or instances raises,
        # cleanup() and timer.stop() below are never reached.
        projects = await self._get_projects()
        instances = await self._get_instances(projects)
        for rrset_msg in self._create_msgs(instances):
            zone = rrset_msg['zone']
            rrsets = rrset_msg['rrsets']
            total_rrsets = len(rrsets)
            msg = f'[{zone}] Found {total_rrsets} rrsets for zone in GCE.'
            logging.info(msg)
            context = {'plugin': plugin, 'zone': zone}
            await self.metrics.set('rrsets', total_rrsets, context=context)
            await self.rrset_channel.put(rrset_msg)

        await self.cleanup()
        await timer.stop()

    async def cleanup(self):
        """Clean up after a run.

        Puts ``None`` on the rrset channel (presumably the end-of-data
        marker for the consumer -- confirm against the reader) and closes
        the GCE client's HTTP session.
        """
        msg = 'Finished processing GCE data.'
        logging.info(msg)
        await self.rrset_channel.put(None)
        await self.gce_client._session.close()