# Source code for ecs_composex.sqs.sqs_stack

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Module for the XStack SQS
"""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.mods_manager import XResourceModule

from troposphere import GetAtt, Ref

from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import add_update_mapping, build_template
from ecs_composex.compose.x_resources.api_x_resources import ApiXResource
from ecs_composex.sqs.sqs_ecs_scaling import handle_service_scaling
from ecs_composex.sqs.sqs_helpers import resolve_lookup
from ecs_composex.sqs.sqs_params import SQS_ARN, SQS_KMS_KEY, SQS_NAME, SQS_URL
from ecs_composex.sqs.sqs_template import render_new_queues


class Queue(ApiXResource):
    """
    Class to represent a SQS Queue

    Configures the generic API resource with the SQS specific parameters
    (ARN, URL, KMS key) and the SQS based service scaling handler.
    """

    def __init__(
        self,
        name: str,
        definition: dict,
        module: XResourceModule,
        settings: ComposeXSettings,
    ):
        """
        :param str name: Name of the queue, passed as-is to the parent class
        :param dict definition: Definition of the queue, passed as-is to the parent class
        :param XResourceModule module: The module the queue belongs to
        :param ComposeXSettings settings: The execution settings
        """
        # Defined before calling the parent constructor.
        # NOTE(review): presumably the parent init may already rely on it - confirm.
        self.redrive_queue = None
        super().__init__(name, definition, module, settings)
        self.kms_arn_attr = SQS_KMS_KEY
        self.arn_parameter = SQS_ARN
        self.ref_parameter = SQS_URL
        self.predefined_resource_service_scaling_function = handle_service_scaling
        self.support_defaults = True

    def init_outputs(self):
        """
        Init output properties for a new resource

        Each parameter maps to a tuple made of: the output name, the CFN
        resource, the troposphere function used to get the value (``Ref`` or
        ``GetAtt``), the attribute given to that function (``None`` for
        ``Ref``) and a short label.
        NOTE(review): the exact use of the trailing label ("Url", "Arn",
        "QueueName") is decided by the consumer of ``output_properties`` -
        confirm there.
        """
        self.output_properties = {
            SQS_URL: (self.logical_name, self.cfn_resource, Ref, None, "Url"),
            SQS_ARN: (
                f"{self.logical_name}{SQS_ARN.return_value}",
                self.cfn_resource,
                GetAtt,
                SQS_ARN.return_value,
                "Arn",
            ),
            SQS_NAME: (
                f"{self.logical_name}{SQS_NAME.return_value}",
                self.cfn_resource,
                GetAtt,
                SQS_NAME.return_value,
                "QueueName",
            ),
        }

    def handle_x_dependencies(
        self, settings: ComposeXSettings, root_stack: ComposeXStack
    ) -> None:
        """
        Link this queue with the other x-resources that can target it.

        Goes over all the x-resources of the execution and, for each S3
        bucket or SQS queue found, calls the matching handler with
        ``(self, resource, settings)``. Resources that are this queue, have
        no ``cfn_resource`` or no ``stack`` are skipped.

        :param ComposeXSettings settings: The execution settings
        :param ComposeXStack root_stack: Not used by this implementation.
        """
        # Function-scope imports, kept as in the original implementation.
        # NOTE(review): presumably there to avoid circular imports - confirm.
        from ecs_composex.s3.s3_bucket import Bucket

        from .sqs_s3 import s3_to_sqs_notifications
        from .sqs_sqs import sqs_to_sqs

        # The dispatch table does not depend on the resource being evaluated,
        # so it is built once rather than on every loop iteration.
        handlers = (
            (Bucket, s3_to_sqs_notifications),
            (type(self), sqs_to_sqs),
        )
        for resource in settings.get_x_resources(include_mappings=True):
            if resource == self:
                continue
            if not resource.cfn_resource:
                continue
            if not resource.stack:
                LOG.debug(
                    f"resource {resource.name} has no `stack` attribute defined. Skipping"
                )
                continue
            for resource_type, handler in handlers:
                # isinstance() already matches subclasses of resource_type.
                if isinstance(resource, resource_type):
                    handler(
                        self,
                        resource,
                        settings,
                    )
class XStack(ComposeXStack):
    """
    Class to handle SQS Root stack related actions
    """

    def __init__(
        self, title: str, settings: ComposeXSettings, module: XResourceModule, **kwargs
    ):
        """
        Resolve the looked up queues and, when the module defines new queues,
        build the stack template and render them into it.

        :param str title: Name of the stack
        :param ecs_composex.common.settings.ComposeXSettings settings:
        :param XResourceModule module: The SQS module with its lookup/new resources
        :param dict kwargs: Passed as-is to the parent stack constructor
        """
        if module.lookup_resources:
            resolve_lookup(module.lookup_resources, settings, module)

        if not module.new_resources:
            # Nothing to create: the parent constructor is not called and the
            # stack is only flagged as void.
            self.is_void = True
        else:
            stack_template = build_template("SQS template generated by ECS Compose-X")
            if module.lookup_resources:
                lookup_mapping = settings.mappings[module.mapping_key]
                add_update_mapping(stack_template, module.mapping_key, lookup_mapping)
            super().__init__(title, stack_template=stack_template, **kwargs)
            render_new_queues(settings, module.new_resources, self, stack_template)
            # Only fall back to the module level policy when none is set yet.
            if not hasattr(self, "DeletionPolicy"):
                self.DeletionPolicy = module.module_deletion_policy

        # Every resource of the module points back to this stack, void or not.
        for queue in module.resources_list:
            queue.stack = self