Source code for ecs_composex.kinesis.kinesis_template

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import NoValue, Tags
from troposphere.kinesis import Stream, StreamEncryption

from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import (
    add_outputs,
    add_resource,
    build_template,
)
from ecs_composex.resources_import import import_record_properties


[docs]def handle_encryption(stream): """ Function to define the encryption :param stream: :return: """ return StreamEncryption( EncryptionType="KMS", KeyId=stream.properties["StreamEncryption"]["KeyId"], )
[docs]def create_new_stream(stream): """ Function to create the new Kinesis stream :param ecs_composex.kinesis.kinesis_stack.Stream stream: :param ecs_composex.common.settings.ComposeXSettings settings: :return: """ props = import_record_properties(stream.properties, Stream) stream_mode = set_else_none("StreamModeDetails", props) if ( keyisset("ShardCount", props) and stream_mode and stream_mode.StreamMode == "ON_DEMAND" ): LOG.warning( "{}.{} - ShardCount can't be set with StreamModeDetails.StreamMode ON_DEMAND." " Setting to AWS::NoValue".format(stream.module.res_key, stream.name) ) props["ShardCount"] = NoValue else: if not keyisset("ShardCount", stream.properties) and ( not stream_mode or (stream_mode and stream_mode.StreamMode == "PROVISIONED") ): LOG.warning( "{}.{} - ShardCount must be set if StreamModeDetails isn't set or is set to PROVISIONED." " Defaulting to 1".format(stream.module.res_key, stream.name) ) props["ShardCount"] = 1 props["Tags"] = Tags(Name=stream.logical_name, ComposeName=stream.name) stream.cfn_resource = Stream(stream.logical_name, **props) stream.init_outputs() stream.generate_outputs()
[docs]def create_streams_template(new_resources, settings): """ Function to create the root template for Kinesis streams :param list<ecs_composex.kinesis.kinesis_stack.Stream> new_resources: :param ecs_composex.common.settings.ComposeXSettings settings: :return: """ root_template = build_template("Root stack for ecs_composex.kinesis") for res in new_resources: create_new_stream(res) add_resource(root_template, res.cfn_resource) add_outputs(root_template, res.outputs) return root_template