Source code for transcriptic.config

import http.client as http_client
import inspect
import io
import json
import logging
import os
import platform
import time
import warnings
import zipfile

import requests
import transcriptic

from Crypto.PublicKey import RSA

from . import routes
from .auth import AuthSession, StrateosBearerAuth, StrateosSign
from .util import is_valid_jwt_token
from .version import __version__


try:
    import magic
except ImportError:
    warnings.warn(
        "`python-magic` is recommended. You may be missing some system-level "
        "dependencies if you have already pip-installed it.\n"
        "Please refer to https://github.com/ahupp/python-magic#installation "
        "for more installation instructions."
    )


def initialize_default_session():
    """
    Build the default session used for requests into the tx web api.

    The session is an `AuthSession` whose headers are replaced with JSON
    content negotiation headers and a `User-Agent` describing this client
    version, the Python implementation and the host platform.

    Returns
    -------
    AuthSession
        Session carrying the default headers.
    """
    default_session = AuthSession()

    # Assemble the User-Agent from its three descriptive parts
    interpreter = f"{platform.python_implementation()}/{platform.python_version()}"
    operating_system = f"{platform.system()}/{platform.release()}"
    hardware = f"{platform.machine()}; {platform.architecture()[0]}"
    user_agent = (
        f"txpy/{__version__} ({interpreter}; {operating_system}; {hardware})"
    )

    default_session.headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": user_agent,
    }
    return default_session


