# Source code for ecs_composex.ecs.service_networking.ingress_helpers

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING

import ecs_composex.common.troposphere_tools

if TYPE_CHECKING:
    from ecs_composex.ecs.ecs_family import ComposeFamily
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.common.stacks import ComposeXStack

from json import dumps

from compose_x_common.compose_x_common import keyisset, keypresent, set_else_none
from troposphere import AWS_ACCOUNT_ID, GetAtt, Ref, Sub
from troposphere.ec2 import SecurityGroupIngress

from ecs_composex.cloudmap.cloudmap_params import RES_KEY as CLOUDMAP_KEY
from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import add_parameters
from ecs_composex.ecs.ecs_params import SERVICE_NAME
from ecs_composex.ingress_settings import Ingress
from ecs_composex.vpc.vpc_params import SG_ID_TYPE


def handle_ext_sources(existing_sources: list, new_sources: list) -> None:
    """
    Adds up external sources if they are not defined yet.

    ``existing_sources`` is extended in place. Sources are compared on their
    ``Ingress.ipv4_key`` value, both against the sources already present and
    against the ones appended earlier during the same call.

    :param list existing_sources: Sources already defined, updated in place.
    :param list new_sources: Sources to add when their IPv4 value is not known yet.
    :raises KeyError: When a new source mapping does not define ``Ingress.ipv4_key``
    """
    known_ipv4_sources = [
        source[Ingress.ipv4_key]
        for source in existing_sources
        if keyisset(Ingress.ipv4_key, source)
    ]
    for new_source in new_sources:
        ipv4_source = new_source[Ingress.ipv4_key]
        if ipv4_source not in known_ipv4_sources:
            existing_sources.append(new_source)
            # Remember what was just added, otherwise the same value repeated
            # further down new_sources would be appended a second time.
            known_ipv4_sources.append(ipv4_source)
def handle_aws_sources(existing_sources: list, new_sources: list) -> None:
    """
    Function to handle merge of aws sources between two services for one family.

    ``existing_sources`` is extended in place. Sources are compared on their
    ``Id`` value; new sources without a set ``Id`` are ignored.

    :param list existing_sources: Sources already defined, updated in place.
    :param list new_sources: Sources to add when their ``Id`` is not known yet.
    """
    known_ids = [source["Id"] for source in existing_sources if keyisset("Id", source)]
    for new_source in new_sources:
        if not keyisset("Id", new_source):
            continue
        source_id = new_source["Id"]
        if source_id not in known_ids:
            existing_sources.append(new_source)
            # Remember what was just added, otherwise the same Id repeated
            # further down new_sources would be appended a second time.
            known_ids.append(source_id)
def handle_services(existing_sources: list, new_sources: list) -> None:
    """
    Function to merge source services definitions.

    ``existing_sources`` is extended in place. Services are compared on their
    ``Name`` value, both against the services already present and against the
    ones appended earlier during the same call.

    :param list existing_sources: Services already defined, updated in place.
    :param list new_sources: Services to add when their ``Name`` is not known yet.
    :raises KeyError: When a new service mapping does not define ``Name``
    """
    known_names = [
        source["Name"] for source in existing_sources if keyisset("Name", source)
    ]
    for new_source in new_sources:
        service_name = new_source["Name"]
        if service_name not in known_names:
            existing_sources.append(new_source)
            # Remember what was just added, otherwise the same Name repeated
            # further down new_sources would be appended a second time.
            known_names.append(service_name)
def handle_ingress_rules(source_config: dict, ingress_config: dict) -> None:
    """
    Merges one ingress definition (source_config) into ingress_config, in place.

    ``Myself`` is re-assigned on every call from source_config, with ``False``
    given to set_else_none as the alternative value. The list based settings
    (external sources, AWS sources, services) are merged through their
    dedicated handler, only when set in source_config.

    :param dict source_config: The ingress definition to merge from.
    :param dict ingress_config: The ingress definition to merge into.
    """
    list_settings_handlers = (
        (Ingress.ext_sources_key, handle_ext_sources),
        (Ingress.aws_sources_key, handle_aws_sources),
        (Ingress.services_key, handle_services),
    )
    ingress_config["Myself"] = set_else_none(
        "Myself", source_config, alt_value=False, eval_bool=True
    )
    for setting_name, merge_sources in list_settings_handlers:
        if keyisset(setting_name, source_config):
            merge_sources(ingress_config[setting_name], source_config[setting_name])
