Source code for ecs_composex.kms.kms_stack

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Manage Creation/Deletion of AWS KMS Keys
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.mods_manager import XResourceModule

from botocore.exceptions import ClientError
from compose_x_common.aws.kms import KMS_KEY_ARN_RE
from compose_x_common.compose_x_common import attributes_to_mapping, keyisset
from troposphere import AWS_ACCOUNT_ID, AWS_PARTITION, GetAtt, Ref, Sub
from troposphere.kms import Alias, Key

from ecs_composex.common.cfn_conditions import define_stack_name
from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import build_template
from ecs_composex.compose.x_resources.api_x_resources import ApiXResource
from ecs_composex.compose.x_resources.environment_x_resources import (
    AwsEnvironmentResource,
)
from ecs_composex.kinesis_firehose.kinesis_firehose_stack import DeliveryStream
from ecs_composex.kms import metadata
from ecs_composex.kms.kms_ecs_cluster import handle_ecs_cluster
from ecs_composex.kms.kms_kinesis_firehose import kms_to_firehose
from ecs_composex.kms.kms_params import KMS_KEY_ALIAS_NAME, KMS_KEY_ARN, KMS_KEY_ID
from ecs_composex.kms.kms_s3 import handle_bucket_kms
from ecs_composex.kms.kms_sqs import handle_queue_kms
from ecs_composex.kms.kms_template import create_kms_template
from ecs_composex.resources_import import import_record_properties
from ecs_composex.s3.s3_bucket import Bucket
from ecs_composex.sqs.sqs_stack import Queue


def get_key_config(key, account_id: str, resource_id: str) -> dict | None:
    """
    Describe an existing KMS key and map its attributes to the KMS parameters.

    The key is described using ``key.arn``. On success, ``key.manager`` is set
    to the ``KeyManager`` value returned by KMS, and the first alias of the key
    (when there is one) is added to the returned mapping.

    :param KmsKey key: the key to look up; must expose ``lookup_session``,
        ``arn``, ``name`` and ``module``.
    :param str account_id: unused
    :param str resource_id: unused
    :return: the key attributes mapping, or None when KMS reports the key
        as not found.
    :rtype: dict | None
    :raises botocore.exceptions.ClientError: for any other API error, after
        logging it.
    """
    key_attributes_mappings = {
        KMS_KEY_ARN: "KeyMetadata::Arn",
        KMS_KEY_ID: "KeyMetadata::KeyId",
    }
    client = key.lookup_session.client("kms")
    try:
        key_desc = client.describe_key(KeyId=key.arn)
        key_attributes = attributes_to_mapping(key_desc, key_attributes_mappings)
        key.manager = key_desc["KeyMetadata"]["KeyManager"]
        try:
            aliases_r = client.list_aliases(KeyId=key_attributes[KMS_KEY_ID])
            if aliases_r["Aliases"]:
                # Only the first alias returned is recorded.
                key_attributes[KMS_KEY_ALIAS_NAME] = aliases_r["Aliases"][0][
                    "AliasName"
                ]
        except client.exceptions.NotFoundException:
            # A key without alias is not an error: the alias is optional.
            LOG.debug(f"{key.module.res_key}.{key.name} - No KMS Key Alias.")
        return key_attributes
    except client.exceptions.NotFoundException:
        # The KMS client has no ``QueueDoesNotExist`` exception (that one belongs
        # to SQS); looking it up raised AttributeError and masked the real error.
        # NotFoundException must stay ahead of the ClientError handler below,
        # as it is a ClientError subclass.
        return None
    except ClientError as error:
        LOG.error(error)
        raise
def define_default_key_policy() -> dict:
    """
    Build the default KMS key policy.

    The policy holds a single statement that allows every KMS action
    (``kms:*``) on all resources to the root principal of the current AWS
    account, conditioned on the caller account being that same account.

    :return: policy
    :rtype: dict
    """
    account_root = Sub(f"arn:${{{AWS_PARTITION}}}:iam::${{{AWS_ACCOUNT_ID}}}:root")
    same_account_only = {"StringEquals": {"kms:CallerAccount": Ref(AWS_ACCOUNT_ID)}}
    account_statement = {
        "Sid": "Allow direct access to key metadata to the account",
        "Effect": "Allow",
        "Principal": {"AWS": account_root},
        "Action": ["kms:*"],
        "Resource": "*",
        "Condition": same_account_only,
    }
    return {
        "Version": "2012-10-17",
        "Id": "auto-secretsmanager-1",
        "Statement": [account_statement],
    }
