# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Module for VpcStack
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from troposphere import Template
from boto3.session import Session
from ecs_composex.common.settings import ComposeXSettings
from ecs_composex.mods_manager import XResourceModule
import re
import troposphere
from botocore.exceptions import ClientError
from compose_x_common.aws import get_region_azs
from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import FindInMap, GetAtt, Join, Ref
from troposphere.servicediscovery import PrivateDnsNamespace
from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.resources_import import (
find_aws_properties_in_aws_resource,
find_aws_resources_in_template_resources,
)
from ecs_composex.vpc import aws_mappings
from ecs_composex.vpc.vpc_aws import lookup_x_vpc_settings
from ecs_composex.vpc.vpc_maths import get_subnet_layers
from ecs_composex.vpc.vpc_params import (
APP_SUBNETS,
APP_SUBNETS_CIDR,
DEFAULT_VPC_CIDR,
PUBLIC_SUBNETS,
PUBLIC_SUBNETS_CIDR,
RES_KEY,
STORAGE_SUBNETS,
STORAGE_SUBNETS_CIDR,
SUBNETS_TYPE,
VPC_CIDR,
VPC_ID,
VPC_SINGLE_NAT,
)
from ecs_composex.vpc.vpc_subnets import (
add_apps_subnets,
add_public_subnets,
add_storage_subnets,
)
from ecs_composex.vpc.vpc_template import add_vpc_core, add_vpc_flow
from ..common.troposphere_tools import add_outputs, build_template
from ..compose.x_resources.environment_x_resources import AwsEnvironmentResource
from .vpc_cloudmap import x_vpc_to_x_cloudmap
AZ_INDEX_PATTERN = r"(([a-z0-9-]+)([a-z]{1}$))"
AZ_INDEX_RE = re.compile(AZ_INDEX_PATTERN)
[docs]class Vpc(AwsEnvironmentResource):
"""
Class to represent the VPC
"""
default_ipv4_cidr = DEFAULT_VPC_CIDR
required_subnets = [
APP_SUBNETS.title,
PUBLIC_SUBNETS.title,
STORAGE_SUBNETS.title,
]
def __init__(
self,
name: str,
definition: dict,
module: XResourceModule,
settings: ComposeXSettings,
):
self.vpc = None
self.vpc_cidr = None
self.app_subnets = []
self.public_subnets = []
self.storage_subnets = []
self.subnets = []
self.subnets_parameters = []
self.endpoints = []
self.endpoints_sg = None
self.logging = None
self.layers = None
self.azs = {}
self.zone_ids: dict = {}
super().__init__(name, definition, module, settings)
[docs] def storage_subnets_count(self) -> int:
if self.cfn_resource and self.storage_subnets:
return len(self.storage_subnets[-1])
elif self.mappings:
return len(self.mappings[STORAGE_SUBNETS.title]["Ids"])
else:
raise AttributeError(
f"VPC is not set. Cannot determine the count for {STORAGE_SUBNETS.title}"
)
[docs] def create_vpc(self, template: Template, settings: ComposeXSettings) -> None:
"""Creates a new VPC from Properties (or from defaults)"""
self.endpoints = set_else_none("Endpoints", self.properties, [])
self.vpc_cidr = set_else_none(
VPC_CIDR.title, self.properties, self.default_ipv4_cidr
)
self.dhcp_options = set_else_none("DHCPOptions", self.properties, {})
region_account_zones = settings.session.client(
"ec2"
).describe_availability_zones()
curated_azs = []
current_region_azs = [
zone["ZoneName"] for zone in region_account_zones["AvailabilityZones"][:2]
]
for az in current_region_azs:
if isinstance(az, dict):
curated_azs.append(az["ZoneName"])
elif isinstance(az, str):
curated_azs.append(az)
azs_index = [AZ_INDEX_RE.match(az).groups()[-1] for az in curated_azs]
self.azs[PUBLIC_SUBNETS] = current_region_azs
self.azs[STORAGE_SUBNETS] = current_region_azs
self.azs[APP_SUBNETS] = current_region_azs
self.layers = get_subnet_layers(self.vpc_cidr, len(curated_azs))
vpc_core = add_vpc_core(template, self.vpc_cidr, self.dhcp_options)
self.vpc = vpc_core[0]
self.storage_subnets = add_storage_subnets(
template, self.vpc, azs_index, self.layers
)
self.public_subnets = add_public_subnets(
template,
self.vpc,
azs_index,
self.layers,
vpc_core[-1],
set_else_none(
VPC_SINGLE_NAT.title,
self.properties,
bool(VPC_SINGLE_NAT.Default),
),
set_else_none("DisableNat", self.properties, False),
)
self.app_subnets = add_apps_subnets(
template,
self.vpc,
azs_index,
self.layers,
self.public_subnets[-1],
self.endpoints,
)
if keyisset("EnableFlowLogs", self.properties):
add_vpc_flow(
template,
self.vpc,
boundary=set_else_none("FlowLogsRoleBoundary", self.properties, None),
)
self.cfn_resource = self.vpc
self.subnets_parameters.append(APP_SUBNETS)
self.subnets_parameters.append(PUBLIC_SUBNETS)
self.subnets_parameters.append(STORAGE_SUBNETS)
[docs] def lookup_vpc(self) -> None:
"""Method to set VPC settings from x-vpc"""
vpc_settings = lookup_x_vpc_settings(self)
self.create_vpc_mappings(vpc_settings)
LOG.info(f"{RES_KEY} - Found VPC - {self.mappings[VPC_ID.title][VPC_ID.title]}")
[docs] def create_vpc_mappings(self, vpc_settings):
"""
Generates the VPC CFN Mappings
:param vpc_settings:
:param ecs_composex.common.settings.ComposeXSettings settings:
:return:
"""
self.mappings = {
VPC_ID.title: {VPC_ID.title: vpc_settings[VPC_ID.title]},
APP_SUBNETS.title: {"Ids": vpc_settings[APP_SUBNETS.title]},
STORAGE_SUBNETS.title: {"Ids": vpc_settings[STORAGE_SUBNETS.title]},
PUBLIC_SUBNETS.title: {"Ids": vpc_settings[PUBLIC_SUBNETS.title]},
}
ignored_keys = ["RoleArn", "session"]
self.subnets_parameters.append(APP_SUBNETS)
self.subnets_parameters.append(PUBLIC_SUBNETS)
self.subnets_parameters.append(STORAGE_SUBNETS)
for setting_name in vpc_settings:
if (
setting_name not in self.mappings.keys()
and setting_name not in ignored_keys
):
self.mappings[setting_name] = {"Ids": vpc_settings[setting_name]}
param = Parameter(setting_name, Type=SUBNETS_TYPE)
self.subnets_parameters.append(param)
self.set_azs_from_vpc_import(
vpc_settings,
session=vpc_settings["session"]
if keyisset("session", vpc_settings)
else None,
)
[docs] def set_azs_from_api(self) -> None:
"""Method to set the AWS Azs based on DescribeAvailabilityZones"""
try:
self.aws_azs = get_region_azs(self.lookup_session)
except ClientError as error:
code = error.response["Error"]["Code"]
message = error.response["Error"]["Message"]
if code == "RequestExpired":
LOG.error(message)
LOG.warning(f"Due to error, using default values {self.aws_azs}")
else:
LOG.error(error)
[docs] def set_azs_from_vpc_import(self, subnets: dict, session: Session = None) -> None:
"""Function to get the list of AZs for a given set of subnets"""
if session is None:
client = self.lookup_session.client("ec2")
else:
client = session.client("ec2")
for subnet_name, subnet_definition in subnets.items():
if not isinstance(subnet_definition, list):
continue
for subnet_param in self.subnets_parameters:
if subnet_param.title == subnet_name:
subnets_param = subnet_param
break
else:
raise KeyError(
f"x-vpc.set_azs_from_vpc_import - No parameter defined for {subnet_name}"
)
try:
subnets_r = client.describe_subnets(SubnetIds=subnet_definition)[
"Subnets"
]
azs = [subnet["AvailabilityZone"] for subnet in subnets_r]
zone_ids = [subnet["AvailabilityZoneId"] for subnet in subnets_r]
self.mappings[subnet_name]["Azs"] = azs
self.mappings[subnet_name]["ZoneIds"] = zone_ids
self.azs[subnets_param] = azs
self.zone_ids[subnets_param] = zone_ids
except ClientError:
LOG.warning("Could not define the AZs based on the imported subnets")
[docs] def init_outputs(self) -> None:
"""
Initialize output properties to pass on to the other stacks that need these values
"""
self.output_properties = {
VPC_ID: (VPC_ID.title, self.vpc, Ref, None),
APP_SUBNETS: (
APP_SUBNETS.title,
self.app_subnets[1],
Join,
[",", [Ref(subnet) for subnet in self.app_subnets[1]]],
),
PUBLIC_SUBNETS: (
PUBLIC_SUBNETS.title,
self.public_subnets[1],
Join,
[",", [Ref(subnet) for subnet in self.public_subnets[1]]],
),
STORAGE_SUBNETS: (
STORAGE_SUBNETS.title,
self.storage_subnets[1],
Join,
[",", [Ref(subnet) for subnet in self.storage_subnets[1]]],
),
STORAGE_SUBNETS_CIDR: (
STORAGE_SUBNETS_CIDR.title,
None,
Join,
[",", [cidr for cidr in self.layers["stor"]]],
),
APP_SUBNETS_CIDR: (
APP_SUBNETS_CIDR.title,
None,
Join,
[",", [cidr for cidr in self.layers["app"]]],
),
PUBLIC_SUBNETS_CIDR: (
PUBLIC_SUBNETS_CIDR.title,
None,
Join,
[",", [cidr for cidr in self.layers["pub"]]],
),
}
[docs] def handle_x_dependencies(
self, settings: ComposeXSettings, root_stack: ComposeXStack
) -> None:
"""
Function to have x-vpc update resources that have the x-vpc value where VpcID should be.
"""
for resource in settings.get_x_resources(include_mappings=False):
if not resource.cfn_resource:
continue
resource_stack = resource.stack
if not resource_stack:
LOG.debug(
f"resource {resource.name} has no `stack` attribute defined. Skipping"
)
continue
x_to_x_mappings = [
(
x_vpc_to_x_cloudmap,
(PrivateDnsNamespace,),
str,
"Vpc",
)
]
for update_settings in x_to_x_mappings:
aws_resources_to_update = find_aws_resources_in_template_resources(
resource_stack, update_settings[1]
)
for stack_resource in aws_resources_to_update:
properties_to_update = find_aws_properties_in_aws_resource(
update_settings[2], stack_resource
)
update_settings[0](
self,
stack_resource,
resource_stack,
properties_to_update,
update_settings[3],
settings,
)
[docs]def init_vpc_template() -> troposphere.Template:
"""
Simple wrapper function to create the VPC Template
:rtype: troposhere.Template
"""
template = build_template(
"Vpc Template generated via ECS Compose-X",
)
template.add_mapping("AwsLbAccounts", aws_mappings.AWS_LB_ACCOUNTS)
return template
[docs]class XStack(ComposeXStack):
"""
Class to create the VPC Stack
"""
def __init__(
self, title, settings: ComposeXSettings, module: XResourceModule, **kwargs
):
self.is_void = True
self.vpc_resource = None
if not keyisset(module.res_key, settings.compose_content):
LOG.warning(f"{module.res_key} - not defined. Assuming no VPC")
self.is_void = True
else:
self.vpc_resource = Vpc(
"vpc", settings.compose_content[module.res_key], module, settings
)
if self.vpc_resource.lookup:
self.vpc_resource.lookup_vpc()
elif self.vpc_resource.properties:
template = init_vpc_template()
self.vpc_resource.create_vpc(template, settings)
self.is_void = False
self.vpc_resource.init_outputs()
super().__init__(title, stack_template=template, **kwargs)
self.vpc_resource.generate_outputs()
add_outputs(template, self.vpc_resource.outputs)
self.vpc_resource.stack = self
[docs] def create_new_default_vpc(
self, title: str, vpc_module, settings: ComposeXSettings
):
"""
In case no x-vpc was specified but the deployment settings require a new VPC, allows for an easy way to set one.
"""
self.vpc_resource = Vpc(
name="vpc",
definition={"Properties": {VPC_CIDR.title: Vpc.default_ipv4_cidr}},
module=vpc_module,
settings=settings,
)
template = init_vpc_template()
self.vpc_resource.create_vpc(template, settings)
self.is_void = False
self.vpc_resource.init_outputs()
super().__init__(title, stack_template=template)
self.vpc_resource.generate_outputs()
add_outputs(template, self.vpc_resource.outputs)
self.vpc_resource.stack = self
@property
def vpc_id(self):
"""
Gives the VPC ID
:return:
"""
if not self.is_void and self.vpc_resource:
return GetAtt(self.title, f"Outputs.{VPC_ID.title}")
elif self.is_void and self.vpc_resource.mappings:
return FindInMap("Network", VPC_ID.title, VPC_ID.title)
else:
return None