Policy as Codeでインフラのコンプライアンスを自動実現! 「Pulumi CrossGuard」を活用してみよう

2023年7月4日(火)
大関 研丞 (Kenneth Ozeki)
第5回となる今回は、Policy as Codeの概要を解説し、「Pulumi CrossGuard」を利用してPulumiでリソースを作成する際のポリシーチェック を行うハンズオンを実践していきます。

ポリシーカスタマイズ

折角なので、試しにポリシー設定をカスタマイズし、新たなポリシーを追加してみます。

  1. ポリシー用ディレクトリに移動します。
    $ cd ../policypack 
    
    $ pwd
    /***/test-crossguard/policypack
  2. 以下のポリシーコードに書き換えます。
    import ipaddress
    import pulumi
    from pulumi_policy import (
        EnforcementLevel,
        PolicyPack,
        ReportViolation,
        ResourceValidationArgs,
        ResourceValidationPolicy,
    )
    
    # スタックの名前を取得
    stack_name = pulumi.get_stack()
    
    # スタックの設定を読み込む
    config = pulumi.Config()
    
    
    # S3 公開設定チェック
    def s3_no_public_read_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        if args.resource_type == "aws:s3/bucket:Bucket" and "acl" in args.props:
            acl = args.props["acl"]
            if acl == "public-read" or acl == "public-read-write":
                report_violation(
                    "You cannot set public-read or public-read-write on an S3 bucket. " +
                    "Read more about ACLs here: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html"
                )
    
    s3_no_public_read = ResourceValidationPolicy(
        name="s3-no-public-read",
        description="Prohibits setting the publicRead or publicReadWrite permission on AWS S3 buckets.",
        validate=s3_no_public_read_validator,
    )
    
    
    # リソース名 prefixチェック
    def name_prefix_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        prefix_check_resources = ["aws:s3/bucket:Bucket","aws:ec2/subnet:Subnet","aws:ec2/instance:Instance"]
        resource_prefix = f"{stack_name}-"
        if args.resource_type in prefix_check_resources:
            if not args.name.startswith(resource_prefix):
                report_violation(
                    f"Resource name {args.name} should have prefix '{resource_prefix}'"
                )
    
    name_prefix = ResourceValidationPolicy(
        name="name_prefix",
        description="Check resource prefix",
        validate=name_prefix_validator
    )
    
    
    # VPC/Subnet IPレンジチェック
    def network_cidr_block_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_cidr_block = "10.0.0.0/16"
        if args.resource_type in ["aws:ec2/vpc:Vpc", "aws:ec2/subnet:Subnet"] and "cidrBlock" in args.props:
            cidr_block = ipaddress.ip_network(args.props["cidrBlock"])
            if not cidr_block.subnet_of(ipaddress.ip_network(allowed_cidr_block)):
                report_violation(
                    f"{args.resource_type} {args.name} uses restricted CIDR block¥n" +
                    f"Please use in ip range of {allowed_cidr_block}", 
                )
    
    network_cidr_block = ResourceValidationPolicy(
        name="network_cidr_block",
        description="Check VPC/Subnet CIDR block",
        validate=network_cidr_block_validator
    )
    
    
    # Subnet Avaliability Zone チェック
    def subnet_availability_zone_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_az = ["ap-northeast-1a", "ap-northeast-1c"]
        if args.resource_type == "aws:ec2/subnet:Subnet" and "availabilityZone" in args.props:
            if not args.props["availabilityZone"] in allowed_az:
                report_violation(
                    f"Subnet {args.name} uses restricted availability zone¥n" +
                    f"Please use in any az of {allowed_az}",
                )
    
    subnet_availability_zone = ResourceValidationPolicy(
        name="subnet_availability_zone",
        description="Check subnet availability zone",
        validate=subnet_availability_zone_validator
    )
    
    # EC2 インスタンスタイプチェック
    def ec2_instance_type_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_instance_types = ["t2.nano", "t2.micro", "t2.small"]
        if args.resource_type == "aws:ec2/instance:Instance" and "instanceType" in args.props:
            if not args.props["instanceType"] in allowed_instance_types:
                report_violation(
                    f"EC2 instance {args.name} uses restricted instance type¥n" +
                    f"Please use in any type of {allowed_instance_types}",
                )
    
    ec2_instance_type = ResourceValidationPolicy(
        name="ec2_instance_type",
        description="Check EC2 instance type",
        validate=ec2_instance_type_validator
    )
    
    
    PolicyPack(
        name="aws-python",
        enforcement_level=EnforcementLevel.MANDATORY,
        policies=[
            s3_no_public_read,
            name_prefix,
            network_cidr_block,
            subnet_availability_zone,
            ec2_instance_type,
        ],
    )
    実際のプロジェクトでよくありそうなポリシーチェックを想定して、以下のポリシーを追加しました。
    • リソース名のprefixに環境名(dev/stg/prdなど)を必ず付ける
    • VPC/SubnetのネットワークリソースのIPレンジを特定のレンジに限定する
    • Subnetのavailability zoneを特定のazに限定する
    • EC2のインスタンスタイプを特定のタイプに限定する
    また、許可したいEC2インスタンスタイプが複数ある場合など、ポリシーチェックで、あるpropertyの取り換える値が複数存在する場合は、以下のようにリストで指定しています。
    # EC2 インスタンスタイプチェック
    def ec2_instance_type_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_instance_types = ["t2.nano", "t2.micro", "t2.small"]
        if args.resource_type == "aws:ec2/instance:Instance" and "instanceType" in args.props:
            if not args.props["instanceType"] in allowed_instance_types:
    また、ipレンジの比較はpythonの「ipaddress」ライブラリを利用して比較しています。
    # VPC/Subnet IPレンジチェック
    def network_cidr_block_validator(args: ResourceValidationArgs, report_violation: ReportViolation):
        allowed_cidr_block = "10.0.0.0/16"
        if args.resource_type in ["aws:ec2/vpc:Vpc", "aws:ec2/subnet:Subnet"] and "cidrBlock" in args.props:
            cidr_block = ipaddress.ip_network(args.props["cidrBlock"])
            if not cidr_block.subnet_of(ipaddress.ip_network(allowed_cidr_block)):
    Policyについては1つのPolicyPackにまとめました。「特定のPolicyだけEnforcement LevelをAdvisoryにしたい」などの場合は、PolicyPackを分けることもできます。
    PolicyPack(
        name="aws-python",
        enforcement_level=EnforcementLevel.MANDATORY,
        policies=[
            s3_no_public_read,
            name_prefix,
            network_cidr_block,
            subnet_availability_zone,
            ec2_instance_type,
        ],
    )
    1つ注意点としては、Pulumi Programでは各リソースのpropertyのkeyをスネークケース(cidr_block、availability_zone、instance_typeなど)で記載していますが、Policyのvalidatorを定義する際、propertyのkey指定はキャメルケース(cidrBlock、availabilityZone、instanceType)となるので、気をつけてください。
    print(args.props._DictProxy__map)
    
    ↓
    
    {'vpcId': '04da6b54-80e4-46f7-96ec-b56ff0331ba9', 'enableResourceNameDnsARecordOnLaunch': False, 'cidrBlock': '192.168.1.0/24', 'mapPublicIpOnLaunch': False, 'enableResourceNameDnsAaaaRecordOnLaunch': False, 'enableDns64': False, 'ipv6Native': False, 'availabilityZone': 'us-east-1a', 'assignIpv6AddressOnCreation': False}
    【validator定義の際に利用する args.props(ResourceValidationArgs)objectの中身】
