Source code for runhouse.resources.secrets.provider_secrets.aws_secret

import configparser
import copy
import os

from typing import Dict

from runhouse.resources.secrets.provider_secrets.provider_secret import ProviderSecret
from runhouse.resources.secrets.utils import _check_file_for_mismatches
from runhouse.utils import create_local_dir


[docs]class AWSSecret(ProviderSecret): """ .. note:: To create an AWSSecret, please use the factory method :func:`provider_secret` with ``provider="aws"``. """ _PROVIDER = "aws" _DEFAULT_CREDENTIALS_PATH = "~/.aws/credentials" _DEFAULT_ENV_VARS = { "access_key": "AWS_ACCESS_KEY_ID", "secret_key": "AWS_SECRET_ACCESS_KEY", } @staticmethod def from_config(config: dict, dryrun: bool = False, _resolve_children: bool = True): return AWSSecret(**config, dryrun=dryrun) def _write_to_file( self, path: str, values: Dict, overwrite: bool = False, write_config: bool = True, ): new_secret = copy.deepcopy(self) if not _check_file_for_mismatches( path, self._from_path(path), values, overwrite ): parser = configparser.ConfigParser() section_name = "default" parser.add_section(section_name) parser.set( section=section_name, option="aws_access_key_id", value=values["access_key"], ) parser.set( section=section_name, option="aws_secret_access_key", value=values["secret_key"], ) full_path = create_local_dir(path) with open(full_path, "w+") as f: parser.write(f) if write_config: new_secret._add_to_rh_config(path) new_secret._values = None new_secret.path = path return new_secret def _from_path(self, path: str): config = configparser.ConfigParser() if path and os.path.exists(os.path.expanduser(path)): config.read(os.path.expanduser(path)) else: return {} section_name = "default" access_key = config[section_name]["aws_access_key_id"] secret_key = config[section_name]["aws_secret_access_key"] return { "access_key": access_key, "secret_key": secret_key, }