Amazon Identity and Access Management (IAM) manages access to AWS services and resources securely. As best practice, we should provide access to IAM User, Group etc. with minimum permission to perform their job. But, We often make mistake by providing access to certain Powerful Actions through IAM policy and, enables users, groups etc. to perform actions which may cause intentional or unintentional issues to AWS infrastructure. It may intern causes production downtime and financial loss. We must try to stop such incidents by periodically reviewing defined IAM policies for certain predefined powerful actions.
I developed this Python script to run periodically on AWS. It can be configured as Lambda function running in a schedule and sending email notification through SNS subscription.
Hope you will find it helpful to secure your AWS infrastructure.
Note: You need to provide an AWS profile name and location and update the script. You can set it through you AWS credential configuration on your system. Comment out those lines if you intend to run it as Lambda function. I have provided a list of Powerful Actions based on my requirements. You may add/remove items here. Ideally, we should be moving it out of the main script and set it as a separate JSON/YAML file. Please feel free to enhance it as per your requirements. Organization level policies can prevent us from executing certain actions at the very top level.
Prerequisites:
- Python and boto3 module installed
- AWS Credentials configured
- Setup IAM execution role and SNS topic/subscription configured if you plan to run it as Lambda function
- Change file path to a Linux based path,if you want to run it as Lambda
import boto3,logging,json,os
from botocore.config import Config
import csv
import datetime
from time import gmtime,strftime
PROFILE_NAME = '<Enter AWS Profile Name>'
REGION_NAME = '<AWS location>'
POWERFUL_ACTIONS = ["config:DeleteConfigRule","lambda:AddPermission","lambda:DeleteFunction","lambda:InvokeFunction","kms:CreateKey","kms:Decrypt","kms:DisableKey","athena:DeleteNamedQuery","dynamodb:CreateBackup","dynamodb:DeleteBackup","dynamodb:DeleteItem","dynamodb:DeleteTable","dax:DeleteCluster","dax:DeleteItem","dax:DeleteParameterGroup","dax:DeleteSubnetGroup","dax:RebootNode","ec2:CreateCustomerGateway","ec2:CreateDefaultSubnet","ec2:CreateDefaultVpc","ec2:CreateDhcpOptions","ec2:CreateEgressOnlyInternetGateway","ec2:CreateNetworkAcl","ec2:CreateNetworkAclEntry","ec2:CreateRoute","ec2:CreateRouteTable","ec2:CreateSubnet","ec2:CreateVpc","ec2:CreateVpcEndpoint","ec2:DeleteCustomerGateway","ec2:DeleteDhcpOptions","ec2:DeleteEgressOnlyInternetGateway","ec2:DeleteFlowLogs","ec2:DeleteNetworkAcl","ec2:DeleteNetworkAclEntry","ec2:DeleteRoute","ec2:DeleteRouteTable","ec2:DeleteSecurityGroup","ec2:DeleteSubnet","ec2:DeleteVpc","ec2:DeleteVpcEndpoints","ec2:DeleteVpcPeeringConnection","ec2:DeleteVpnConnection","ec2:DeleteVpnConnectionRoute","ec2:DeleteVpnGateway","cloudformation:DeleteStack","iam:DeleteSAMLProvider", "iam:DeleteSSHPublicKey"]
session = boto3.Session(profile_name=PROFILE_NAME, region_name=REGION_NAME)
iam_client = session.client('iam')
fName = "aws_policy_audit_"+strftime("%m_%d_%Y",gmtime())+".csv"
fPath = "c:\\temp\\"+fName
if os.path.exists(fPath):
os.remove(fPath)
outFile = open(fPath,"w+")
outFile.write("%s,%s,%s,%s,%s,%s\n"%('IAM Resource Type','Resource Name','Action Name','Source Name','Evaluation Decision','Source Policy Id'))
outFile.flush()
def iam_simulation_function(iamResourceType,resourceName,resourceArn,oFile):
try:
all_simlumation_results = iam_client.simulate_principal_policy(
PolicySourceArn = resourceArn, ActionNames = POWERFUL_ACTIONS
).get('EvaluationResults',[])
for simulation_result in all_simlumation_results:
if simulation_result['EvalDecision'] == 'allowed':
actionName = simulation_result.get('EvalActionName')
sourceName = simulation_result.get('EvalResourceName')
evaluationDecision = simulation_result.get('EvalDecision')
for policyInfo in simulation_result.get('MatchedStatements',[]):
sourcePolicyId = policyInfo.get('SourcePolicyId')
oFile.write("%s,%s,%s,%s,%s,%s\n"% iamResourceType,resourceName,actionName,sourceName,evaluationDecision,sourcePolicyId))
oFile.flush()
except Exception as e:
raise Exception("[ErrorMessage]: " + str(e))
# All IAM Users
try:
allIamUsers = iam_client.list_users().get('Users',[])
userList = []
for iamUser in allIamUsers:
userdata = {
'UserName': iamUser['UserName'],
'UserArn': iamUser['Arn']
}
userList.append(userdata)
for usr in userList:
iam_simulation_function('User',usr['UserName'],usr['UserArn'],outFile)
except Exception as e:
raise Exception("[ErrorMessage]: " + str(e))
# All IAM Groups
try:
allIamGroups = iam_client.list_groups().get('Groups',[])
groupList = []
for iamGroup in allIamGroups:
groupData = {
'GroupName': iamGroup['GroupName'],
'GroupArn': iamGroup['Arn']
}
groupList.append(groupData)
for grp in groupList: iam_simulation_function('Group',grp['GroupName'],grp['GroupArn'],outFile)
except Exception as e:
raise Exception("[ErrorMessage]: " + str(e))
# All IAM Roles
try:
allIamRoles = iam_client.list_roles().get('Roles',[])
roleList = []
for iamRole in allIamRoles:
roleData={
'RoleName': iamRole['RoleName'],
'RoleArn': iamRole['Arn']
}
roleList.append(roleData)
for role in roleList:
iam_simulation_function('Role',role['RoleName'],role['RoleArn'],outFile)
except Exception as e:
raise Exception("[ErrorMessage]: " + str(e))
except Exception as e:
raise Exception("[ErrorMessage]: " + str(e))
outFile.close()