著者
大関 研丞 (Kenneth Ozeki)
クリエーションライン株式会社 Data Platform Team
前職では保険や金融エンタープライズのミッションクリティカルシステム(オンプレミス、仮想サーバー、CDN等のインフラ系業務)の設計/構築を経験。クリエーションラインに転職後はクラウドエンジニアとしてGCP関連の案件でインフラの設計/構築、IaCやCI/CDを用いたDevOpsの導入、コンテナ(Kubernetes)基盤の構築、運用自動化ツールの作成などを担当。
クリエーションラインの技術ブログをチェック

連載バックナンバー

システム運用技術解説
第10回

Pulumiの最新機能「Pulumi ESC」を使ってみよう

2023/12/26
最終回となる今回は、2023年12月時点でリリースされている新機能の紹介と、その新機能の中から「Pulumi ESC」を用いたハンズオンを実践していきます。
システム運用技術解説
第9回

TerraformからPulumiへの移行

2023/11/28
第9回となる今回は、既にTerraformでAWS環境に作成されているリソースをPulumiへ移行するケースを想定して、CoexistenceとConversionのハンズオンを実践していきます
システム運用技術解説
第8回

既に存在するリソースをPulumiで管理してみよう

2023/10/19
第8回となる今回は、既にクラウド環境にデプロイされているリソースをPulumiで管理(import)する方法について、ハンズオンで実践していきます。

Think ITメルマガ会員登録受付中

Think ITでは、技術情報が詰まったメールマガジン「Think IT Weekly」の配信サービスを提供しています。メルマガ会員登録を済ませれば、メルマガだけでなく、さまざまな限定特典を入手できるようになります。

Think ITメルマガ会員のサービス内容を見る

他にもこの記事が読まれています