def merge_family_network_setting(
    family, key: str, definition: dict, network: dict, network_config: dict
) -> None:
    """
    Merges a network setting (key) and its definition (definition) with new definition (network)
    into network_config.

    If the key is x-cloudmap, and is unset, set to value. If another service of the family
    comes in with a value too, the first value is preserved and a warning is logged.

    :param ecs_composex.ecs.ecs_family.ComposeFamily family:
    :param str key: The network setting being merged.
    :param dict definition: The current value of the setting in network_config.
    :param dict network: The network definition to merge from.
    :param dict network_config: The merged network definition, updated in place.
    """
    if not keyisset(key, network):
        return
    if key == Ingress.master_key:
        handle_ingress_rules(network[key], network_config[key])
    elif key == CLOUDMAP_KEY:
        if definition:
            # A logging call takes a single message (plus %-style arguments):
            # passing the parts as separate positional arguments fails to format.
            LOG.warning(
                f"{family.name} - x-network.{CLOUDMAP_KEY} is already set to {definition}"
            )
        else:
            network_config[CLOUDMAP_KEY] = network[CLOUDMAP_KEY]
def merge_family_services_networking(family: ComposeFamily) -> dict:
    """
    Builds the network definition of the family out of the ``x_network`` of its services.

    Starts from an empty definition (no ingress source, empty cloudmap setting) and
    merges into it, in the order of ``family.ordered_services``, every service
    ``x_network`` that is set. The result is logged at debug level.

    :param ecs_composex.ecs.ecs_family.ComposeFamily family:
    :return: The family network definition
    :rtype: dict
    """
    ingress_defaults = {
        "Myself": False,
        Ingress.ext_sources_key: [],
        Ingress.aws_sources_key: [],
        Ingress.services_key: [],
    }
    network_config = {Ingress.master_key: ingress_defaults, CLOUDMAP_KEY: {}}

    services_networks = []
    for service in family.ordered_services:
        if service.x_network:
            services_networks.append(service.x_network)

    for service_network in services_networks:
        for setting in network_config:
            merge_family_network_setting(
                family,
                setting,
                network_config[setting],
                service_network,
                network_config,
            )

    LOG.debug(family.name)
    LOG.debug(dumps(network_config, indent=2))
    return network_config
def add_independent_rules(
    dst_family: ComposeFamily, service_name: str, root_stack: ComposeXStack
) -> None:
    """
    Adds security groups rules in the root stack as both services need to be created
    (with their SG) before the ingress rule can be defined.

    One rule is created per port of the destination family. A rule whose title is
    already among the root stack template resources is not added again.

    :param dst_family: The family to allow ingress to.
    :param service_name: Name of the root stack resource of the source service.
    :param root_stack: The stack the ingress rules are added to.
    :raises ValueError: When a port defines neither ``published`` nor ``target``
    """
    source_stack = root_stack.stack_template.resources[service_name]
    for port in dst_family.service_networking.ports:
        # "published" takes precedence, "target" is the fallback.
        fallback_port = set_else_none("target", port, None)
        target_port = set_else_none("published", port, alt_value=fallback_port)
        if target_port is None:
            raise ValueError(
                "Wrong port definition value for security group ingress", port
            )

        rule_title = (
            f"From{source_stack.title}To{dst_family.logical_name}On{target_port}"
        )
        rule_properties = {
            "FromPort": target_port,
            "ToPort": target_port,
            "IpProtocol": port["protocol"],
            "Description": Sub(
                f"From {source_stack.title} to {dst_family.logical_name}"
                f" on port {target_port}/{port['protocol']}"
            ),
            "GroupId": GetAtt(
                dst_family.stack.title,
                f"Outputs.{dst_family.logical_name}GroupId",
            ),
            "SourceSecurityGroupId": GetAtt(
                source_stack.title,
                f"Outputs.{source_stack.title}GroupId",
            ),
            "SourceSecurityGroupOwnerId": Ref(AWS_ACCOUNT_ID),
        }
        ingress_rule = SecurityGroupIngress(rule_title, **rule_properties)
        if ingress_rule.title in root_stack.stack_template.resources:
            continue
        root_stack.stack_template.add_resource(ingress_rule)
