# -*- coding: utf-8 -*-
#
# Copyright 2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module to create a client interacting with Google Cloud authentication.
An instantiated client is needed for interacting with any of the Google
APIs via the :class:`.AIOConnection`.
The GAuthClient supports both service account (JSON Web Tokens/JWT)
authentication with keyfiles, and default credentials. To setup a
service account, follow `Google's docs <https://cloud.google.com/iam/
docs/creating-managing-service-account-keys>`_. More information on
default credentials can be found :ref:`here <app_default_creds>`. To
setup default credentials, follow `Application Default Credentials`_.
If a keyfile is not provided, the Application Default Credentials will
be used.
To use:
.. code-block:: pycon
>>> import asyncio
>>> import google_gcp
>>> loop = asyncio.get_event_loop()
>>> keyfile = '/path/to/service_account_keyfile.json'
# with keyfile
>>> auth_client = google_gcp.GAuthClient(keyfile=keyfile)
# with Application Default Credentials
>>> auth_client = google_gcp.GAuthClient()
>>> auth_client.token is None
True
>>> loop.run_until_complete(auth_client.refresh_token())
>>> auth_client.token
'c0ffe3'
"""
import asyncio
import json
import logging
import os
import urllib.parse
import uuid
import aiohttp
from google import auth as gauth
from google.auth import compute_engine
from google.auth import environment_vars
from google.oauth2 import _client
from google.oauth2 import service_account
from gordon_gcp import exceptions
from gordon_gcp.clients import _utils
__all__ = ('GAuthClient',)
[docs]class GAuthClient:
"""Async client to authenticate against Google Cloud APIs.
Attributes:
SCOPE_TMPL_URL (str): template URL for Google auth scopes.
DEFAULT_SCOPE (str): default scope if not provided.
JWT_GRANT_TYPE (str): grant type header value when
requesting/refreshing an access token.
Args:
keyfile (str): path to service account (SA) keyfile.
scopes (list): (optional) scopes with which to authorize the SA.
Default is ``'cloud-platform'``.
session (aiohttp.ClientSession): (optional) ``aiohttp`` HTTP
session to use for sending requests.
loop: (optional) asyncio event loop to use for HTTP requests.
NOTE: if :obj:`session` is given, then :obj:`loop` will be
ignored. Otherwise, :obj:`loop` will be used to create a
session, if provided.
"""
SCOPE_TMPL_URL = 'https://www.googleapis.com/auth/{scope}'
DEFAULT_SCOPE = 'cloud-platform'
JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
def __init__(self, keyfile=None, scopes=None, session=None, loop=None):
self._keydata = self._load_keyfile(keyfile)
self.scopes = self._set_scopes(scopes)
self.creds = self._load_credentials()
self._session = self._set_session(session, loop)
self.token = None
self.expiry = None # UTC time
if isinstance(self.creds, compute_engine.credentials.Credentials):
self._refresh_token_f = \
self._refresh_token_using_compute_credentials
else:
self._refresh_token_f = \
self._refresh_token_using_service_account_credentials
def _load_keyfile(self, keyfile):
if not keyfile:
return None
try:
with open(keyfile, 'r') as f:
return json.load(f)
except FileNotFoundError as e:
msg = f'Keyfile {keyfile} was not found.'
logging.error(msg, exc_info=e)
raise exceptions.GCPGordonError(msg)
except json.JSONDecodeError as e:
msg = f'Keyfile {keyfile} is not valid JSON.'
logging.error(msg, exc_info=e)
raise exceptions.GCPGordonError(msg)
def _set_scopes(self, scopes):
if not scopes:
scopes = [self.DEFAULT_SCOPE]
return [self.SCOPE_TMPL_URL.format(scope=s) for s in scopes]
def _load_credentials(self):
# load credentials with two options:
# 1. using key data 2. using Application Default Credentials
if self._keydata:
return service_account.Credentials.from_service_account_info(
self._keydata, scopes=self.scopes)
credentials, _ = gauth.default(
scopes=['https://www.googleapis.com/auth/userinfo.email'])
return credentials
def _set_session(self, session, loop):
if session is not None:
return session
if not loop:
loop = asyncio.get_event_loop()
session = aiohttp.ClientSession(loop=loop)
return session
def _setup_token_request(self):
url = self.creds._token_uri
headers = _utils.DEFAULT_REQUEST_HEADERS.copy()
headers.update(
{'Content-type': 'application/x-www-form-urlencoded'}
)
body = self._setup_request_body()
body = urllib.parse.urlencode(body)
return url, headers, bytes(body.encode('utf-8'))
def _setup_request_body(self):
if self._keydata:
return {
'assertion': self.creds._make_authorization_grant_assertion(),
'grant_type': self.JWT_GRANT_TYPE,
}
return {
'refresh_token': self.creds._refresh_token,
'client_id': self.creds._client_id,
'client_secret': self.creds._client_secret,
'grant_type': 'refresh_token'
}
[docs] async def refresh_token(self):
"""Refresh oauth access token attached to this HTTP session.
Raises:
:exc:`.GCPAuthError`: if no token was found in the
response.
:exc:`.GCPHTTPError`: if any exception occurred,
specifically a :exc:`.GCPHTTPResponseError`, if the
exception is associated with a response status code.
"""
await self._refresh_token_f()
async def _refresh_token_using_compute_credentials(self):
metadata_host = os.getenv(environment_vars.GCE_METADATA_ROOT,
'metadata.google.internal')
headers = {'Metadata-Flavor': 'Google'}
sa_url = (f'http://{metadata_host}/computeMetadata/'
'v1/instance/service-accounts/'
f'{self.creds._service_account_email}/'
'?recursive=true')
sa_response = await self._execute_request(sa_url, 'GET', headers)
if 'email' in sa_response:
email = sa_response['email']
else:
email = self.creds._service_account_email
token_url = (f'http://{metadata_host}/computeMetadata/'
'v1/instance/service-accounts/'
f'{email}/token')
token_response = await self._execute_request(token_url, 'GET', headers)
self._handle_refresh_token_response(token_response)
async def _refresh_token_using_service_account_credentials(self):
url, headers, body = self._setup_token_request()
response = await self._execute_request(url, 'POST', headers, body)
self._handle_refresh_token_response(response)
async def _execute_request(
self, url, method, headers, body=None):
request_id = uuid.uuid4()
logging.debug(_utils.REQ_LOG_FMT.format(
request_id=request_id, method=method, url=url, kwargs=None))
async with self._session.request(
method, url, headers=headers, data=body) as resp:
log_kw = {
'request_id': request_id,
'method': method,
'url': resp.url,
'status': resp.status,
'reason': resp.reason,
}
logging.debug(_utils.RESP_LOG_FMT.format(**log_kw))
# avoid leaky abstractions and wrap http errors with our own
try:
resp.raise_for_status()
except aiohttp.ClientResponseError as e:
msg = f'[{request_id}] Issue connecting to {resp.url}: {e}'
logging.error(msg, exc_info=e)
raise exceptions.GCPHTTPResponseError(msg, resp.status)
return await resp.json()
def _handle_refresh_token_response(self, response):
if 'access_token' in response:
self.token = response['access_token']
else:
msg = '[{request_id}] No access token in response.'
logging.error(msg)
raise exceptions.GCPAuthError(msg)
self.expiry = _client._parse_expiry(response)