|
|
@ -0,0 +1,246 @@ |
|
|
|
|
|
#!/bin/python3 |
|
|
|
|
|
|
|
|
|
|
|
from typing import ( |
|
|
|
|
|
Optional, |
|
|
|
|
|
Tuple |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
import yaml |
|
|
|
|
|
import requests |
|
|
|
|
|
import socket |
|
|
|
|
|
import requests.packages.urllib3.util.connection as urllib3_cn |
|
|
|
|
|
|
|
|
|
|
|
class GandiDNS: |
|
|
|
|
|
api_endpoint = "https://dns.api.gandi.net/api/v5" |
|
|
|
|
|
api_key = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, key: str): |
|
|
|
|
|
""" |
|
|
|
|
|
Constructor |
|
|
|
|
|
""" |
|
|
|
|
|
self.api_key = key |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_api(self, endpoint: str) -> dict: |
|
|
|
|
|
""" |
|
|
|
|
|
Calls the API with a GET request on a given endpoint |
|
|
|
|
|
""" |
|
|
|
|
|
r = requests.get( |
|
|
|
|
|
f"{ self.api_endpoint }/{ endpoint }", |
|
|
|
|
|
headers={ |
|
|
|
|
|
"Content-Type": "application/json", |
|
|
|
|
|
"X-Api-Key": self.api_key |
|
|
|
|
|
} |
|
|
|
|
|
) |
|
|
|
|
|
r.raise_for_status() |
|
|
|
|
|
return r.json() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _post_api(self, endpoint: str, payload: Optional[dict] = None) -> dict: |
|
|
|
|
|
""" |
|
|
|
|
|
Calls the API with a POST request on a given endpoint |
|
|
|
|
|
|
|
|
|
|
|
Arguments: |
|
|
|
|
|
endpoint: api endpoint to call |
|
|
|
|
|
payload: a dict that will be transformed in JSON |
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_domain_uuid(self, domain: str) -> str: |
|
|
|
|
|
""" |
|
|
|
|
|
Get the Zone UUID for a specific domain name |
|
|
|
|
|
""" |
|
|
|
|
|
return self._get_api( f"domains/{ domain }" )["zone_uuid"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _replace_ip_record(self, uuid: str, subdomain: str, record: str, ip: str, ttl: int) -> Optional[str]: |
|
|
|
|
|
""" |
|
|
|
|
|
Replaces the IP in a DNS record |
|
|
|
|
|
|
|
|
|
|
|
Arguments: |
|
|
|
|
|
uuid: Zone UUID of the domain to change (see `get_domain_uuid()`) |
|
|
|
|
|
subdomain: name of the subdomain to change |
|
|
|
|
|
record: either 'A' for IPv4, or 'AAAA' for IPv6 |
|
|
|
|
|
ip: IP address to use |
|
|
|
|
|
ttl: time to live of that record |
|
|
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
|
None if the IP address was already correct, |
|
|
|
|
|
the previous IP address otherwise |
|
|
|
|
|
""" |
|
|
|
|
|
endpoint = f"zones/{uuid}/records/{subdomain}/{record}" |
|
|
|
|
|
|
|
|
|
|
|
previous_ip = self._get_api(endpoint)["rrset_values"][0] |
|
|
|
|
|
if previous_ip == ip: |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
self._post_api(endpoint, { |
|
|
|
|
|
"rrset_ttl": ttl, |
|
|
|
|
|
"rrset_values": [ip] |
|
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
return previous_ip |
|
|
|
|
|
|
|
|
|
|
|
def replace_ipv4_record(self, uuid: str, subdomain: str, ip: str, ttl: int) -> Optional[str]: |
|
|
|
|
|
""" |
|
|
|
|
|
Replaces an IPv4 in a DNS record. |
|
|
|
|
|
See _replace_ip_record() for arguments and return values |
|
|
|
|
|
""" |
|
|
|
|
|
return self._replace_ip_record(uuid, subdomain, 'A', ip, ttl) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def replace_ipv6_record(self, uuid: str, subdomain: str, ip: str, ttl: int) -> Optional[str]: |
|
|
|
|
|
""" |
|
|
|
|
|
Replaces an IPv6 in a DNS record. |
|
|
|
|
|
See _replace_ip_record() for arguments and return values |
|
|
|
|
|
""" |
|
|
|
|
|
return self._replace_ip_record(uuid, subdomain, 'AAAA', ip, ttl) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
############# |
|
|
|
|
|
# Functions # |
|
|
|
|
|
############# |
|
|
|
|
|
|
|
|
|
|
|
def force_ipv4(): |
|
|
|
|
|
""" |
|
|
|
|
|
Function to be used as allowed_gai_family, but forces IPv4 |
|
|
|
|
|
""" |
|
|
|
|
|
return socket.AF_INET |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def force_ipv6(): |
|
|
|
|
|
""" |
|
|
|
|
|
Function to be used as allowed_gai_family, but forces IPv6 |
|
|
|
|
|
""" |
|
|
|
|
|
return socket.AF_INET6 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_ip( lookup_v4: str, lookup_v6: Optional[str], *, use_ipv6 = True ) -> Tuple[ str, Optional[str] ]: |
|
|
|
|
|
""" |
|
|
|
|
|
Gets both IPv4 and IPv6 (if possible) of the current computer |
|
|
|
|
|
|
|
|
|
|
|
This uses an external server to find the actual IP. |
|
|
|
|
|
This server should return nothing but the IP. |
|
|
|
|
|
One such server is https://homnomnom.fr/ip/ |
|
|
|
|
|
|
|
|
|
|
|
Arguments: |
|
|
|
|
|
lookup_v4: URL to use for looking up the IPv4 address |
|
|
|
|
|
lookup_v6: URL to use for looking up the IPv6 address. |
|
|
|
|
|
If None, will use lookup_v4 for this |
|
|
|
|
|
use_ipv6: set to False to avoid looking up IPv6 |
|
|
|
|
|
|
|
|
|
|
|
Return: |
|
|
|
|
|
A tuple [ IPv4, IPv6 ] of the addresses. |
|
|
|
|
|
IPv6 may be None if IPv6 is not available, or if use_ipv6 is False |
|
|
|
|
|
""" |
|
|
|
|
|
gai_family = urllib3_cn.allowed_gai_family |
|
|
|
|
|
|
|
|
|
|
|
urllib3_cn.allowed_gai_family = force_ipv4 |
|
|
|
|
|
r = requests.get(lookup_v4) |
|
|
|
|
|
r.raise_for_status() |
|
|
|
|
|
ipv4 = r.text |
|
|
|
|
|
|
|
|
|
|
|
ipv6 = None |
|
|
|
|
|
if urllib3_cn.HAS_IPV6 and use_ipv6: |
|
|
|
|
|
if lookup_v6 is None: |
|
|
|
|
|
lookup_v6 = lookup_v4 |
|
|
|
|
|
|
|
|
|
|
|
urllib3_cn.allowed_gai_family = force_ipv6 |
|
|
|
|
|
try: |
|
|
|
|
|
r = requests.get(lookup_v6) |
|
|
|
|
|
r.raise_for_status() |
|
|
|
|
|
ipv6 = r.text |
|
|
|
|
|
except OSError: |
|
|
|
|
|
ipv6 = None |
|
|
|
|
|
|
|
|
|
|
|
# stop forcing IPv4 / IPv6 |
|
|
|
|
|
urllib3_cn.allowed_gai_family = gai_family |
|
|
|
|
|
return ( ipv4, ipv6 ) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_script_arguments(): |
|
|
|
|
|
""" |
|
|
|
|
|
Parses the arguments |
|
|
|
|
|
""" |
|
|
|
|
|
import argparse |
|
|
|
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser( |
|
|
|
|
|
description="Gandi dynamic DNS" |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
parser.add_argument("config", type=str, |
|
|
|
|
|
help="The config file to use" |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
parser.add_argument("--key", type=str, |
|
|
|
|
|
help="API key to use" |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
return parser.parse_args() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def display_ip_change( subdomain: str, host: str, old_ip: Optional[str], new_ip: Optional[str]): |
|
|
|
|
|
""" |
|
|
|
|
|
Displays if the old IP was OK or if it was changed |
|
|
|
|
|
""" |
|
|
|
|
|
header = f" - {subdomain}.{host}" |
|
|
|
|
|
if old_ip is None: |
|
|
|
|
|
print( f"{header}: IP already OK") |
|
|
|
|
|
else: |
|
|
|
|
|
print( f"{header}: replaced {old_ip} with {new_ip}") |
|
|
|
|
|
|
|
|
|
|
|
############### |
|
|
|
|
|
# Main script # |
|
|
|
|
|
############### |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
args = parse_script_arguments() |
|
|
|
|
|
with open(args.config, 'r') as file: |
|
|
|
|
|
params = yaml.safe_load(file) |
|
|
|
|
|
|
|
|
|
|
|
if args.key is not None: |
|
|
|
|
|
params["api_key"] = args.key |
|
|
|
|
|
|
|
|
|
|
|
gandi = GandiDNS( params["api_key"] ) |
|
|
|
|
|
|
|
|
|
|
|
ipv4, ipv6 = get_ip( |
|
|
|
|
|
params["ip_lookup"]["v4"], |
|
|
|
|
|
params["ip_lookup"]["v6"] |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
print( f"IPv4 detected: {ipv4}") |
|
|
|
|
|
if ipv6 is not None: |
|
|
|
|
|
print( f"IPv6 detected: {ipv6}") |
|
|
|
|
|
else: |
|
|
|
|
|
print( "IPv6 not supported") |
|
|
|
|
|
|
|
|
|
|
|
for host in params["hosts"]: |
|
|
|
|
|
assert "name" in host |
|
|
|
|
|
assert "ttl" in host |
|
|
|
|
|
assert "domains" in host |
|
|
|
|
|
|
|
|
|
|
|
hostname = host["name"] |
|
|
|
|
|
print( f"{hostname}:" ) |
|
|
|
|
|
|
|
|
|
|
|
uuid = gandi.get_domain_uuid(hostname) |
|
|
|
|
|
print( f" - UUID: {uuid}" ) |
|
|
|
|
|
|
|
|
|
|
|
domains = host["domains"] |
|
|
|
|
|
if "v4" in domains: |
|
|
|
|
|
for domain in domains["v4"]: |
|
|
|
|
|
display_ip_change( |
|
|
|
|
|
domain, |
|
|
|
|
|
hostname, |
|
|
|
|
|
gandi.replace_ipv4_record(uuid, domain, ipv4, host["ttl"]), |
|
|
|
|
|
ipv4 |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if "v6" in domains and ipv6 is not None: |
|
|
|
|
|
for domain in domains["v6"]: |
|
|
|
|
|
display_ip_change( |
|
|
|
|
|
domain, |
|
|
|
|
|
hostname, |
|
|
|
|
|
gandi.replace_ipv6_record(uuid, domain, ipv6, host["ttl"]), |
|
|
|
|
|
ipv6 |
|
|
|
|
|
) |