# Source code for gordon_gcp.clients.http

# -*- coding: utf-8 -*-
#
# Copyright 2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module to interact with Google APIs via asynchronous HTTP calls.
:class:`.AIOConnection` is meant to be used/inherited by other
product-specific API clients as it handles Google authentication and
automatic refresh of tokens.

.. todo::

    Include that it also handles retries once implemented.

To use:

.. code-block:: python

    import gordon_gcp

    keyfile = '/path/to/service_account_keyfile.json'
    auth_client = gordon_gcp.GAuthClient(keyfile=keyfile)

    client = AIOConnection(auth_client=auth_client)
    resp = await client.request('get', 'http://api.example.com/foo')


The keyfile is optional.
If it is not provided, the default service account will be used.
"""

import datetime
import http.client
import json
import logging
import uuid

import aiohttp

from gordon_gcp import exceptions
from gordon_gcp.clients import _utils


# Names exported by ``from gordon_gcp.clients.http import *``.
__all__ = ('AIOConnection',)

# Response status codes after which ``AIOConnection.request`` retries the
# request while token-refresh attempts remain. ``http.client`` re-exports the
# ``http.HTTPStatus`` members, so this is the 401 (Unauthorized) status.
REFRESH_STATUS_CODES = (http.HTTPStatus.UNAUTHORIZED,)


class AIOConnection:
    """Async HTTP client to Google APIs with service-account-based auth.

    Args:
        auth_client (.GAuthClient): client to manage authentication for
            HTTP API requests.
        session (aiohttp.ClientSession): (optional) ``aiohttp`` HTTP
            session to use for sending requests. Defaults to the session
            object attached to :obj:`auth_client` if not provided.
    """

    def __init__(self, auth_client=None, session=None):
        self._auth_client = auth_client
        # NOTE(review): with neither argument given this raises
        # AttributeError on ``None._session``; auth_client is effectively
        # required.
        self._session = session or auth_client._session

    async def valid_token_set(self):
        """Check whether the auth client holds a usable token.

        A token is usable only if it is set and does not expire within the
        next 60 seconds, so it cannot lapse in the middle of a request. This
        method only checks; it does not refresh the token.

        Returns:
            bool: ``True`` if the current token can be used as-is.
        """
        if not self._auth_client.token:
            return False
        # Tokens expiring inside the skew window count as already expired.
        # NOTE(review): compares against naive ``utcnow()``; assumes
        # ``auth_client.expiry`` is a naive UTC datetime -- confirm.
        deadline = datetime.datetime.utcnow() + datetime.timedelta(seconds=60)
        return self._auth_client.expiry > deadline

    async def request(self, method, url, params=None, headers=None,
                      data=None, json=None, token_refresh_attempts=2,
                      **kwargs):
        """Make an asynchronous HTTP request.

        Args:
            method (str): HTTP method to use for the request.
            url (str): URL to be requested.
            params (dict): (optional) Query parameters for the request.
                Defaults to ``None``.
            headers (dict): (optional) HTTP headers to send with the
                request. The dict is copied, never modified; the copy is
                updated with :attr:`DEFAULT_REQUEST_HEADERS` (which win on
                conflicting keys) and an ``Authorization`` bearer header.
            data (obj): (optional) A dictionary, bytes, or file-like object
                to send in the body of the request.
            json (obj): (optional) Any json compatible python object.
                NOTE: ``json`` and ``data`` cannot be used at the same time.
            token_refresh_attempts (int): (optional) Number of attempts a
                token refresh should be performed. One attempt is consumed
                each time the token is refreshed: up front when it is
                missing or near expiry, and before retrying a response whose
                status is in ``REFRESH_STATUS_CODES``.
            kwargs: ``request_id`` (optional) tags log messages; any other
                keys are ignored.
        Returns:
            (str) HTTP response body.
        Raises:
            :exc:`.GCPHTTPError`: if both ``data`` and ``json`` are given,
                or if the request failed; specifically a
                :exc:`.GCPHTTPResponseError` if the failure is associated
                with a response status code.
        """
        # Same rule aiohttp enforces, but raised as our own error type.
        if data is not None and json is not None:
            msg = ('"data" and "json" request parameters can not be used '
                   'at the same time')
            logging.warning(msg)
            raise exceptions.GCPHTTPError(msg)

        # Copy so the caller's dict never receives the defaults or this
        # request's bearer token.
        req_headers = dict(headers or {})
        req_headers.update(_utils.DEFAULT_REQUEST_HEADERS)
        req_kwargs = {
            'params': params,
            'headers': req_headers,
        }
        # Compare against None so falsy-but-valid bodies ({}, [], b'') are
        # still sent.
        if data is not None:
            req_kwargs['data'] = data
        if json is not None:
            req_kwargs['json'] = json

        if token_refresh_attempts:
            if not await self.valid_token_set():
                await self._auth_client.refresh_token()
                token_refresh_attempts -= 1

        req_headers['Authorization'] = f'Bearer {self._auth_client.token}'

        request_id = kwargs.get('request_id', uuid.uuid4())
        logging.debug(_utils.REQ_LOG_FMT.format(
            request_id=request_id, method=method.upper(), url=url,
            kwargs=req_kwargs))

        # Pre-bind so the error handler can tell whether a response exists.
        resp = None
        try:
            async with self._session.request(
                    method, url, **req_kwargs) as resp:
                log_kw = {
                    'request_id': request_id,
                    'method': method.upper(),
                    'url': resp.url,
                    'status': resp.status,
                    'reason': resp.reason,
                }
                logging.debug(_utils.RESP_LOG_FMT.format(**log_kw))

                needs_refresh = resp.status in REFRESH_STATUS_CODES
                retry = needs_refresh and bool(token_refresh_attempts)
                if needs_refresh:
                    logging.warning(
                        f'[{request_id}] HTTP Status Code {resp.status}'
                        f' returned requesting {resp.url}: {resp.reason}')
                    if retry:
                        logging.info(
                            f'[{request_id}] Attempting request to {resp.url} '
                            'again.')
                    else:
                        logging.warning(
                            f'[{request_id}] Max attempts refreshing auth '
                            f'token exhausted while requesting {resp.url}')
                if not retry:
                    resp.raise_for_status()
                    return await resp.text()
        except aiohttp.ClientResponseError as e:
            # bad HTTP status; avoid leaky abstractions and wrap HTTP errors
            # with our own. ``resp`` is still None if aiohttp raised before
            # the ``async with`` bound a response.
            if resp is not None:
                err_url, status = resp.url, resp.status
            else:
                err_url, status = url, getattr(e, 'status', None)
            msg = f'[{request_id}] HTTP error response from {err_url}: {e}'
            logging.error(msg, exc_info=e)
            raise exceptions.GCPHTTPResponseError(msg, status) from e
        except Exception as e:
            msg = f'[{request_id}] Request call failed: {e}'
            logging.error(msg, exc_info=e)
            raise exceptions.GCPHTTPError(msg) from e

        # Only reached when a refreshable status came back and attempts
        # remain; the previous response has been released by now. Force a
        # refresh: the token may still look unexpired locally, in which case
        # the up-front check would skip refreshing and never consume an
        # attempt, retrying without bound.
        await self._auth_client.refresh_token()
        return await self.request(
            method, url,
            token_refresh_attempts=token_refresh_attempts - 1,
            request_id=request_id,
            **req_kwargs)

    async def get_json(self, url, json_callback=None, **kwargs):
        """Get a URL and return its JSON response.

        Args:
            url (str): URL to be requested.
            json_callback (func): Custom JSON loader function. Defaults
                to :meth:`json.loads`.
            kwargs (dict): Additional arguments to pass through to the
                request.
        Returns:
            response body returned by :func:`json_callback` function.
        """
        loader = json_callback or json.loads
        response = await self.request(method='get', url=url, **kwargs)
        return loader(response)

    async def get_all(self, url, params=None):
        """Aggregate data from all pages of an API query.

        Pages are followed via each response's ``nextPageToken``, which is
        sent back as the ``pageToken`` query parameter.

        Args:
            url (str): Google API endpoint URL.
            params (dict): (optional) URL query parameters. The dict is
                copied, never modified.
        Returns:
            list: Parsed JSON response of every page, in request order.
        """
        # Copy so ``pageToken`` never leaks into the caller's dict; a reused
        # dict would otherwise make a later call start from a stale page.
        params = dict(params or {})
        items = []
        while True:
            response = await self.get_json(url, params=params)
            items.append(response)
            next_page_token = response.get('nextPageToken')
            if not next_page_token:
                return items
            params['pageToken'] = next_page_token