class KmsKey(AwsEnvironmentResource, ApiXResource):
    """
    Class to represent a KMS Key, either created in the KMS stack or looked up.
    """

    def __init__(
        self,
        name: str,
        definition: dict,
        module: XResourceModule,
        settings: ComposeXSettings,
    ):
        """
        :param str name: name of the key in the compose definition
        :param dict definition: the key definition
        :param XResourceModule module: the module the key belongs to
        :param ComposeXSettings settings: the execution settings
        """
        # Default key manager, assigned before the parent initializer runs.
        self.manager = "CUSTOMER"
        super().__init__(name, definition, module, settings)
        self.arn_parameter = KMS_KEY_ARN
        self.ref_parameter = KMS_KEY_ID
        self.support_defaults = True

    @property
    def is_cmk(self):
        """
        Whether the key manager is ``CUSTOMER``.

        :rtype: bool
        """
        return self.manager == "CUSTOMER"

    def init_outputs(self):
        """
        Set ``output_properties`` with the key ID (via ``Ref``) and the key
        ARN (via ``GetAtt``) of the CFN resource.
        """
        key_id_output = (
            f"{self.logical_name}{KMS_KEY_ID.title}",
            self.cfn_resource,
            Ref,
            None,
        )
        key_arn_output = (
            f"{self.logical_name}{KMS_KEY_ARN.return_value}",
            self.cfn_resource,
            GetAtt,
            KMS_KEY_ARN.return_value,
        )
        self.output_properties = {
            KMS_KEY_ID: key_id_output,
            KMS_KEY_ARN: key_arn_output,
        }

    def define_kms_key(self):
        """
        Method to set the KMS Key CFN resource.

        Uses the imported ``Properties`` when set, otherwise a default set of
        properties. The default key policy is added when no ``KeyPolicy`` is
        set, and the module metadata is always attached.
        """
        if self.properties:
            key_props = import_record_properties(self.properties, Key)
        else:
            key_props = {
                "Description": Sub(
                    f"{self.name} created in ${{STACK_NAME}}",
                    STACK_NAME=define_stack_name(),
                ),
                "Enabled": True,
                "EnableKeyRotation": True,
                "KeyUsage": "ENCRYPT_DECRYPT",
                "PendingWindowInDays": 7,
            }
        if not keyisset("KeyPolicy", key_props):
            key_props["KeyPolicy"] = define_default_key_policy()
        key_props["Metadata"] = metadata
        LOG.debug(key_props)
        self.cfn_resource = Key(self.logical_name, **key_props)

    def handle_key_settings(self, template):
        """
        Method to add to the template for additional KMS key related resources.

        When the ``Alias`` parameter is set, a KMS Alias targeting the key is
        added to the template. An alias not starting with ``alias/`` is
        prefixed with ``alias/<stack name>/``.

        :param troposphere.Template template:
        :raises ValueError: when the alias starts with ``aws`` or ``alias/aws``
        """
        if not (self.parameters and keyisset("Alias", self.parameters)):
            return
        alias_name = self.parameters["Alias"]
        if alias_name.startswith(("alias/aws", "aws")):
            raise ValueError(f"Alias {alias_name} cannot start with alias/aws.")
        if not alias_name.startswith("alias/"):
            alias_name = Sub(
                f"alias/${{STACK_NAME}}/{alias_name}",
                STACK_NAME=define_stack_name(template),
            )
        Alias(
            f"{self.logical_name}Alias",
            template=template,
            AliasName=alias_name,
            TargetKeyId=Ref(self.cfn_resource),
            Metadata=metadata,
        )

    def handle_x_dependencies(self, settings, root_stack=None) -> None:
        """
        Will go over all the new resources to create in the execution and search
        for properties that can be updated with itself.

        :param ecs_composex.common.settings.ComposeXSettings settings:
        :param ComposeXStack root_stack: Not used. Present for general compatibility
        """
        # Resource class -> function linking this key to a resource of that class.
        kms_handlers = (
            (Bucket, handle_bucket_kms),
            (Queue, handle_queue_kms),
            (DeliveryStream, kms_to_firehose),
        )
        handle_ecs_cluster(settings, self)
        for resource in settings.get_x_resources(include_mappings=False):
            if not resource.cfn_resource:
                continue
            if not resource.stack:
                LOG.debug(
                    f"resource {resource.name} has no `stack` attribute defined. Skipping"
                )
                continue
            for resource_class, handler in kms_handlers:
                # isinstance() already matches instances of subclasses.
                if isinstance(resource, resource_class):
                    handler(self, resource, resource.stack, settings)
class XStack(ComposeXStack):
    """
    Class for KMS Root stack
    """

    def __init__(
        self, title, settings: ComposeXSettings, module: XResourceModule, **kwargs
    ):
        """
        Create the KMS stack for new keys and resolve the looked up ones.

        :param str title: title of the stack
        :param ComposeXSettings settings: the execution settings
        :param XResourceModule module: the KMS module and its resources
        :param kwargs: passed on to the parent stack initializer
        """
        if not module.new_resources:
            # Nothing to create: the stack is flagged as void and the parent
            # initializer is not called.
            self.is_void = True
        else:
            stack_template = build_template("Root template for KMS")
            super().__init__(title, stack_template, **kwargs)
            create_kms_template(stack_template, module.new_resources, self)
            if not hasattr(self, "DeletionPolicy"):
                self.DeletionPolicy = module.module_deletion_policy

        if module.lookup_resources:
            if not keyisset(module.mapping_key, settings.mappings):
                settings.mappings[module.mapping_key] = {}
            for lookup_key in module.lookup_resources:
                lookup_key.lookup_resource(
                    KMS_KEY_ARN_RE, get_key_config, Key.resource_type, "kms:key"
                )
                settings.mappings[module.mapping_key][
                    lookup_key.logical_name
                ] = lookup_key.mappings

        # Every resource of the module points back to this stack.
        for kms_resource in module.resources_list:
            kms_resource.stack = self