Source code for ecs_composex.kinesis.kinesis_stack

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Module to handle import/create AWS Kinesis Data Streams
"""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.mods_manager import XResourceModule, ModManager

from botocore.exceptions import ClientError
from compose_x_common.aws.kinesis import KINESIS_STREAM_ARN_RE
from compose_x_common.compose_x_common import attributes_to_mapping, keyisset
from troposphere import GetAtt, Ref
from troposphere.kinesis import Stream as CfnStream

from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.compose.x_resources.api_x_resources import ApiXResource
from ecs_composex.kinesis.kinesis_params import STREAM_ARN, STREAM_ID, STREAM_KMS_KEY_ID
from ecs_composex.kinesis.kinesis_template import create_streams_template
from ecs_composex.kinesis_firehose.kinesis_firehose_stack import DeliveryStream
from ecs_composex.resource_settings import handle_resource_to_services

from .kcl_helpers import add_cloudwatch_metric_data_permission, add_dynamodb_permissions
from .kinesis_kinesis_firehose import kinesis_to_firehose


[docs]def get_stream_config(stream, account_id, resource_id): """ Function to get the configuration of KMS Stream from API :param Stream stream: :param str account_id: :param str resource_id: :return: """ client = stream.lookup_session.client("kinesis") stream_mapping = { STREAM_ARN: "StreamDescription::StreamARN", STREAM_ID: "StreamDescription::StreamName", STREAM_KMS_KEY_ID: "StreamDescription::KeyId", } try: stream_r = client.describe_stream(StreamName=resource_id) stream_config = attributes_to_mapping(stream_r, stream_mapping) return stream_config except client.exceptions.ResourceNotFoundException: return None except ClientError as error: LOG.error(error)
[docs]class Stream(ApiXResource): """ Class to represent a Kinesis Stream """ def __init__( self, name, definition, module: XResourceModule, settings: ComposeXSettings ): super().__init__( name, definition, module, settings, ) self.arn_parameter = STREAM_ARN self.ref_parameter = STREAM_ID self.cloud_control_attributes_mapping = { STREAM_ARN: "Arn", STREAM_ID: "Name", STREAM_KMS_KEY_ID: "StreamEncryption::KeyId", } self.support_defaults = True
[docs] def init_outputs(self): self.output_properties = { STREAM_ID: (self.logical_name, self.cfn_resource, Ref, None), STREAM_ARN: ( f"{self.logical_name}{STREAM_ARN.title}", self.cfn_resource, GetAtt, STREAM_ARN.return_value, ), }
[docs] def to_ecs( self, settings: ComposeXSettings, modules: ModManager, root_stack: ComposeXStack = None, targets_overrides: list = None, ) -> None: """ Maps API only based resource to ECS Services """ LOG.info(f"{self.module.res_key}.{self.name} - Linking to services") handle_resource_to_services( settings, self, arn_parameter=self.arn_parameter, nested=False, targets_overrides=targets_overrides, ) if self.parameters and keyisset("SetupIAMForKCL", self.parameters): self.setup_iam_for_kcl(settings) if self.predefined_resource_service_scaling_function: self.predefined_resource_service_scaling_function(self, settings)
[docs] def setup_iam_for_kcl(self, settings: ComposeXSettings) -> None: """When SetupIAMForKCL is set in MacroParameters, grant IAM permissions access to other resources""" macro_param = self.parameters["SetupIAMForKCL"] for family_name, kcl_def in macro_param.items(): print(f"Granting IAM Permissions for {family_name}") if family_name not in self.services: print( f"Family {family_name} wasn't set as a consumer/producer to this table" ) continue _family = settings.families[family_name] if isinstance(kcl_def, bool): add_cloudwatch_metric_data_permission(_family) add_dynamodb_permissions(_family, True) elif isinstance(kcl_def, dict): if keyisset("CloudWatchPutMetricData", kcl_def): add_cloudwatch_metric_data_permission(_family) if keyisset("DynamoDB", kcl_def): add_dynamodb_permissions(_family, kcl_def["DynamoDB"]) else: raise TypeError( f"kcl_def is {type(kcl_def)}. Expected one of", [bool, dict] )
[docs] def handle_x_dependencies( self, settings: ComposeXSettings, root_stack: ComposeXStack ) -> None: """ Updates other resources and replace the values for `x-kinesis` wherever applicable. :param settings: :param root_stack: :return: """ for resource in settings.get_x_resources(include_mappings=False): if not resource.cfn_resource: continue if not resource.stack: LOG.debug( f"resource {resource.name} has no `stack` attribute defined. Skipping" ) continue mappings = [(DeliveryStream, kinesis_to_firehose)] for target in mappings: if isinstance(resource, target[0]) or issubclass( type(resource), target[0] ): target[1]( self, resource, resource.stack, settings, )
[docs]def resolve_lookup( lookup_resources: list[Stream], settings: ComposeXSettings, module: XResourceModule ) -> None: """ Lookup AWS Kinesis streams and creates CFN Mappings """ if not keyisset(module.mapping_key, settings.mappings): settings.mappings[module.mapping_key] = {} for resource in lookup_resources: LOG.info( f"{resource.module.res_key}.{resource.logical_name} - Looking up AWS Resource" ) resource.lookup_resource( KINESIS_STREAM_ARN_RE, get_stream_config, CfnStream.resource_type, "kinesis:stream", ) LOG.info(f"{module.res_key}.{resource.name} - Matched to {resource.arn}") settings.mappings[module.mapping_key].update( {resource.logical_name: resource.mappings} )
[docs]class XStack(ComposeXStack): """ Class to represent Kinesis Data Streams stack """ def __init__( self, title, settings: ComposeXSettings, module: XResourceModule, **kwargs ): if module.lookup_resources: resolve_lookup(module.lookup_resources, settings, module) if module.new_resources: stack_template = create_streams_template(module.new_resources, settings) super().__init__(title, stack_template, **kwargs) if not hasattr(self, "DeletionPolicy"): setattr(self, "DeletionPolicy", module.module_deletion_policy) else: self.is_void = True for resource in module.resources_list: resource.stack = self