def add_dependant_ingress_rules(
    dst_family: ComposeFamily, dst_family_sg_param: Parameter, src_family: ComposeFamily
) -> None:
    """
    Adds, in the source family template, one security group ingress rule per port
    of the destination family.

    The destination security group is referenced through ``dst_family_sg_param``,
    the source one through the ``GroupId`` of the source family security group.

    :param dst_family: The family to allow ingress to.
    :param dst_family_sg_param: Parameter of the source family template used as the rule GroupId.
    :param src_family: The family to allow ingress from, whose template receives the rules.
    :raises ValueError: When a port defines neither ``published`` nor ``target``
    """
    for port in dst_family.service_networking.ports:
        # "published" takes precedence, "target" is the fallback.
        fallback_port = set_else_none("target", port, None)
        target_port = set_else_none("published", port, alt_value=fallback_port)
        if target_port is None:
            raise ValueError(
                "Wrong port definition value for security group ingress", port
            )

        description = Sub(
            f"From ${{{SERVICE_NAME.title}}} to {dst_family.stack.title} on port {target_port}"
        )
        rule_title = (
            f"From{src_family.logical_name}To{dst_family.stack.title}On{target_port}"
        )
        ingress_rule = SecurityGroupIngress(
            rule_title,
            SourceSecurityGroupId=GetAtt(
                src_family.service_networking.security_group, "GroupId"
            ),
            GroupId=Ref(dst_family_sg_param),
            FromPort=target_port,
            ToPort=target_port,
            IpProtocol=port["protocol"],
            SourceSecurityGroupOwnerId=Ref(AWS_ACCOUNT_ID),
            Description=description,
        )
        src_family.template.add_resource(ingress_rule)
def set_compose_services_ingress(
    root_stack, dst_family: ComposeFamily, families: list, settings: ComposeXSettings
) -> None:
    """
    Function to create SG Ingress between two families / services.
    Presently, the ingress rules are set after all services have been created.

    For each service listed in the destination family ingress services:

    * without a ``DependsOn`` key, the rules are added to the root stack;
    * with a ``DependsOn`` key, the source family stack is made to depend on the
      destination family stack, receives the destination security group ID as a
      parameter, and the rules are added to the source family template.

    :param ecs_composex.common.stacks.ComposeXStack root_stack:
    :param ecs_composex.ecs.ecs_family.ComposeFamily dst_family:
    :param list families: The list of family names.
    :param ecs_composex.common.settings.ComposeXSettings settings:
    :raises KeyError: When an ingress service name is not in ``families``
    """
    for service in dst_family.service_networking.ingress.services:
        service_name = service["Name"]
        if service_name not in families:
            raise KeyError(
                f"The service {service_name} is not among the services created together. Valid services are",
                families,
            )

        if not keypresent("DependsOn", service):
            add_independent_rules(dst_family, service_name, root_stack)
            continue

        src_family = settings.families[service_name]
        dst_stack_title = dst_family.stack.title
        if dst_stack_title not in src_family.stack.DependsOn:
            src_family.stack.DependsOn.append(dst_stack_title)

        dst_family_sg_param = Parameter(
            f"{dst_family.stack.title}GroupId", Type=SG_ID_TYPE
        )
        add_parameters(src_family.template, [dst_family_sg_param])
        dst_group_id = GetAtt(
            dst_family.stack.title,
            f"Outputs.{dst_family.logical_name}GroupId",
        )
        src_family.stack.Parameters.update({dst_family_sg_param.title: dst_group_id})
        add_dependant_ingress_rules(dst_family, dst_family_sg_param, src_family)
