# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Module for the XStack SQS
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ecs_composex.common.settings import ComposeXSettings
from ecs_composex.mods_manager import XResourceModule
from troposphere import GetAtt, Ref
from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import add_update_mapping, build_template
from ecs_composex.compose.x_resources.api_x_resources import ApiXResource
from ecs_composex.sqs.sqs_ecs_scaling import handle_service_scaling
from ecs_composex.sqs.sqs_helpers import resolve_lookup
from ecs_composex.sqs.sqs_params import SQS_ARN, SQS_KMS_KEY, SQS_NAME, SQS_URL
from ecs_composex.sqs.sqs_template import render_new_queues
[docs]class Queue(ApiXResource):
"""
Class to represent a SQS Queue
"""
def __init__(
self,
name: str,
definition: dict,
module: XResourceModule,
settings: ComposeXSettings,
):
self.redrive_queue = None
super().__init__(name, definition, module, settings)
self.kms_arn_attr = SQS_KMS_KEY
self.arn_parameter = SQS_ARN
self.ref_parameter = SQS_URL
self.predefined_resource_service_scaling_function = handle_service_scaling
self.support_defaults = True
[docs] def init_outputs(self):
"""
Init output properties for a new resource
"""
self.output_properties = {
SQS_URL: (self.logical_name, self.cfn_resource, Ref, None, "Url"),
SQS_ARN: (
f"{self.logical_name}{SQS_ARN.return_value}",
self.cfn_resource,
GetAtt,
SQS_ARN.return_value,
"Arn",
),
SQS_NAME: (
f"{self.logical_name}{SQS_NAME.return_value}",
self.cfn_resource,
GetAtt,
SQS_NAME.return_value,
"QueueName",
),
}
[docs] def handle_x_dependencies(
self, settings: ComposeXSettings, root_stack: ComposeXStack
) -> None:
from ecs_composex.s3.s3_bucket import Bucket
from .sqs_s3 import s3_to_sqs_notifications
from .sqs_sqs import sqs_to_sqs
for resource in settings.get_x_resources(include_mappings=True):
if resource == self:
continue
if not resource.cfn_resource:
continue
if not resource.stack:
LOG.debug(
f"resource {resource.name} has no `stack` attribute defined. Skipping"
)
continue
mappings = [(Bucket, s3_to_sqs_notifications), (type(self), sqs_to_sqs)]
for target in mappings:
if isinstance(resource, target[0]) or issubclass(
type(resource), target[0]
):
target[1](
self,
resource,
settings,
)
[docs]class XStack(ComposeXStack):
"""
Class to handle SQS Root stack related actions
"""
def __init__(
self, title: str, settings: ComposeXSettings, module: XResourceModule, **kwargs
):
"""
:param str title: Name of the stack
:param ecs_composex.common.settings.ComposeXSettings settings:
:param dict kwargs:
"""
if module.lookup_resources:
resolve_lookup(module.lookup_resources, settings, module)
if module.new_resources:
template = build_template("SQS template generated by ECS Compose-X")
if module.lookup_resources:
add_update_mapping(
template, module.mapping_key, settings.mappings[module.mapping_key]
)
super().__init__(title, stack_template=template, **kwargs)
render_new_queues(settings, module.new_resources, self, template)
if not hasattr(self, "DeletionPolicy"):
setattr(self, "DeletionPolicy", module.module_deletion_policy)
else:
self.is_void = True
for resource in module.resources_list:
resource.stack = self