My workplace has an architecture constraint that requires all network traffic to and from the Internet to pass through a single control point. This means that all cloud traffic passes through our WAN, and thereby our firewall. In my previous post I wrote about dual-stacking firewall rules on an on-prem firewall, but cloud security deserves cloud-native solutions, even if the solution is doubling up on the same rule – better to have it stopped in the cloud than spend money on egress!
I wrote an interactive script that queries a firewall API for a rule/policy and creates matching AWS security groups to shore up this gap. The script asks for a firewall policy name and whether you want to create the security groups directly via the AWS API, or whether you want either Terraform or a Bash script to apply via the AWS CLI. I had planned to attach the security groups directly to resources (by iterating through ENIs and working up that way) but the permissions needed to do that would have required more paperwork than I was interested in doing! Instead, they’re added to the VPC of the resources for the purposes of making the repository public. I also have/had plans to write a web front-end for this to make it a bit more usable, but that idea has been shelved for the moment.
Unlike my previous tool, the audience for this was myself and a few of our cloud engineers so I stuck with Python. I also worked in a different direction for this one – starting out, the main question was “what do I need from the firewall in order to make a security group?” One might think you just need name, VPC, source, destination, and port and you’d be mostly right. But, annoyingly enough, it turns out the answer changes depending on which AWS tool you’re using to create it! The base data is the same, but how you format it differs greatly.
When I first built this script I decided to do it via the AWS API, because that’s what I had access to in my dev environment. When creating a security group via the API (in this case, boto3, the Python SDK for AWS), you use the IpPermissions JSON object to represent each port/protocol, and inside of that object you use an IpRanges object for each address:
"IpPermissions": [
    {
        "IpProtocol": "tcp",
        "FromPort": 3306,
        "ToPort": 3306,
        "IpRanges": [
            {
                "CidrIp": "10.0.2.1/32",
                "Description": "MYSQL"
            },
            {
                "CidrIp": "10.1.3.1/32",
                "Description": "MYSQL"
            }
        ]
    }
]
It was extremely simple to take a firewall policy object and convert it to this format; it was almost 1:1! Hell yeah, we love an easy win.
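As a rough sketch of that near-1:1 conversion (not the code from the repository), here is one way a port and a list of CIDRs could be turned into an IpPermissions entry. The function names build_ip_permissions and apply_ingress are made up for illustration; the only real AWS call is boto3's authorize_security_group_ingress, and it is wrapped in a function that is never called here because it needs credentials.

```python
def build_ip_permissions(protocol, port, cidrs, description):
    """Build one boto3-style IpPermissions entry for a single port (hypothetical helper)."""
    return {
        "IpProtocol": protocol,
        "FromPort": port,
        "ToPort": port,
        # One IpRanges object per address, each carrying its own description
        "IpRanges": [{"CidrIp": cidr, "Description": description} for cidr in cidrs],
    }


def apply_ingress(group_id, permissions):
    """Send the rules to AWS. Needs boto3 and credentials, so it is not called below."""
    import boto3  # imported lazily so the builder above works without boto3 installed

    ec2 = boto3.client("ec2")
    return ec2.authorize_security_group_ingress(GroupId=group_id, IpPermissions=permissions)


rule = build_ip_permissions("tcp", 3306, ["10.0.2.1/32", "10.1.3.1/32"], "MYSQL")
print(rule)
```

Keeping the builder separate from the API call means the same data can be inspected or reused before anything touches AWS.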
However, this is real life, and we can’t just use the AWS API to make changes in production willy-nilly. In real life we use IaC! And furthermore, we have to review it like any other code, so I went to rewrite the script to create Terraform instead of direct API calls. This would allow our cloud engineers to make sure any changes would go through the proper process.
I had originally planned to just print a lot of strings out to file to reformat it as Terraform, but unwieldy doesn’t even begin to describe that mess! Who would do such a thing? (This is foreshadowing. I would do such a thing later in Bash.) After some classic style Googling and a bit of ChatGPTing I found the Python library terrascript. Terrascript allows you to write Python code that isn’t an affront to God, and then spits out Terraform JSON, which is kind of like Terraform but a bit blockier. Importantly, Terraform JSON is completely compatible with regular Terraform.
So that solves that, right? Use the same security group JSON we used with the API to create the Terraform files... except, it turns out, the JSON for the AWS Terraform Provider is completely different! Both Terraform and the API allow you to list several IPs for each protocol, but Terraform places all of the addresses in a cidr_blocks array of strings, as opposed to boto3’s use of an IpRanges array of JSON objects.
{
    "resource": {
        "aws_security_group": {
            "MYSQL_test_terraformLab": {
                "name": "MYSQL_test_terraformLab",
                "description": "Security group for policy MYSQL_test, built from firewall API",
                "vpc_id": "vpc-xxxxxxxxxxxxxxxx",
                "ingress": [
                    {
                        "from_port": 3306,
                        "to_port": 3306,
                        "protocol": "tcp",
                        "cidr_blocks": [
                            "10.0.2.1/32",
                            "10.1.3.1/32",
                            "10.1.3.2/32",
                            "10.1.3.3/32",
                            "10.1.3.4/32",
                            "10.1.3.5/32",
                            "10.1.3.6/32",
                            "10.1.3.7/32",
                            "10.1.3.8/32",
                            "10.1.3.9/32",
                            "10.1.3.10/32",
                            "10.1.3.11/32",
                            "10.1.3.12/32"
                        ],
                        "ipv6_cidr_blocks": [],
                        "prefix_list_ids": [],
                        "security_groups": [],
                        "self": "false",
                        "description": "MYSQL"
                    }
                ]
            }
        }
    }
}
Having reworked the script to massage the data from the firewall into the above format, I had the Terraform working in my dev environment! Back in business!
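To make the difference between the two formats concrete, here is a minimal sketch that reshapes a boto3-style IpPermissions entry into a Terraform JSON ingress block. It uses plain dictionaries and the json module rather than terrascript, so it is not how the repository does it, and the function names are invented for the example. One assumption to flag: Terraform has a single description per ingress block while boto3 has one per address, so the sketch simply joins the distinct descriptions. It also writes "self" as a JSON boolean rather than the string "false" shown above.

```python
import json


def to_terraform_ingress(ip_permission):
    """Reshape one boto3-style IpPermissions entry into a Terraform ingress block."""
    descriptions = {r.get("Description", "") for r in ip_permission["IpRanges"]}
    return {
        "from_port": ip_permission["FromPort"],
        "to_port": ip_permission["ToPort"],
        "protocol": ip_permission["IpProtocol"],
        # Terraform wants a flat list of strings instead of a list of objects
        "cidr_blocks": [r["CidrIp"] for r in ip_permission["IpRanges"]],
        "ipv6_cidr_blocks": [],
        "prefix_list_ids": [],
        "security_groups": [],
        "self": False,
        "description": ", ".join(sorted(descriptions)),
    }


def to_terraform_document(resource_name, vpc_id, description, ip_permissions):
    """Wrap the ingress blocks in the resource structure Terraform JSON expects."""
    return {
        "resource": {
            "aws_security_group": {
                resource_name: {
                    "name": resource_name,
                    "description": description,
                    "vpc_id": vpc_id,
                    "ingress": [to_terraform_ingress(p) for p in ip_permissions],
                }
            }
        }
    }


permission = {
    "IpProtocol": "tcp",
    "FromPort": 3306,
    "ToPort": 3306,
    "IpRanges": [
        {"CidrIp": "10.0.2.1/32", "Description": "MYSQL"},
        {"CidrIp": "10.1.3.1/32", "Description": "MYSQL"},
    ],
}
doc = to_terraform_document("MYSQL_test", "vpc-xxxxxxxxxxxxxxxx", "Built from firewall API", [permission])
print(json.dumps(doc, indent=2))
```

Saved with a .tf.json extension, output like this can sit alongside regular .tf files in the same directory.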
But it was ugly and inelegant, so I wanted to clean it up some and learn some more Python in the process. I hadn’t done any proper Object-Oriented stuff in a while, so I set out to re-architect the script to make it more chisel and less sledgehammer.
I started by creating a policy class to store the data from the firewall. When a policy object gets initialized it queries its policyName (the only parameter needed) from the firewall and stores the source, destination, and services (ports) from that first API query, then queries the source and destination objects for their CIDRs. Then it scans all of its destination IPs and finds matching VPCs from the AWS account the environment is set up to use. Finally, it creates JSON that represents the security groups it will create later on.
class Policy:
    def __init__(self, policyName):
        self.api = loadAPI.createAPI()
        print(f"Loading policy {policyName} from firewall.")
        call_results = self.api.cmdb.firewall.policy.get(filter=f'"name=={policyName}"') #make API call for policy by name
        if call_results: #if not empty, assign the needed variables to the first API result
            first = call_results[0] #API likes to return a list, but it's always got one object - pretty much any time we directly use the API in here we're going to want to use the first index
            self.name = first.get('name')
            self.dstaddr = first.get('dstaddr')
            self.srcaddr = first.get('srcaddr')
            self.service = first.get('service')
            self.comments = first.get('comments')
            self.VPCs = []
            for destination in self.getDestinations(): #check VPCs to see if any policy destinations match their networks - security groups will be applied to these VPCs
                vpc = findVPCbyCIDR(destination)
                if vpc and vpc not in self.VPCs:
                    self.VPCs.append(vpc)
            if not self.VPCs:
                print("No matching VPC found - will attempt to generate but vpc ID will be 'none'")
            self.SG_JSON = self._generateJSON()

    def __str__(self):
        return f"{self.name}"
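The findVPCbyCIDR helper is not shown in this post, so the following is only a guess at the idea: check whether a destination CIDR sits inside a VPC's network. The sketch assumes the VPC list has the shape boto3's describe_vpcs returns under its "Vpcs" key (dictionaries with "VpcId" and "CidrBlock"), and it ignores secondary CIDR associations. The name find_vpc_by_cidr and the sample VPC IDs are hypothetical.

```python
import ipaddress


def find_vpc_by_cidr(destination, vpcs):
    """Return the ID of the first VPC whose primary CIDR contains the destination, else None."""
    dest = ipaddress.ip_network(destination, strict=False)
    for vpc in vpcs:
        vpc_net = ipaddress.ip_network(vpc["CidrBlock"])
        # subnet_of raises TypeError across IP versions, so compare versions first
        if dest.version == vpc_net.version and dest.subnet_of(vpc_net):
            return vpc["VpcId"]
    return None


# Sample data shaped like the "Vpcs" list from describe_vpcs (IDs are made up)
sample_vpcs = [
    {"VpcId": "vpc-aaaa", "CidrBlock": "10.0.0.0/16"},
    {"VpcId": "vpc-bbbb", "CidrBlock": "10.1.0.0/16"},
]
print(find_vpc_by_cidr("10.1.3.1/32", sample_vpcs))
print(find_vpc_by_cidr("192.168.1.0/24", sample_vpcs))
```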
The full code is available in the repository linked at the bottom, but I ran into the same major headache I had with the dual-stacking script: the firewall uses name for both address objects and address groups! And they have different API endpoints! And groups can contain both objects and other groups as members! AWS security groups lack logical or hierarchical organization; everything is a plain ole CIDR. That silver lining meant there was no need to recreate a group-member structure on this one; I just needed to iterate through and make sure I had every CIDR represented:
def _computeCIDRs(self, direction):
    if direction == "src": #populate addressNames based on source or destination in policy
        addressNames = self.srcaddr
    elif direction == "dst":
        addressNames = self.dstaddr
    addressNameTable = list() #list that will contain the name of every address object based on src/dst list (resolves groups as well)
    for address in addressNames: #step through policy's addresses and get all of the address objects from it
        if address.get('name') not in ("all", "any"):
            addressNameTable.append(self._getAddressObj(address.get('name')))
    flatTable = [name for sublist in addressNameTable for name in sublist] #flatten the list of addressobject names from list of list of str to list of str
    addressCIDRs = list() #create an empty list for CIDRs
    for address in flatTable: #walk through addressobject names, call their data from the firewall, and convert them to a CIDR
        addrCall = self.api.cmdb.firewall.address.get(name=address)
        addrObj = addrCall[0]
        if addrObj.get('type') == 'ipmask': #if it's a subnet or individual host
            addressRaw = addrObj.get('subnet') #pull the ip info from the object
            ip, netmask = addressRaw.split() #it's always one string, formatted [<ip address> <subnetmask>] so instead of regex we can just use split to assign ip and netmask
            network = ipaddress.IPv4Network((ip, netmask), strict=False) #likewise, we can cheat the annoying math needed by using the ipaddress class
            addressCIDRs.append(network.with_prefixlen) #add to list to be returned
        elif addrObj.get('type') == 'iprange': #if it's a range
            start = ipaddress.IPv4Address(addrObj.get('start-ip')) #create ipv4 address objects - can't do math on them normally but the class allows for casting to int for comparison
            end = ipaddress.IPv4Address(addrObj.get('end-ip'))
            for ip in range(int(start), int(end) + 1):
                ip_current = ipaddress.IPv4Address(ip) #create an address based on where we are in loop
                network = ipaddress.IPv4Network(f"{ip_current}/32") #make a network so we can add the CIDR below
                addressCIDRs.append(network.with_prefixlen) #fake the CIDR since each is a host
    addressCIDRs = self._cleanCIDRs(addressCIDRs) #check to see if any CIDRs are eclipsed, only return bigger ones
    return addressCIDRs #return the list of CIDRs
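The _getAddressObj helper called in that method lives in the repository rather than in this post. As a loose illustration of the problem it has to solve (groups that contain both objects and other groups), here is a sketch that walks an in-memory mapping instead of calling the firewall. The function name resolve_address_names, the dictionary layout, and the sample names are all assumptions made for the example.

```python
def resolve_address_names(name, groups, seen=None):
    """Return the address object names reachable from `name`, expanding nested groups.

    `groups` maps a group name to its member names; any name not in the
    mapping is treated as a plain address object.
    """
    seen = set() if seen is None else seen
    if name in seen:  # already visited: skips duplicates and guards against loops
        return []
    seen.add(name)
    if name not in groups:
        return [name]
    resolved = []
    for member in groups[name]:
        resolved.extend(resolve_address_names(member, groups, seen))
    return resolved


# Hypothetical firewall data: one group nested inside another
sample_groups = {
    "db-servers": ["db-1", "db-replicas"],
    "db-replicas": ["db-2", "db-3", "db-1"],
}
print(resolve_address_names("db-servers", sample_groups))
```

Because the result is already a flat list of names, every entry can be looked up against the address endpoint without caring which group it came from.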
I did have some funky issues with getting lists of one string from some API calls and just a string from others, so I had to make sure to flatten everything. I was, however, able to make some cool shortcuts thanks to Python’s built-in ipaddress module. I’m proud of how well I commented the _computeCIDRs snippet above, so feel free to follow along and enjoy the logic.
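For anyone curious how far the ipaddress module can take this, here is a small sketch of two more of its helpers. This is an alternative, not what the script does: summarize_address_range turns a start/end pair into the smallest set of networks instead of one /32 per host, and collapse_addresses drops networks that are covered by a larger one. Note that collapse_addresses also merges adjacent networks, which goes a step beyond only removing eclipsed CIDRs. The function names are made up for the example.

```python
import ipaddress


def range_to_cidrs(start_ip, end_ip):
    """Summarize an inclusive IPv4 range as the smallest list of CIDR strings."""
    start = ipaddress.IPv4Address(start_ip)
    end = ipaddress.IPv4Address(end_ip)
    return [str(net) for net in ipaddress.summarize_address_range(start, end)]


def clean_cidrs(cidrs):
    """Drop CIDRs covered by a larger one (and merge adjacent ones) for IPv4 input."""
    networks = [ipaddress.ip_network(cidr, strict=False) for cidr in cidrs]
    return [str(net) for net in ipaddress.collapse_addresses(networks)]


print(range_to_cidrs("10.1.3.1", "10.1.3.12"))
print(clean_cidrs(["10.0.0.0/24", "10.0.0.5/32", "10.0.2.1/32"]))
```

Fewer, larger CIDRs mean fewer security group rules, which may matter given that AWS limits the number of rules per group.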
Now that every policy had cleanly formatted JSON with everything needed to make a security group, I refactored the Terraform generation code to read from that newly formatted JSON instead of directly from the firewall... and then I added a method with the previous boto3 code because I could! While I had the momentum, I figured why not learn the AWS CLI as well? And so the Bash version was born. The CLI version had its own challenge though – I actually had to write the IpPermissions block to a temp file to make it reusable for each rule, not to mention the headache of escaping characters to save a shell script from inside Python. All three methods of security group creation can be found in creation.py.
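To give a flavor of the escaping problem, here is a minimal sketch of generating such a script from Python. It is not the code in creation.py; render_cli_script is a hypothetical name. The sketch leans on shlex.quote for anything that lands on a command line and on a quoted heredoc so the JSON is written to the temp file untouched by the shell. The AWS CLI commands used are create-security-group and authorize-security-group-ingress, with the permissions passed via a file:// parameter.

```python
import json
import shlex


def render_cli_script(group_name, description, vpc_id, ip_permissions):
    """Return the text of a Bash script that creates one security group and its ingress rules."""
    perms_json = json.dumps(ip_permissions, indent=2)
    lines = [
        "#!/usr/bin/env bash",
        "set -euo pipefail",
        'PERMS_FILE="$(mktemp)"',
        # Quoted heredoc delimiter: the shell copies the JSON verbatim, no expansion
        "cat > \"$PERMS_FILE\" <<'EOF'",
        perms_json,
        "EOF",
        "GROUP_ID=$(aws ec2 create-security-group"
        f" --group-name {shlex.quote(group_name)}"
        f" --description {shlex.quote(description)}"
        f" --vpc-id {shlex.quote(vpc_id)}"
        " --query GroupId --output text)",
        'aws ec2 authorize-security-group-ingress --group-id "$GROUP_ID"'
        ' --ip-permissions "file://$PERMS_FILE"',
        'rm -f "$PERMS_FILE"',
    ]
    return "\n".join(lines) + "\n"


sample_permissions = [
    {
        "IpProtocol": "tcp",
        "FromPort": 3306,
        "ToPort": 3306,
        "IpRanges": [{"CidrIp": "10.0.2.1/32", "Description": "MYSQL"}],
    }
]
script = render_cli_script("MYSQL_test", "it's built from the firewall API", "vpc-xxxxxxxxxxxxxxxx", sample_permissions)
print(script)
```

Writing the permissions to a file sidesteps most of the nested quoting, since only the group name, description, and VPC ID need shell escaping.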
This project was a fantastic way to learn a bit more about the firewall API, and a whole lot more about programmatically interacting with AWS. This tool works in my real-life environment (with some organization-specific changes), and might work in yours as well!
Link to repository: https://github.com/pmalley130/Fortigate-Policy-to-Terraform