def handle_str_cloudmap_config(
    family: ComposeFamily, family_mappings: dict, cloudmap_config: str, ports: list
) -> None:
    """
    Handle cloudmap config when config is set as str.

    The config value is used as the mapping name. When it is not mapped yet, it gets
    the first port of ``ports`` and the family hostname. When it is, the existing
    mapping is preserved and a warning is logged.

    :param ecs_composex.ecs.ecs_family.ComposeFamily family:
    :param dict family_mappings: The cloudmap mappings of the family, updated in place.
    :param str cloudmap_config: The mapping name.
    :param list ports: The ports of the family.
    """
    if cloudmap_config in family_mappings:
        LOG.warning(
            f"{family.name}.x-network.x-cloudmap - {cloudmap_config} is set multiple times. "
            f"Preserving {family_mappings[cloudmap_config]}"
        )
        return
    family_mappings[cloudmap_config] = {
        "Port": ports[0],
        "Name": family.family_hostname,
    }
def handle_dict_cloudmap_config(
    family: ComposeFamily, family_mappings: dict, cloudmap_config: dict, ports: list
) -> None:
    """
    Handles cloudmap config settings when set as a mapping/dict.

    Each key of ``cloudmap_config`` is a mapping name. A name that is already mapped
    is preserved and a warning is logged. Otherwise:

    * when the mapping config sets ``Port``, the first port whose ``target`` equals it
      is used; if none matches, no mapping is created and a warning is logged;
    * when it does not, the first port of ``ports`` is used.

    The mapping ``Name`` comes from the mapping config, with the family hostname
    given to set_else_none as the alternative value.

    :param ecs_composex.ecs.ecs_family.ComposeFamily family:
    :param dict family_mappings: The cloudmap mappings of the family, updated in place.
    :param dict cloudmap_config: Mapping name to mapping config.
    :param list ports: The ports of the family.
    """
    for map_name, config in cloudmap_config.items():
        if map_name in family_mappings:
            # Report the offending mapping name, not the whole configuration.
            LOG.warning(
                f"{family.name}.x-network.x-cloudmap - {map_name} is set multiple times. "
                f"Preserving {family_mappings[map_name]}"
            )
            continue

        if not keyisset("Port", config):
            family_mappings[map_name] = {
                "Port": ports[0],
                "Name": set_else_none("Name", config, family.family_hostname),
            }
            continue

        wanted_port = config["Port"]
        for port in ports:
            if port["target"] == wanted_port:
                family_mappings[map_name] = {
                    "Port": port,
                    "Name": set_else_none("Name", config, family.family_hostname),
                }
                break
        else:
            # No mapping is created in this case: make it visible instead of silent.
            LOG.warning(
                f"{family.name}.x-network.x-cloudmap - {map_name}: "
                f"no port with target {wanted_port} found. Mapping ignored."
            )
def merge_cloudmap_settings(family: ComposeFamily, ports: list) -> dict:
    """
    Function to merge the x_cloudmap from the services of the family.

    Services are processed in the order of ``family.ordered_services``. Settings
    given as ``str`` and as ``dict`` are each handled by their dedicated function,
    any other type is ignored. Without any cloudmap setting, or without ports, the
    result is empty.

    :param ecs_composex.ecs.ecs_family.ComposeFamily family:
    :param list[dict] ports:
    :return: The cloudmap config for the given family
    :rtype: dict
    """
    services_configs = []
    for svc in family.ordered_services:
        if svc.x_cloudmap:
            services_configs.append(svc.x_cloudmap)

    if not (services_configs and ports):
        return {}

    family_mappings = {}
    for service_config in services_configs:
        if isinstance(service_config, str):
            handle_str_cloudmap_config(family, family_mappings, service_config, ports)
        elif isinstance(service_config, dict):
            handle_dict_cloudmap_config(family, family_mappings, service_config, ports)
    return family_mappings