# -*- coding: utf-8 -*-
# Copyright 2018 Spotify AB
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
Module to interact with Google APIs via asynchronous HTTP calls.
:class:`.AIOConnection` is meant to be used/inherited by other
product-specific API clients as it handles Google authentication and
automatic refresh of tokens.

.. todo::

    Include that it also handles retries once implemented.

To use:

.. code-block:: python

    import gordon_gcp

    keyfile = '/path/to/service_account_keyfile.json'
    auth_client = gordon_gcp.GAuthClient(keyfile=keyfile)

    client = AIOConnection(auth_client=auth_client)
    resp = await client.request('get', '')

The keyfile is optional.
If not provided the default service account will be used.

import datetime
import http.client
import json
import logging
import uuid

import aiohttp

from gordon_gcp import exceptions
from gordon_gcp.clients import _utils

__all__ = ('AIOConnection',)


[docs]class AIOConnection: """Async HTTP client to Google APIs with service-account-based auth. Args: auth_client (.GAuthClient): client to manage authentication for HTTP API requests. session (aiohttp.ClientSession): (optional) ``aiohttp`` HTTP session to use for sending requests. Defaults to the session object attached to :obj:`auth_client` if not provided. """ def __init__(self, auth_client=None, session=None): self._auth_client = auth_client self._session = session or auth_client._session
[docs] async def valid_token_set(self): """Check for validity of token, and refresh if none or expired.""" is_valid = False if self._auth_client.token: # Account for a token near expiration now = datetime.datetime.utcnow() skew = datetime.timedelta(seconds=60) if self._auth_client.expiry > (now + skew): is_valid = True return is_valid
[docs] async def request(self, method, url, params=None, headers=None, data=None, json=None, token_refresh_attempts=2, **kwargs): """Make an asynchronous HTTP request. Args: method (str): HTTP method to use for the request. url (str): URL to be requested. params (dict): (optional) Query parameters for the request. Defaults to ``None``. headers (dict): (optional) HTTP headers to send with the request. Headers pass through to the request will include :attr:`DEFAULT_REQUEST_HEADERS`. data (obj): (optional) A dictionary, bytes, or file-like object to send in the body of the request. json (obj): (optional) Any json compatible python object. NOTE: json and body parameters cannot be used at the same time. token_refresh_attempts (int): (optional) Number of attempts a token refresh should be performed. Returns: (str) HTTP response body. Raises: :exc:`.GCPHTTPError`: if any exception occurred, specifically a :exc:`.GCPHTTPResponseError`, if the exception is associated with a response status code. """ if all([data, json]): msg = ('"data" and "json" request parameters can not be used ' 'at the same time') logging.warn(msg) raise exceptions.GCPHTTPError(msg) req_headers = headers or {} req_headers.update(_utils.DEFAULT_REQUEST_HEADERS) req_kwargs = { 'params': params, 'headers': req_headers, } if data: req_kwargs['data'] = data if json: req_kwargs['json'] = json if token_refresh_attempts: if not await self.valid_token_set(): await self._auth_client.refresh_token() token_refresh_attempts -= 1 req_headers.update( {'Authorization': f'Bearer {self._auth_client.token}'} ) request_id = kwargs.get('request_id', uuid.uuid4()) logging.debug(_utils.REQ_LOG_FMT.format( request_id=request_id, method=method.upper(), url=url, kwargs=req_kwargs)) try: async with self._session.request(method, url, **req_kwargs) as resp: log_kw = { 'request_id': request_id, 'method': method.upper(), 'url': resp.url, 'status': resp.status, 'reason': resp.reason } logging.debug(_utils.RESP_LOG_FMT.format(**log_kw)) if resp.status in REFRESH_STATUS_CODES: logging.warning( f'[{request_id}] HTTP Status Code {resp.status}' f' returned requesting {resp.url}: {resp.reason}') if token_refresh_attempts: f'[{request_id}] Attempting request to {resp.url} ' 'again.') return await self.request( method, url, token_refresh_attempts=token_refresh_attempts, request_id=request_id, **req_kwargs) logging.warning( f'[{request_id}] Max attempts refreshing auth token ' f'exhausted while requesting {resp.url}') resp.raise_for_status() return await resp.text() except aiohttp.ClientResponseError as e: # bad HTTP status; avoid leaky abstractions and wrap HTTP errors # with our own msg = f'[{request_id}] HTTP error response from {resp.url}: {e}' logging.error(msg, exc_info=e) raise exceptions.GCPHTTPResponseError(msg, resp.status) except exceptions.GCPHTTPResponseError as e: # from recursive call raise e except Exception as e: msg = f'[{request_id}] Request call failed: {e}' logging.error(msg, exc_info=e) raise exceptions.GCPHTTPError(msg)
[docs] async def get_json(self, url, json_callback=None, **kwargs): """Get a URL and return its JSON response. Args: url (str): URL to be requested. json_callback (func): Custom JSON loader function. Defaults to :meth:`json.loads`. kwargs (dict): Additional arguments to pass through to the request. Returns: response body returned by :func:`json_callback` function. """ if not json_callback: json_callback = json.loads response = await self.request(method='get', url=url, **kwargs) return json_callback(response)
[docs] async def get_all(self, url, params=None): """Aggregate data from all pages of an API query. Args: url (str): Google API endpoint URL. params (dict): (optional) URL query parameters. Returns: list: Parsed JSON query response results. """ if not params: params = {} items = [] next_page_token = None while True: if next_page_token: params['pageToken'] = next_page_token response = await self.get_json(url, params=params) items.append(response) next_page_token = response.get('nextPageToken') if not next_page_token: break return items