class Connection(object):
    """
    A Connection object is the object used for communicating with
    Transcriptic.

    Local usage: This is most easily instantiated by using the `from_file`
    function after calling `transcriptic login` from the command line.

    .. code-block:: shell
        :caption: shell

        $ transcriptic login
        Email: me@example.com
        Password:
        Logged in as me@example.com (example-lab)

    .. code-block:: python
        :caption: python

        from transcriptic.config import Connection
        api = Connection.from_file("~/.transcriptic")

    For those using Jupyter notebooks on secure.strateos.com (beta), a
    Connection object is automatically instantiated as api.

    .. code-block:: python
        :caption: python

        from transcriptic import api

    The `api` object can then be used for making any api calls. It is
    recommended to use the objects in `transcriptic.objects` since that wraps
    the response in a more friendly format.

    Example Usage:

    .. code-block:: python
        :caption: python

        api.projects()
        api.runs(project_id="p123456789")

    If you have multiple organizations and would like to switch to a specific
    organization, or if you would like to auto-load certain projects, you can
    set it directly by assigning to the corresponding variable.

    Example Usage:

    .. code-block:: python
        :caption: python

        api.organization_id = "my_other_org"
        api.project_id = "p123"
    """

    def __init__(
        self,
        email=None,
        token=None,
        organization_id=None,
        api_root="https://secure.strateos.com",
        cookie=None,
        verbose=False,
        analytics=True,
        user_id="default",
        feature_groups=None,
        rsa_key=None,
        session=None,
        bearer_token=None,
    ):
        """
        Configure the session, its authentication headers and the environment
        used for computing routes, then register this instance as
        `transcriptic.api`.

        Cookie authentication is mutually exclusive with email/token
        authentication, and a bearer token takes precedence over a user token.
        `rsa_key` may be a key string or a path to a key file.
        """
        # Initialize environment args used for computing routes
        self.env_args = dict()
        self.api_root = api_root

        # Initialize session headers
        if session is None:
            session = initialize_default_session()
        self.session = session
        self._bearer_token = None

        # Initialize RSA props
        self._rsa_key = None
        self._rsa_key_path = None
        self._rsa_secret = None
        # Set/Load rsa_key from argument as string or Path
        self.rsa_key = rsa_key

        # NB: These many setattr calls update self.session.headers
        # cookie authentication is mutually exclusive from token authentication
        self.organization_id = organization_id

        if cookie:
            if email is not None or token is not None:
                warnings.warn(
                    "Cookie and token authentication is mutually "
                    "exclusive. Ignoring email and token"
                )
            # Define both headers before the cookie setter inspects them
            self.session.headers["X-User-Email"] = None
            self.session.headers["X-User-Token"] = None
            self.cookie = cookie
            self.update_session_auth(use_signature=False)
        else:
            if cookie is not None:
                warnings.warn(
                    "Cookie and token authentication is mutually "
                    "exclusive. Ignoring cookie"
                )
            # Define the header before the email/token setters inspect it
            self.session.headers["Cookie"] = None
            self.email = email
            if bearer_token is not None:
                if token is not None:
                    warnings.warn(
                        "User token and bearer token authentication "
                        "is mutually exclusive. Ignoring user token"
                    )
                self.bearer_token = bearer_token
            elif token is not None:
                self.token = token
            self.update_session_auth()

        # Initialize feature groups; a fresh list per instance avoids sharing
        # one mutable default between connections
        self.feature_groups = [] if feature_groups is None else feature_groups

        # Initialize CLI parameters
        self.verbose = verbose
        self.analytics = analytics
        self.user_id = user_id

        transcriptic.api = self

    @staticmethod
    def from_file(path):
        """
        Loads connection from file

        Raises
        ------
        OSError
            If the configuration file lacks any of the expected keys.
        """
        config_path = os.path.expanduser(path)
        with open(config_path) as f:
            cfg = json.load(f)

        expected_keys = {
            "email",
            "token",
            "organization_id",
            "api_root",
            "analytics",
            "user_id",
        }
        keys = set(cfg.keys())
        if not keys.issuperset(expected_keys):
            raise OSError(
                f"Key(s) not found in configuration file ({config_path}) "
                f"Missing {repr(expected_keys - keys)}"
            )
        return Connection(**cfg)

    @staticmethod
    def from_default_config():
        """
        Load the default configuration file from the home directory of the
        current user and return a Connection instance that is constructed
        from it.
        """
        return Connection.from_file("~/.transcriptic")

    @property
    def api_root(self):
        """Root url stored in the route environment."""
        try:
            return self.env_args["api_root"]
        except (NameError, KeyError):
            raise ValueError("api_root is not set.")

    @api_root.setter
    def api_root(self, value):
        self.update_environment(api_root=value)

    @property
    def organization_id(self):
        """Organization id stored in the `X-Organization-Id` header."""
        try:
            return self.session.headers["X-Organization-Id"]
        except (NameError, KeyError):
            raise ValueError("organization_id is not set.")

    @organization_id.setter
    def organization_id(self, value):
        # The organization is tracked both as a header and as a route argument
        self.update_headers(**{"X-Organization-Id": value})
        self.update_environment(org_id=value)

    @property
    def project_id(self):
        """Default project id stored in the route environment."""
        try:
            return self.env_args["project_id"]
        except (NameError, KeyError):
            raise ValueError("project_id is not set.")

    @project_id.setter
    def project_id(self, value):
        self.update_environment(project_id=value)

    @property
    def email(self):
        """User email stored in the `X-User-Email` header."""
        try:
            return self.session.headers["X-User-Email"]
        except (NameError, KeyError):
            raise ValueError("email is not set.")

    @email.setter
    def email(self, value):
        # Read the header directly: an unset cookie must not warn or raise
        if self.session.headers.get("Cookie") is not None:
            warnings.warn(
                "Cookie and token authentication is mutually "
                "exclusive. Clearing cookie from headers"
            )
            self.update_headers(**{"Cookie": None})
        self.update_headers(**{"X-User-Email": value})
        self.update_session_auth()

    @property
    def bearer_token(self):
        """JWT bearer token, or None when not set."""
        return self._bearer_token

    @bearer_token.setter
    def bearer_token(self, value):
        if is_valid_jwt_token(value):
            self._bearer_token = value
        else:
            raise ValueError("Malformed JWT Bearer Token")

    @property
    def token(self):
        """User token stored in the `X-User-Token` header."""
        try:
            return self.session.headers["X-User-Token"]
        except (NameError, KeyError):
            raise ValueError("token is not set.")

    @token.setter
    def token(self, value):
        # Read the header directly: an unset cookie must not warn or raise
        if self.session.headers.get("Cookie") is not None:
            warnings.warn(
                "Cookie and token authentication is mutually "
                "exclusive. Clearing cookie from headers"
            )
            self.update_headers(**{"Cookie": None})
        self.update_headers(**{"X-User-Token": value})

    @property
    def cookie(self):
        """Cookie stored in the `Cookie` header."""
        try:
            return self.session.headers["Cookie"]
        except (NameError, KeyError):
            # Raise (not return) the error, consistent with the other getters
            raise ValueError("cookie is not set.")

    @cookie.setter
    def cookie(self, value):
        headers = self.session.headers
        # Use .get so that a header which was never set does not raise here
        if (
            headers.get("X-User-Email") is not None
            or headers.get("X-User-Token") is not None
        ):
            warnings.warn(
                "Cookie and token authentication is mutually "
                "exclusive. Clearing email and token from headers"
            )
            self.update_headers(**{"X-User-Email": None, "X-User-Token": None})
        self.update_headers(**{"Cookie": value})

    @property
    def rsa_key(self):
        """RSA key as supplied by the caller (key string or path)."""
        return self._rsa_key

    @rsa_key.setter
    def rsa_key(self, value):
        self._rsa_key = value
        self.load_rsa_secret()

    def _project_url(self, project_id=None):
        """
        Url of a project for use in error messages, falling back to the
        project stored in the route environment when `project_id` is None.
        """
        if project_id is None:
            project_id = self.env_args.get("project_id")
        return self.url(str(project_id))

    def get_container(self, container_id):
        """Get container with given container_id"""
        route = self.get_route(
            "get_container", org_id=self.organization_id, container_id=container_id
        )
        return self.get(route)

    def save(self, path):
        """Saves current connection into specified file, used for CLI"""
        # Collect and serialize everything before opening the file: opening
        # with "w" truncates it, and the properties may raise when unset.
        config_params = {
            "email": self.email,
            "token": self.token,
            "organization_id": self.organization_id,
            "api_root": self.api_root,
            "analytics": self.analytics,
            "user_id": self.user_id,
            "feature_groups": self.feature_groups,
        }
        # We dont want to save a string key, only a path
        if self._rsa_key is not None:
            config_params["rsa_key"] = self._rsa_key_path
        serialized = json.dumps(config_params, indent=2)

        with open(os.path.expanduser(path), "w") as f:
            f.write(serialized)

    def load_rsa_secret(self):
        """
        Load the RSA secret from `self._rsa_key`, which is tried first as key
        material and then as a path to a key file, and refresh session auth.
        """
        key_string_or_path = self._rsa_key
        if key_string_or_path is None:
            self._rsa_key_path = None
            self._rsa_secret = None
        else:
            try:
                # First try loading it as a key
                key = RSA.import_key(key_string_or_path)
                self._rsa_key_path = None
                self._rsa_secret = key.export_key()
            except (ValueError, IndexError, TypeError):
                # Not parseable as key material (import_key documents all
                # three exception types), so then try as a Path
                key_path = os.path.abspath(os.path.expanduser(key_string_or_path))
                with open(key_path, "rb") as key_file:
                    key = RSA.import_key(key_file.read())
                self._rsa_key_path = str(key_path)
                self._rsa_secret = key.export_key()
        self.update_session_auth()

    def update_session_auth(self, use_signature=True):
        """
        Select the session auth handler: request signing when allowed and an
        RSA secret plus email header are present, otherwise bearer token auth
        when a bearer token is set, otherwise none.
        """
        if (
            use_signature
            and self._rsa_secret
            and "X-User-Email" in self.session.headers
        ):
            self.session.auth = StrateosSign(
                self.email, self._rsa_secret, self.api_root
            )
        elif self.bearer_token:
            self.session.auth = StrateosBearerAuth(self.bearer_token, self.api_root)
        else:
            self.session.auth = None

    def update_environment(self, **kwargs):
        """
        Updates environment variables used for computing routes.
        To remove an existing variable, set value to None.
        """
        self.env_args = dict(self.env_args, **kwargs)

    def update_headers(self, **kwargs):
        """
        Updates session headers
        To remove an existing variable, set value to None.
        """
        self.session.headers = dict(self.session.headers, **kwargs)

    def url(self, path):
        """url format helper"""
        if path.startswith("/"):
            return f"{self.api_root}{path}"
        else:
            return f"{self.api_root}/{self.organization_id}/{path}"

    def preview_protocol(self, protocol):
        """Post protocol preview"""
        route = self.get_route("preview_protocol")
        protocol = _parse_protocol(protocol)
        err_default = "Unable to preview protocol"
        return self.post(
            route,
            json={"protocol": protocol},
            allow_redirects=False,
            status_response={
                "200": lambda resp: resp.json()["key"],
                "default": lambda resp: RuntimeError(err_default),
            },
        )

    def organizations(self):
        """Get list of organizations"""
        route = self.get_route("get_organizations")
        return self.get(route)

    def get_organization(self, org_id=None):
        """Get particular organization"""
        route = self.get_route("get_organization", org_id=org_id)
        err_404 = f"There was an error fetching the organization {org_id}"
        resp = self.get(
            route,
            status_response={
                "200": lambda resp: resp,
                "404": lambda resp: RuntimeError(err_404),
                "default": lambda resp: resp,
            },
        )
        return resp

    def projects(self):
        """Get list of projects in organization"""
        route = self.get_route("get_projects")
        err_default = (
            "There was an error listing the projects in your "
            "organization. Make sure your login details are correct."
        )
        return self.get(
            route,
            status_response={
                "200": lambda resp: resp.json()["projects"],
                "default": lambda resp: RuntimeError(err_default),
            },
        )

    def project(self, project_id=None):
        """Get particular project"""
        route = self.get_route("get_project", project_id=project_id)
        err_default = f"There was an error fetching project {project_id}"
        return self.get(
            route, status_response={"default": lambda resp: RuntimeError(err_default)}
        )

    def runs(self, project_id=None):
        """Get list of runs in project"""
        route = self.get_route("get_project_runs", project_id=project_id)
        err_default = (
            f"There was an error fetching the runs in project " f"{project_id}"
        )
        results = []
        # Follow the "next" links until the last page has been consumed
        while True:
            response = self.get(
                route,
                status_response={
                    "200": lambda resp: resp.json(),
                    "default": lambda resp: RuntimeError(err_default),
                },
            )
            results.extend(
                {**data["attributes"], "id": data["id"]} for data in response["data"]
            )
            if "next" in response["links"]:
                route = response["links"]["next"]
            else:
                break
        return results

    def create_project(self, title):
        """Create project with given title"""
        route = self.get_route("create_project")
        return self.post(route, data=json.dumps({"name": title}))

    def delete_project(self, project_id=None):
        """Delete project with given project_id"""
        route = self.get_route("delete_project", project_id=project_id)
        return self.delete(route, status_response={"200": lambda resp: True})

    def archive_project(self, project_id=None):
        """Archive project with given project_id"""
        route = self.get_route("archive_project", project_id=project_id)
        return self.put(
            route,
            data=json.dumps({"project": {"archived": True}}),
            status_response={"200": lambda resp: True},
        )

    def modify_aliquot_properties(
        self, aliquot_id, set_properties=None, delete_properties=None
    ):
        """
        Modify the properties of an aliquot:
        If specified set_properties must be dict mapping {str: str}
        these properties will be set on the aliquot specified.
        If specified delete_properties must be a list of string properties
        which will be deleted on the aliquot.
        """
        # None stands in for "nothing to set/delete"; avoids mutable defaults
        set_properties = {} if set_properties is None else set_properties
        delete_properties = [] if delete_properties is None else delete_properties
        route = self.get_route("modify_aliquot_properties", aliquot_id=aliquot_id)
        return self.put(
            route,
            json={
                "id": aliquot_id,
                "data": {"set": set_properties, "delete": delete_properties},
            },
        )

    def packages(self):
        """Get list of packages in organization"""
        route = self.get_route("get_packages")
        return self.get(route)

    def package(self, package_id=None):
        """Get package with given package_id"""
        route = self.get_route("get_package", package_id=package_id)
        return self.get(route)

    def create_package(self, name, description):
        """Create package with given name and description"""
        route = self.get_route("create_package")
        return self.post(
            route,
            data=json.dumps(
                {
                    "name": "%s%s" % ("com.%s." % self.organization_id, name),
                    "description": description,
                }
            ),
        )

    def delete_package(self, package_id=None):
        """Delete package with given package_id"""
        route = self.get_route("delete_package", package_id=package_id)
        return self.delete(route, status_response={"200": lambda resp: True})

    def post_release(self, data, package_id=None):
        """Create release with given data and package_id"""
        route = self.get_route("post_release", package_id=package_id)
        return self.post(route, data=data)

    def get_release_status(self, package_id=None, release_id=None, timestamp=None):
        """Get status of current release upload"""
        route = self.get_route(
            "get_release_status",
            package_id=package_id,
            release_id=release_id,
            timestamp=timestamp,
        )
        return self.get(route)

    def get_quick_launch(self, project_id=None, quick_launch_id=None):
        """Get quick launch object"""
        route = self.get_route(
            "get_quick_launch", project_id=project_id, quick_launch_id=quick_launch_id
        )
        return self.get(route)

    def create_quick_launch(self, data, project_id=None):
        """Create quick launch object"""
        route = self.get_route("create_quick_launch", project_id=project_id)
        return self.post(route, data=data)

    def launch_protocol(self, params, protocol_id=None):
        """Launch protocol-id with params"""
        route = self.get_route("launch_protocol", protocol_id=protocol_id)
        return self.post(route, data=params)

    def get_launch_request(self, protocol_id=None, launch_request_id=None):
        """Get launch request id"""
        route = self.get_route(
            "get_launch_request",
            protocol_id=protocol_id,
            launch_request_id=launch_request_id,
        )
        return self.get(route)

    def resolve_quick_launch_inputs(
        self, raw_inputs, project_id=None, quick_launch_id=None
    ):
        """Resolves `raw_inputs` to `inputs` for quick_launch"""
        route = self.get_route(
            "resolve_quick_launch_inputs",
            project_id=project_id,
            quick_launch_id=quick_launch_id,
        )
        return self.post(route, json=raw_inputs)

    def get_protocols(self):
        """Get list of available protocols"""
        route = self.get_route("get_protocols")
        return self.get(route)

    def resources(self, query):
        """Get resources"""
        route = self.get_route("query_resources", query=query)
        return self.get(route)

    def inventory(self, query, timeout=30.0, page=0):
        """Get inventory"""
        route = self.get_route("query_inventory", query=query, page=page)
        return self.get(route, timeout=timeout)

    def kits(self, query):
        """Get kits"""
        route = self.get_route("query_kits", query=query)
        return self.get(route)

    def payment_methods(self):
        """Get payment methods"""
        route = self.get_route("get_payment_methods")
        return self.get(route)

    def monitoring_data(
        self, data_type, instruction_id, grouping=None, start_time=None, end_time=None
    ):
        """Get monitoring_data"""
        route = self.get_route(
            "monitoring_data",
            data_type=data_type,
            instruction_id=instruction_id,
            grouping=grouping,
            start_time=start_time,
            end_time=end_time,
        )
        return self.get(route)

    def raw_image_data(self, data_id=None):
        """Get raw image data"""
        route = self.get_route("view_raw_image", data_id=data_id)
        return self.get(route, status_response={"200": lambda resp: resp}, stream=True)

    def _get_object(self, obj_id, obj_type=None):
        """Helper function for loading objects"""
        # TODO: Migrate away from deref routes for other object types
        if obj_type == "dataset":
            route = self.get_route("dataset_short", data_id=obj_id)
        else:
            route = self.get_route("deref_route", obj_id=obj_id)
        err_404 = f"[404] No object found for ID {obj_id}"
        return self.get(route, status_response={"404": lambda resp: Exception(err_404)})

    def analyze_run(self, protocol, test_mode=False):
        """Analyze given protocol"""
        protocol = _parse_protocol(protocol)
        if "errors" in protocol:
            raise AnalysisException(
                "Error%s in protocol:\n%s"
                % (
                    ("s" if len(protocol["errors"]) > 1 else ""),
                    "".join(["- " + e["message"] + "\n" for e in protocol["errors"]]),
                )
            )

        def error_string(r):
            # Parse the response body once and reuse it
            protocol_errors = r.json()["protocol"]
            return AnalysisException(
                "Error%s in protocol:\n%s"
                % (
                    ("s" if len(protocol_errors) > 1 else ""),
                    "".join(["- " + e["message"] + "\n" for e in protocol_errors]),
                )
            )

        return self.post(
            self.get_route("analyze_run"),
            data=json.dumps({"protocol": protocol, "test_mode": test_mode}),
            status_response={"422": lambda resp: error_string(resp)},
        )

    def submit_run(
        self,
        protocol,
        project_id=None,
        title=None,
        test_mode=False,
        payment_method_id=None,
    ):
        """Submit given protocol"""
        protocol = _parse_protocol(protocol)
        payload = {
            "title": title,
            "protocol": protocol,
            "test_mode": test_mode,
            "payment_method_id": payment_method_id,
        }
        # Only send the fields that were actually provided
        data = {k: v for k, v in payload.items() if v is not None}
        route = self.get_route("submit_run", project_id=project_id)

        def err_404(resp):
            # Built lazily so the project url is only resolved on a real 404
            return AnalysisException(
                f"Error: Couldn't create run (404).\n Are you sure the "
                f"project {self._project_url(project_id)} exists, and that you have "
                f"access to it?"
            )

        def err_422(resp):
            return AnalysisException(f"Error creating run: {resp.text}")

        return self.post(
            route,
            data=json.dumps(data),
            status_response={"404": err_404, "422": err_422},
        )

    def analyze_launch_request(self, launch_request_id, test_mode=False):
        """Analyze specified launch request"""
        return self.post(
            self.get_route("analyze_launch_request"),
            data=json.dumps(
                {"launch_request_id": launch_request_id, "test_mode": test_mode}
            ),
        )

    def submit_launch_request(
        self,
        launch_request_id,
        project_id=None,
        protocol_id=None,
        title=None,
        test_mode=False,
        payment_method_id=None,
        predecessor_id=None,
    ):
        """Submit specified launch request"""
        payload = {
            "title": title,
            "launch_request_id": launch_request_id,
            "protocol_id": protocol_id,
            "test_mode": test_mode,
            "payment_method_id": payment_method_id,
            "predecessor_id": predecessor_id,
        }
        # Only send the fields that were actually provided
        data = {k: v for k, v in payload.items() if v is not None}
        return self.post(
            self.get_route("submit_launch_request", project_id=project_id),
            data=json.dumps(data),
            status_response={
                "404": lambda resp: AnalysisException(
                    "Error: Couldn't create run (404). \n"
                    "Are you sure the project %s "
                    "exists, and that you have access to it?"
                    % self._project_url(project_id)
                ),
                "422": lambda resp: AnalysisException(
                    f"Error creating run: {resp.text}"
                ),
            },
        )

    def dataset(self, data_id, key="*"):
        """Get dataset with given data_id"""
        route = self.get_route("dataset", data_id=data_id, key=key)
        return self.get(route)

    def _get_uploads_from_key(self, key):
        """Fetches uploads for a data upload key

        Parameters
        ----------
        key : str
            data upload key

        Returns
        -------
        requests.Response
        """
        return self.get(
            route=self.get_route(method="get_uploads", key=key),
            status_response={"200": lambda resp: resp},
            stream=True,
        )

    def attachments(self, data_id):
        """Fetches all attachments for a given dataset id

        Parameters
        ----------
        data_id : str
            dataset id

        Returns
        -------
        dict(str, bytes)
            a dict of attachment names and contents
        """
        dataset_route = self.get_route("dataset_short", data_id=data_id)
        # Treat a missing/null "attachments" entry as no attachments
        dataset_attachments = self.get(dataset_route).get("attachments") or []
        attachment_names = [
            os.path.basename(_.get("name", "")) for _ in dataset_attachments
        ]
        attachment_contents = [
            self._get_uploads_from_key(_.get("key")).content
            for _ in dataset_attachments
        ]
        return dict(zip(attachment_names, attachment_contents))

    def datasets(self, project_id=None, run_id=None, timeout=30.0):
        """Get datasets belonging to run"""
        route = self.get_route("datasets", project_id=project_id, run_id=run_id)
        err_404 = (
            f"[404] No run found for ID {run_id}. Please ensure you "
            f"have the right permissions."
        )
        return self.get(
            route,
            status_response={"404": lambda resp: Exception(err_404)},
            timeout=timeout,
        )

    def data_object(self, id):
        """Fetches a data object by id

        Parameters
        ----------
        id: str
            data_object id

        Returns
        -------
        dict
            attributes dict
        """
        route = self.get_route("data_object", id=id)
        attributes = self.get(route).get("data").get("attributes")
        attributes["id"] = id
        return attributes

    def data_objects(self, dataset_id):
        """Fetches all data objects given a dataset id

        Parameters
        ----------
        dataset_id: str
            dataset id

        Returns
        -------
        arr[dict]
            array of attributes dict
        """
        route_base = self.get_route("data_objects", dataset_id=dataset_id)
        page = 0
        limit = 50
        has_more = True
        results = []

        while has_more:
            route = f"{route_base}&page[limit]={limit}&page[offset]=" f"{page * limit}"
            response = self.get(route)
            entities = response.get("data")
            for entity in entities:
                attributes = entity["attributes"]
                attributes["id"] = entity["id"]
                results.append(attributes)
            page += 1
            # A short page means there is nothing further to fetch
            if len(entities) != limit:
                has_more = False
        return results

    def upload_dataset_from_filepath(
        self, file_path, title, run_id, analysis_tool, analysis_tool_version
    ):
        """
        Helper for uploading a file as a dataset to the specified run.

        Uses `upload_dataset`.

        .. code-block:: python

            api.upload_dataset_from_filepath(
                "my_file.txt",
                title="my cool dataset",
                run_id="r123",
                analysis_tool="cool script",
                analysis_tool_version="v1.0.0"
            )

        Parameters
        ----------
        file_path: str
            Path to file to be uploaded
        title: str
            Name of dataset
        run_id: str
            Run-id
        analysis_tool: str, optional
            Name of tool used for analysis
        analysis_tool_version: str, optional
            Version of tool used

        Returns
        -------
        response: dict
            JSON-formatted response

        Raises
        ------
        ValueError
            If `file_path` cannot be opened as a file.
        """
        try:
            file_path = os.path.expanduser(file_path)
            file_handle = open(file_path, "rb")
        except (AttributeError, FileNotFoundError):
            raise ValueError("'file' has to be a valid filepath")

        # Ensure the handle is closed once the upload finished or failed
        with file_handle:
            name = os.path.basename(file_handle.name)
            try:
                content_type = magic.from_file(file_path, mime=True)
            except NameError:
                # Handle issues with magic import by not decoding content_type
                content_type = None
            return self.upload_dataset(
                file_handle,
                name,
                title,
                run_id,
                analysis_tool,
                analysis_tool_version,
                content_type,
            )

    def upload_dataset(
        self,
        file_handle,
        name,
        title,
        run_id,
        analysis_tool,
        analysis_tool_version,
        content_type=None,
    ):
        """
        Uploads a file_handle as a dataset to the specified run.

        .. code-block:: python

            # Uploading a data_frame via file_handle, using Py3
            import io

            temp_buffer = io.StringIO()
            my_df.to_csv(temp_buffer)

            api.upload_dataset(
                temp_buffer,
                name="my_df",
                title="my cool dataset",
                run_id="r123",
                analysis_tool="cool script",
                analysis_tool_version="v1.0.0"
            )

        Parameters
        ----------
        file_handle: file_handle
            File handle to be uploaded
        name: str
            Dataset filename
        title: str
            Name of dataset
        run_id: str
            Run-id
        analysis_tool: str, optional
            Name of tool used for analysis
        analysis_tool_version: str, optional
            Version of tool used
        content_type: str
            Type of content uploaded

        Returns
        -------
        response: dict
            JSON-formatted response
        """
        upload_id = self.upload_to_uri(file_handle, content_type, title, name)
        upload_datasets_route = self.get_route("upload_datasets")
        upload_resp = self.post(
            upload_datasets_route,
            json={
                "upload_id": upload_id,
                "file_name": name,
                "title": title,
                "run_id": run_id,
                "analysis_tool": analysis_tool,
                "analysis_tool_version": analysis_tool_version,
            },
            status_response={
                # NOTE(review): a 404 yields this message string as the return
                # value rather than raising; kept as-is for compatibility.
                "404": lambda resp: "[404] Please double-check your parameters"
                " and ensure they are valid."
            },
        )
        return upload_resp

    def upload_to_uri(self, file_handle, content_type, title, name):
        """
        Helper for uploading files via the `upload` route

        Parameters
        ----------
        file_handle: file_handle
            File handle to be uploaded
        content_type: str
            Type of content uploaded
        title: str
            Title of content to be uploaded
        name: str
            Name of file to be uploaded

        Returns
        -------
        key: str
            s3 key
        """
        # NOTE(meawoppl) title argument is unused?
        # TODO:
        #   Currently, we are passing `0` for file_size as it doesn't really
        #   matter for non multipart uploads, though it would be better to
        #   supply the correct value.
        data = {
            "attributes": {
                "file_name": name,
                "file_size": 0,
                "last_modified": int(time.time()),
                "is_multipart": False,
            }
        }
        uri_route = self.get_route("upload")
        uri_resp = self.post(uri_route, data=json.dumps({"data": data}))
        try:
            upload_id = uri_resp["data"]["id"]
            upload_uri = uri_resp["data"]["attributes"]["upload_url"]
        except KeyError:
            raise RuntimeError("Unexpected payload returned for upload_dataset")

        if isinstance(file_handle, io.StringIO):
            try:
                # io.StringIO instances must be converted to bytes
                file_handle = io.BytesIO(bytes(file_handle.getvalue(), "utf-8"))
            except AttributeError:
                raise ValueError("Unable to convert read buffer to bytes")

        headers = {
            "Content-Disposition": f"attachment; filename={name}",
            "Content-Type": content_type,
        }
        # Drop headers without a value (e.g. unknown content type)
        headers = {k: v for k, v in headers.items() if v}
        self.put(
            upload_uri,
            data=file_handle,
            headers=headers,
            status_response={"200": lambda resp: resp},
        )
        return upload_id

    def get_zip(self, data_id, file_path=None):
        """
        Get zip file with given data_id. Downloads to memory and returns a
        Python ZipFile by default.
        When dealing with larger files where it may not be desired to load the
        entire file into memory, specifying `file_path` will enable the file to
        be downloaded locally.

        Example Usage:

        .. code-block:: python

            small_zip_id = 'd12345'
            small_zip = api.get_zip(small_zip_id)

            my_big_zip_id = 'd99999'
            api.get_zip(my_big_zip_id, file_path='big_file.zip')

        Parameters
        ----------
        data_id: data_id
            Data id of file to download
        file_path: Optional[str]
            Path to file which to save the response to. If specified, will not
            return ZipFile explicitly.

        Returns
        ----------
        zip: zipfile.ZipFile
            A Python ZipFile is returned unless `file_path` is specified
        """
        route = self.get_route("get_data_zip", data_id=data_id)
        req = self.get(route, status_response={"200": lambda resp: resp}, stream=True)
        if file_path:
            with open(file_path, "wb") as f:
                # Buffer download of data into memory with smaller chunk sizes
                chunk_sz = 1024  # 1kb chunks
                for chunk in req.iter_content(chunk_sz):
                    if chunk:
                        f.write(chunk)
            print(f"Zip file downloaded locally to {file_path}.")
        else:
            return zipfile.ZipFile(io.BytesIO(req.content))

    def get_route(self, method, **kwargs):
        """
        Helper function to automatically match and supply required arguments

        The positional arguments of the `routes` function named `method` that
        have no default are filled from `kwargs` (ignoring None values) and,
        failing that, from the stored environment arguments.

        Raises
        ------
        Exception
            If a required route argument is missing or falsy.
        """
        route_method = getattr(routes, method)
        # getfullargspec replaces getargspec, which was removed in Python 3.11
        spec = inspect.getfullargspec(route_method)
        route_method_args = spec.args
        if spec.defaults:
            route_method_args = route_method_args[: -len(spec.defaults)]

        # Update loaded argument dict with new arguments which are not None
        new_args = {k: v for k, v in kwargs.items() if v is not None}
        arg_dict = dict(self.env_args, **new_args)

        input_args = []
        for arg in route_method_args:
            # .get: an absent argument gets the same descriptive error as a
            # falsy one instead of a bare KeyError
            if arg_dict.get(arg):
                input_args.append(arg_dict[arg])
            else:
                raise Exception(
                    f"For route: {method}, argument {arg} needs to be provided."
                )
        return route_method(  # pylint: disable=no-value-for-parameter
            *tuple(input_args)
        )

    def get(self, route, **kwargs):
        """Issue a GET request through `_call`"""
        return self._call("get", route, **kwargs)

    def put(self, route, **kwargs):
        """Issue a PUT request through `_call`"""
        return self._call("put", route, **kwargs)

    def post(self, route, **kwargs):
        """Issue a POST request through `_call`"""
        return self._call("post", route, **kwargs)

    def delete(self, route, **kwargs):
        """Issue a DELETE request through `_call`"""
        return self._call("delete", route, **kwargs)

    def _req_call(self, method, route, **kwargs):
        """Dispatch to the session method named `method`"""
        return getattr(self.session, method)(route, **kwargs)

    def _call(self, method, route, status_response=None, **kwargs):
        """Base function for handling all requests"""
        if self.verbose:
            print(f"{method.upper()}: {route}")
        return self._handle_response(
            self._req_call(method, route, **kwargs), **(status_response or {})
        )

    def _handle_response(self, response, **kwargs):
        """
        Map a response to a return value based on its status code.

        `kwargs` maps status code strings (or "default") to callables taking
        the response; they override the built-in handlers. If the selected
        handler produces an Exception instance it is raised, otherwise its
        result is returned.
        """
        unauthorized_resp = (
            "You are not authorized to execute this command. "
            "For more information on access "
            "permissions see the package documentation."
        )
        internal_error_resp = (
            "An internal server error has occurred. "
            "Please contact support for assistance."
        )
        default_status_response = {
            "200": lambda resp: resp.json(),
            "201": lambda resp: resp.json(),
            "401": lambda resp: PermissionError(
                "[%d] %s" % (resp.status_code, unauthorized_resp)
            ),
            "403": lambda resp: PermissionError(
                "[%d] %s" % (resp.status_code, unauthorized_resp)
            ),
            "500": lambda resp: Exception(
                "[%d] %s" % (resp.status_code, internal_error_resp)
            ),
            "default": lambda resp: Exception(
                "[%d] %s" % (resp.status_code, resp.text)
            ),
        }
        status_response = dict(default_status_response, **kwargs)
        handler = status_response.get(
            str(response.status_code), status_response["default"]
        )
        # Invoke the handler exactly once; it may parse the body or have
        # other side effects
        outcome = handler(response)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    # NOTE(meawoppl) This is only called externally
    def _post_analytics(self, client_id=None, event_action=None, event_category="cli"):
        """Post an analytics event, using `user_id` when no client_id is given"""
        route = "https://www.google-analytics.com/collect"
        if not client_id:
            client_id = self.user_id
        packet = (
            f"v=1&tid=UA-28937242-7&cid={client_id}&t=event&"
            f"ea={event_action}&ec={event_category}"
        )
        requests.post(route, packet)
class AnalysisException(Exception): def __init__(self, message): self.message = message def __str__(self): return self.message def _parse_protocol(protocol): if isinstance(protocol, dict): return protocol try: from autoprotocol import Protocol except ImportError: raise RuntimeError( "Please install `autoprotocol-python` in order " "to work with Protocol objects" ) if isinstance(protocol, Protocol): return protocol.as_dict()