# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
from __future__ import annotations
import re
from time import sleep
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
from ecs_composex.compose.compose_services.service_image import ServiceImage
from boto3.session import Session
from compose_x_common.compose_x_common import keyisset, set_else_none
try:
from ecr_scan_reporter.ecr_scan_reporter import DEFAULT_THRESHOLDS
from ecr_scan_reporter.images_scanner import trigger_images_scan
except ImportError:
DEFAULT_THRESHOLDS: dict = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
raise ImportError(
"Run pip install ecs-composex[ecrscan] to enable this functionality."
)
from ecs_composex.common.logging import LOG
from .ecr_helpers import define_ecr_session
ECR_URI_RE = re.compile(
r"(?P<account_id>\d{12}).dkr.ecr.(?P<region>[a-z0-9-]+).amazonaws.com/"
r"(?P<repo_name>[a-zA-Z0-9-_./]+)(?P<tag>(?:\@sha[\d]+:[a-z-Z0-9]+$)|(?::[\S]+$))"
)
[docs]def initial_scan_retrieval(
registry, repository_name, image, service_image, trigger_scan, ecr_session=None
):
"""
Function to retrieve the scan findings from ECR, and if none, can trigger scan
:param str registry:
:param str repository_name:
:param dict image:
:param ServiceImage service_image:
:param bool trigger_scan:
:param boto3.session.Session ecr_session:
:return: The scan report
:rtype: dict
"""
if ecr_session is None:
ecr_session = Session()
client = ecr_session.client("ecr")
try:
image_scan_r = client.describe_image_scan_findings(
registryId=registry, repositoryName=repository_name, imageId=image
)
return image_scan_r
except client.exceptions.ScanNotFoundException:
LOG.warning(f"No scan report found for {service_image.image_uri}")
if trigger_scan:
LOG.info(
f"Triggering scan for {service_image.image_uri}, trigger_scan={trigger_scan}"
)
trigger_images_scan(
repo_name=repository_name,
images_to_scan=[image],
ecr_session=ecr_session,
)
else:
LOG.warn(
f"No scan was available and scanning not requested for {service_image.image_uri}. Skipping"
)
return None
[docs]def scan_poll_and_wait(
registry,
repository_name,
image,
image_url,
ecr_session=None,
scan_frequency: str = None,
scan_on_push: bool = False,
):
"""Function to pull the scans results until no longer in progress"""
client = ecr_session.client("ecr")
while True:
try:
image_scan_r = client.describe_image_scan_findings(
registryId=registry,
repositoryName=repository_name,
imageId=image,
)
if image_scan_r["imageScanStatus"]["status"] in ["IN_PROGRESS", "PENDING"]:
LOG.info(
f"{image_url.image_uri} - Scan in progress - waiting 10 seconds"
)
sleep(10)
else:
return image_scan_r
except client.exceptions.ScanNotFoundException:
if scan_frequency and scan_frequency == "CONTINUOUS_SCAN" and scan_on_push:
LOG.info(f"{image_url.image_uri} - Pending enhanced scan")
sleep(10)
except client.exceptions.LimitExceededException:
LOG.warn(f"{image_url} - Exceeding API Calls quota. Waiting 10 seconds")
sleep(10)
[docs]def wait_for_scan_report(
registry,
repository_name,
image,
image_url: ServiceImage,
trigger_scan=False,
ecr_session=None,
) -> dict[str, Union[dict, str]]:
"""
Function to wait for the scan report to go from In Progress to else
:param str registry:
:param str repository_name:
:param image:
:param str image_url::
:param bool trigger_scan:
:param boto3.session.Session ecr_session:
:return:
"""
if not ecr_session:
ecr_session = Session()
findings = {}
scan_frequency = None
scan_on_push = False
try:
scanning_config = ecr_session.client(
"ecr"
).batch_get_repository_scanning_configuration(
repositoryNames=[repository_name]
)[
"scanningConfigurations"
]
scan_frequency = scanning_config[0]["scanFrequency"]
scan_on_push = scanning_config[0]["scanOnPush"]
except Exception as error:
LOG.warning(
f"{repository_name} - Could not determine scanning configuration - {error}"
)
image_scan_r = initial_scan_retrieval(
registry, repository_name, image, image_url, trigger_scan, ecr_session
)
LOG.info(
"ECR Repository Scan configuration: {} - (ScanOnPush/scanFrequency): {}/{}".format(
repository_name, scan_on_push, scan_frequency
)
)
if (
image_scan_r is None
and not trigger_scan
and scan_frequency != "CONTINUOUS_SCAN"
):
return findings
if (image_scan_r is None and scan_frequency == "CONTINUOUS_SCAN") or (
image_scan_r
and (
keyisset("imageScanStatus", image_scan_r)
and image_scan_r["imageScanStatus"]["status"] in ["IN_PROGRESS", "PENDING"]
)
):
image_scan_r = scan_poll_and_wait(
registry,
repository_name,
image,
image_url,
ecr_session,
scan_frequency,
scan_on_push,
)
if image_scan_r is None:
reason = "Failed to retrieve or poll scan report"
LOG.error(reason)
findings: dict = {"FAILED": True, "reason": reason}
elif image_scan_r["imageScanStatus"]["status"] != "FAILED" and keyisset(
"findingSeverityCounts", image_scan_r["imageScanFindings"]
):
findings: dict = image_scan_r["imageScanFindings"]["findingSeverityCounts"]
elif image_scan_r["imageScanStatus"]["status"] == "FAILED":
findings: dict = {
"FAILED": True,
"reason": image_scan_r["imageScanStatus"]["description"],
}
return findings
[docs]def define_result(
image_url: str,
security_findings: dict,
thresholds: dict,
vulnerability_config: dict,
) -> tuple[bool, list[str], list[str]]:
"""
Function to define what to do with findings, if any.
If VulnerabilitiesScan.Fail is False, then ignore the findings and display only
:param str image_url:
:param dict security_findings:
:param dict thresholds:
:param dict vulnerability_config:
:return: Whether there is a breach of thresholds or not
:rtype: bool
"""
results: list[str] = []
over_the_limit_results: list[str] = []
ignore = keyisset("IgnoreFailure", vulnerability_config)
if not security_findings:
return True, results, over_the_limit_results
elif keyisset("FAILED", security_findings):
LOG.error(
f"{image_url} - Scan of image failed. - {security_findings['reason']}"
)
treat_failed_as = set_else_none(
"TreatFailedAs", vulnerability_config, "Failure"
)
if treat_failed_as == "Success":
LOG.warning("TreatFailedAs set to Success - ignoring scan failure")
return True, results, over_the_limit_results
else:
return False, results, over_the_limit_results
else:
for name, limit in thresholds.items():
level_limit = set_else_none(name, security_findings)
if not level_limit:
continue
if level_limit > limit:
over_the_limit_results.append(f"{name}: {level_limit}/{limit}")
results.append(f"{name}: {level_limit}/{limit}")
if not ignore and over_the_limit_results:
return False, results, over_the_limit_results
return True, results, over_the_limit_results
[docs]def scan_service_image(
service, settings, the_image: dict = None
) -> tuple[bool, list[str], list[str]]:
"""
Function to review the service definition and evaluate scan if properties defined
:param ecs_composex.common.compose_services.ComposeService service:
:param ecs_composex.common.settings.ComposeXSettings settings: The settings for the execution
:param the_image: The image to use for scanning references.
"""
region = None
if validate_input(service):
return True, [], []
vulnerability_config = service.ecr_config["VulnerabilitiesScan"]
if keyisset("Thresholds", vulnerability_config):
thresholds = dict(DEFAULT_THRESHOLDS)
thresholds.update(vulnerability_config["Thresholds"])
else:
LOG.warn(f"No thresholds defined. Using defaults {DEFAULT_THRESHOLDS}")
thresholds = DEFAULT_THRESHOLDS
validate_the_image_input(the_image)
parts = service.image.private_ecr
repo_name = parts.group("repo_name")
account_id = parts.group("account_id")
region = parts.group("region")
session = define_ecr_session(
account_id,
repo_name,
region,
settings,
role_arn=service.ecr_config["RoleArn"]
if keyisset("RoleArn", vulnerability_config)
else None,
)
security_findings = wait_for_scan_report(
registry=account_id,
repository_name=repo_name,
image=the_image,
image_url=service.image,
ecr_session=session,
)
return define_result(
service.image, security_findings, thresholds, vulnerability_config
)