今日からはじめる Pulumiでカンタン インフラ運用・管理 5

ポリシーカスタマイズ

ポリシーカスタマイズ

折角なので、試しにポリシー設定をカスタマイズし、新たなポリシーを追加してみます。

  1. ポリシー用ディレクトリに移動します。
    $ cd ../policypack 
    
    $ pwd
    /***/test-crossguard/policypack
  2. 以下のポリシーコードに書き換えます。
    import ipaddress
    import pulumi
    from pulumi_policy import (
        EnforcementLevel,
        PolicyPack,
        ReportViolation,
        ResourceValidationArgs,
        ResourceValidationPolicy,
    )
    
    # スタックの名前を取得
    stack_name = pulumi.get_stack()
    
    # スタックの設定を読み込む
    config = pulumi.Config()
    
    
    # S3 公開設定チェック
    def s3_no_public_read_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        if args.resource_type == "aws:s3/bucket:Bucket" and "acl" in args.props:
            acl = args.props["acl"]
            if acl == "public-read" or acl == "public-read-write":
                report_violation(
                    "You cannot set public-read or public-read-write on an S3 bucket. " +
                    "Read more about ACLs here: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html"
                )
    
    s3_no_public_read = ResourceValidationPolicy(
        name="s3-no-public-read",
        description="Prohibits setting the publicRead or publicReadWrite permission on AWS S3 buckets.",
        validate=s3_no_public_read_validator,
    )
    
    
    # リソース名 prefixチェック
    def name_prefix_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        prefix_check_resources = ["aws:s3/bucket:Bucket","aws:ec2/subnet:Subnet","aws:ec2/instance:Instance"]
        resource_prefix = f"{stack_name}-"
        if args.resource_type in prefix_check_resources:
            if not args.name.startswith(resource_prefix):
                report_violation(
                    f"Resource name {args.name} should have prefix '{resource_prefix}'"
                )
    
    name_prefix = ResourceValidationPolicy(
        name="name_prefix",
        description="Check resource prefix",
        validate=name_prefix_validator
    )
    
    
    # VPC/Subnet IPレンジチェック
    def network_cidr_block_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_cidr_block = "10.0.0.0/16"
        if args.resource_type in ["aws:ec2/vpc:Vpc", "aws:ec2/subnet:Subnet"] and "cidrBlock" in args.props:
            cidr_block = ipaddress.ip_network(args.props["cidrBlock"])
            if not cidr_block.subnet_of(ipaddress.ip_network(allowed_cidr_block)):
                report_violation(
                    f"{args.resource_type} {args.name} uses restricted CIDR block¥n" +
                    f"Please use in ip range of {allowed_cidr_block}", 
                )
    
    network_cidr_block = ResourceValidationPolicy(
        name="network_cidr_block",
        description="Check VPC/Subnet CIDR block",
        validate=network_cidr_block_validator
    )
    
    
    # Subnet Avaliability Zone チェック
    def subnet_availability_zone_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_az = ["ap-northeast-1a", "ap-northeast-1c"]
        if args.resource_type == "aws:ec2/subnet:Subnet" and "availabilityZone" in args.props:
            if not args.props["availabilityZone"] in allowed_az:
                report_violation(
                    f"Subnet {args.name} uses restricted availability zone¥n" +
                    f"Please use in any az of {allowed_az}",
                )
    
    subnet_availability_zone = ResourceValidationPolicy(
        name="subnet_availability_zone",
        description="Check subnet availability zone",
        validate=subnet_availability_zone_validator
    )
    
    # EC2 インスタンスタイプチェック
    def ec2_instance_type_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_instance_types = ["t2.nano", "t2.micro", "t2.small"]
        if args.resource_type == "aws:ec2/instance:Instance" and "instanceType" in args.props:
            if not args.props["instanceType"] in allowed_instance_types:
                report_violation(
                    f"EC2 instance {args.name} uses restricted instance type¥n" +
                    f"Please use in any type of {allowed_instance_types}",
                )
    
    ec2_instance_type = ResourceValidationPolicy(
        name="ec2_instance_type",
        description="Check EC2 instance type",
        validate=ec2_instance_type_validator
    )
    
    
    PolicyPack(
        name="aws-python",
        enforcement_level=EnforcementLevel.MANDATORY,
        policies=[
            s3_no_public_read,
            name_prefix,
            network_cidr_block,
            subnet_availability_zone,
            ec2_instance_type,
        ],
    )
    実際のプロジェクトでよくありそうなポリシーチェックを想定して、以下のポリシーを追加しました。
    • リソース名のprefixに環境名(dev/stg/prdなど)を必ず付ける
    • VPC/SubnetのネットワークリソースのIPレンジを特定のレンジに限定する
    • Subnetのavailability zoneを特定のazに限定する
    • EC2のインスタンスタイプを特定のタイプに限定する
    また、許可したいEC2インスタンスタイプが複数ある場合など、ポリシーチェックで、あるpropertyの取り換える値が複数存在する場合は、以下のようにリストで指定しています。
    # EC2 インスタンスタイプチェック
    def ec2_instance_type_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_instance_types = ["t2.nano", "t2.micro", "t2.small"]
        if args.resource_type == "aws:ec2/instance:Instance" and "instanceType" in args.props:
            if not args.props["instanceType"] in allowed_instance_types:
    また、ipレンジの比較はpythonの「ipaddress」ライブラリを利用して比較しています。
    # VPC/Subnet IPレンジチェック
    def network_cidr_block_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_cidr_block = "10.0.0.0/16"
        if args.resource_type in ["aws:ec2/vpc:Vpc", "aws:ec2/subnet:Subnet"] and "cidrBlock" in args.props:
            cidr_block = ipaddress.ip_network(args.props["cidrBlock"])
            if not cidr_block.subnet_of(ipaddress.ip_network(allowed_cidr_block)):
    Policyについては1つのPolicyPackにまとめました。「特定のPolicyだけEnforcement LevelをAdvisoryにしたい」などの場合は、PolicyPackを分けることもできます。
    PolicyPack(
        name="aws-python",
        enforcement_level=EnforcementLevel.MANDATORY,
        policies=[
            s3_no_public_read,
            name_prefix,
            network_cidr_block,
            subnet_availability_zone,
            ec2_instance_type,
        ],
    )
    1つ注意点としては、Pulumi Programでは各リソースのpropertyのkeyをスネークケース(cidr_block、availability_zone、instance_typeなど)で記載していますが、Policyのvalidatorを定義する際、propertyのkey指定はキャメルケース(cidrBlock、availabilityZone、instanceType)となるので、気をつけてください。
    print(args.props._DictProxy__map)
    
    ↓
    
    {'vpcId': '04da6b54-80e4-46f7-96ec-b56ff0331ba9', 'enableResourceNameDnsARecordOnLaunch': False, 'cidrBlock': '192.168.1.0/24', 'mapPublicIpOnLaunch': False, 'enableResourceNameDnsAaaaRecordOnLaunch': False, 'enableDns64': False, 'ipv6Native': False, 'availabilityZone': 'us-east-1a', 'assignIpv6AddressOnCreation': False}
    【validator定義の際に利用する args.props(ResourceValidationArgs)objectの中身】
この記事のキーワード

この記事をシェアしてください

人気記事トップ10

人気記事ランキングをもっと見る