Source code for cirun.client

import os

import requests

from cirun.utils import _print_error, _print_error_data

API_ENDPOINT = "https://api.cirun.io/api/v1"
GITHUB_API = "https://api.github.com"
GH_TOKEN_ENV_VAR = "GITHUB_TOKEN"


class CirunAPIException(Exception):
    pass


[docs] class Cirun: """Cirun Client to interact to cirun's API""" def __init__(self, token=None): """ :param token: cirun's API client token """ self.token = token self._get_credentials() self.api_endpoint = os.environ.get('CIRUN_API_ENDPOINT', API_ENDPOINT) def _get_credentials(self): if not self.token: try: token = os.environ['CIRUN_API_KEY'] self.token = token except KeyError: msg = "Could not find CIRUN_API_KEY in environment variables" _print_error_data(msg) raise KeyError(msg) def _headers(self): return { "Content-Type": "application/json", "Authorization": f"Bearer {self.token}" } def _get(self, path, *args, **kwargs): return requests.get(f"{self.api_endpoint}/{path}", headers=self._headers(), *args, **kwargs) def _post(self, path, *args, **kwargs): return requests.post(f"{self.api_endpoint}/{path}", headers=self._headers(), *args, **kwargs) def _put(self, path, *args, **kwargs): return requests.put(f"{self.api_endpoint}/{path}", headers=self._headers(), *args, **kwargs)
[docs] def get_repos(self, print_error=False): """Get all the repositories connected to cirun.""" response = self._get("repo") if response.status_code not in [200, 201]: if print_error: return _print_error(response) return response.json()
[docs] def set_repo( self, name, active=True, print_error=False, installation_id=None ): """ Activate repository for Cirun Parameters ---------- name: str Repository name active: bool ``True`` to activate, ``False`` otherwise. Default is ``True`` installation_id: int Cirun App's Installation ID for the Organization Returns ------- dict """ data = { "repository": name, "active": active } gh_response_json = {} if installation_id: gh_response_json = self.install_github_app(name, installation_id) response = self._post("repo", json=data) if response.status_code not in [200, 201]: if print_error: _print_error(response) response.raise_for_status() response = response.json() if gh_response_json: response = { **response, "github_installation": gh_response_json } return response
def _get_github_repo_id(self, owner, repo): url = f"{GITHUB_API}/repos/{owner}/{repo}" response = requests.get(url) response.raise_for_status() response_json = response.json() return response_json["id"] def install_github_app(self, name, installation_id): owner, repo = name.split("/") repository_id = self._get_github_repo_id(owner=owner, repo=repo) url = f"{GITHUB_API}/user/installations/{installation_id}/repositories/{repository_id}" if not os.environ.get(GH_TOKEN_ENV_VAR): _print_error_data(f"ERROR: Environment variable: '{GH_TOKEN_ENV_VAR}'" f" not found. Unable to install Cirun GitHub App on {name}") return gh_token = os.environ[GH_TOKEN_ENV_VAR] headers = { "Authorization": f"Bearer {gh_token}", "Accept": "application/vnd.github+json", } response = requests.put(url, headers=headers) if response.status_code not in [204, 304]: _print_error(response) response.raise_for_status() response = { "message": f"GitHub Installation done", "status_code": response.status_code } return response def update_access_control(self, org, repository_resource_access): json = { "org": org, "repository_resource_access": repository_resource_access } response = self._put("access-control", json=json) if response.status_code not in [200, 201]: _print_error(response) response.raise_for_status() return response def get_access_control(self, org): response = self._get("access-control", json={"org": org}) if response.status_code != 200: return return response.json() def _create_access_control_repo_resource_data( self, repo, resources, action="add", teams=None, roles=None, users=None, users_from_json=None, policy_args=None, ): repository_resource_access = { "repository": repo, "resources": resources, "action": action, "policy_args": policy_args } repository_resource_access = { **repository_resource_access, "teams": teams, "users": users, "roles": roles, "users_from_json": users_from_json, } return repository_resource_access
[docs] def remove_repo_from_resources(self, org, repo, resources): """ Removes the access to the resource for the repository. Parameters ---------- org: str GitHub Organization repo: str GitHub Repository resources: List[str] List of resources Returns ------- requests.Response """ repository_resource_access = self._create_access_control_repo_resource_data( repo, resources, action="remove", ) return self.update_access_control(org, [repository_resource_access])
[docs] def add_repo_to_resources( self, org, repo, resources, teams=None, roles=None, users=None, users_from_json=None, policy_args=None, ): """ Grants access to the resource for the repository Parameters ---------- org: str GitHub Organization repo: str GitHub Repository resources: List[str] List of resources teams: List[str] List of teams roles: List[str] List of roles users: List[str] List of users users_from_json: List[str] List of users from a json url policy_args: Optional[Dict[str, Any]] Policy arguments, this is a dictionary of key values, currently the only supported argument is ``{"pull_request": True}`` or ``{"pull_request": False}`` Returns ------- requests.Response """ repository_resource_access = self._create_access_control_repo_resource_data( repo, resources, action="add", teams=teams, roles=roles, users=users, users_from_json=users_from_json, policy_args=policy_args ) return self.update_access_control(org, [repository_resource_access])
def _get_repo_policy(self, access_yml, repo): for policy in access_yml["policies"]: if policy['repo'] == repo: return policy['id'] def get_repo_resources(self, org, repo): access_control = self.get_access_control(org) if not access_control: return access_yml = access_control["access_yml"] policy_id = self._get_repo_policy(access_yml, repo) repo_resources = [] for access_item in access_yml["access_control"]: if policy_id in access_item["policies"]: repo_resources.append(access_item["resource"]) return repo_resources
[docs] def clouds(self, print_error=False): """Returns all the connected cloud""" response = self._get("cloud-connect") if response.status_code not in [200, 201]: if print_error: return _print_error(response) return response.json()
[docs] def cloud_connect(self, name, credentials, print_error=False): """ Connect a cloud provider to cirun. Parameters ---------- name: str Name of cloud provider credentials: str Cloud Credentials Returns ------- dict: Response json """ data = { "cloud": name, "credentials": credentials } response = self._post("cloud-connect", json=data) if response.status_code not in [200, 201]: if print_error: _print_error(response) return response.json() return response.json()