I have been using a self-written Python script for several years now to automatically provide OpenVPN configurations for our employees https://vpngw.im-c.de. A few weeks ago the script would no longer work. The reason were outdated libraries which I used to create the necessary things (CSR, CA, Cert, Keypair, ...). So it was time for an overhaul. Many things had to be completely redone. I want to present the results to you here today

What does the script do for you

To get a working OpenVPN Profile (.ovpn) File the following things are handled with the script:

  • Get group members from onprem Active Directory and creates a certificate for members of that group.
  • if the user is removed from the Group the created certificate gets revoked (CRL)
  • creates a keypair (private key, public key)
  • creates a CSR (Certificate Signing Request)
  • creates a client Certificate (CA is needed)
  • finaly creates a ovpn file which contains all needed Informations to open a tunnel

Environment Setup

Before we can start to code a python environment is needed. Inside this env we need some libraries. So we will start setting up our environment:

Get the requirements.txt and openvpnconfighead File and put this files in the created folder!

mkdir autovpn
cd autovpn
mkdir rootca
mkdir usercerts
python -m venv ./
source ./bin/activate
pip install -r requirements.txt

pip should install all needed dependencies. next we need to paste the ta.key to the openvpnconfighead.txt file. If you do not have a ta.key create one with the following command (openvpn needs to be installed):

openvpn --genkey --secret ta.key

The contents of the ta.key looks like this and must be copied between the <tls-auth>and </tls-auth> block:

#
# 2048 bit OpenVPN static key
#
-----BEGIN OpenVPN Static key V1-----
c53b6c099d6ccce8dd34a2cd93b740d1
53538aad3c1b6999182f052febb5676e
706ef75b98d19e709a7479129a46ecc3
ddc4289b78b66963a75db1f296f1f66d
e8ffb3f076a9c56e14aab1841583d9b1
96a626eae6258686f848d42552451a92
73d32189e7cd684ded2b2793875ea575
c6d8b0ce6e2c38f6941305de1a46b20c
a12732d8919a4e8e931b053d0dc67e7f
82f5ddf1096333e93b121040c36b5e22
68c7f5e832eeeb0f47120f777337c76e
e0ae6284c2d436335193f32580447648
c3854e8bfe480d442cec45463dc8aeee
da07062c1a95880e844f67c6d8ecd394
2704c491d95e73998b76d04ff8b1810f
c5dd6c6dd539624a4147515e29003558
-----END OpenVPN Static key V1-----

the openvpn server needs the same ta.key in his config but this is not part of this Post!

now we are ready for starting to write our Python Script:

vpnconfgen.py

First, we build a few auxiliary functions. The name of the functions corresponds to the task they have to perform 😉

# VPN-Config and Cert Creator created by ♞ Raffael.Willems
# Help needed? read https://cryptography.io/en/latest/x509/tutorial/ and https://cryptography.io/en/latest/hazmat/primitives/asymmetric/
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
import datetime
import csv
import os
import ldap3
from progress.bar import Bar
from OpenSSL import crypto
myhome = "./"

# DEFINITION
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

print(bcolors.HEADER + "Raffis VPN-Config/Cert Creator V1.6\n"+bcolors.ENDC)

def make_keypair():
    return rsa.generate_private_key(public_exponent=65537, key_size=2048)

def make_csr(pkey, CN):
    return x509.CertificateSigningRequestBuilder().subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, CN)])).sign(pkey, hashes.SHA256())

def loadkeyfile(keyfile):
    with open(keyfile, 'rb') as fp:
        return serialization.load_pem_private_key(fp.read(), password=None)

def loadcertfile(certfile):
    with open(certfile, 'rb') as fp:
        return x509.load_pem_x509_certificate(fp.read())

def make_client_cert(csr, cakey, cacert, serial):
    subject_key_id = x509.SubjectKeyIdentifier.from_public_key(csr.public_key())  # Extract SKI from the CSR public key
    authority_key_id = x509.SubjectKeyIdentifier.from_public_key(cacert.public_key())  # Extract CA's SKI
    cert = x509.CertificateBuilder().subject_name(csr.subject)\
        .issuer_name(cacert.issuer)\
        .public_key(csr.public_key())\
        .serial_number(x509.random_serial_number())\
        .not_valid_before(datetime.datetime.now(datetime.timezone.utc))\
        .not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=3652))\
        .add_extension(x509.SubjectKeyIdentifier(subject_key_id.digest), critical=False)\
        .add_extension(x509.AuthorityKeyIdentifier(key_identifier=authority_key_id.digest,
                                                   authority_cert_issuer=[x509.DirectoryName(cacert.subject)],
                                                   authority_cert_serial_number=cacert.serial_number), critical=False)\
        .sign(cakey, hashes.SHA256())
    return cert

def extract_cn(ldap_dn):
    parsed_dn = ldap3.utils.dn.parse_dn(ldap_dn)
    for rdn in parsed_dn:
        if rdn[0].lower() == 'cn':
            return rdn[1]
    return None

Then we can build the make_ovpn_file function. In this function, the auxiliary functions are used to:

  1. create a keypair for the new client certificate
  2. create a CSR for the new client certificate
  3. create the client certificate (signed by our CA)
  4. converts the cert, key, ca, etc. to pem and build a ovpn file
  5. additionaly save the client and key to our usercerts folder
def make_ovpn_file(ca_cert, ca_key, clientname, serial, commonoptspath, filepath, dname):
    with open(commonoptspath, 'r') as f:  # Read our common options file first
        common = f.read()
    cacert = loadcertfile(ca_cert)  # load the Certificate Authority
    cakey = loadkeyfile(ca_key)  # load the CA keyfile (keyfile has no password)
    key = make_keypair()  # generate a new keypair for the new Client Certificate
    csr = make_csr(key, clientname)  # generate a new certificate signing request for the client certificate
    crt = make_client_cert(csr, cakey, cacert, serial)  # create client certificate and sign the created certificate with the CA
    # Now we have a successfully signed certificate. We must now
    # create a .ovpn file and then dump it somewhere.
    clientkey = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()).decode('utf-8')
    clientcert = crt.public_bytes(serialization.Encoding.PEM).decode('utf-8')
    cacertdump = cacert.public_bytes(serialization.Encoding.PEM).decode('utf-8')

    ovpn = "%s<ca>\n%s</ca>\n<cert>\n%s</cert>\n<key>\n%s</key>\n" % (common, cacertdump, clientcert, clientkey)
    # Write our file.
    with open(filepath, 'w') as f:
        f.write(ovpn)
    # Write Cert and Key
    with open(myhome + 'usercerts/' + dname + '.crt', 'w') as f:
        f.write(clientcert)
    with open(myhome + 'usercerts/' + dname + '.key', 'w') as f:
        f.write(clientkey)
    with open(myhome + 'serials.db', 'a') as f:
        f.write(clientname)
        f.write(';')
        f.write(str(crt.serial_number))
        f.write('\n')

Now we are ready to start asking the AD and build the certificates.

Here we have to adapt some things to the respective Active Directory. I have highlighted the relevant lines

  • line 5 whe need a reachable Active Directory server.
  • line 6 a bind user to get the needed informations
  • line 15 the searchbase of your LDAP/GC
  • line 16 the filter to get the Group (GOG_VPN)
# START MAIN
if __name__ == "__main__":
    # Create LDAP Connection
    print(bcolors.OKBLUE + "Establish LDAP-Connection:"+bcolors.ENDC, end=' ')
    server = ldap3.Server('server IP or DNS', use_ssl=False)
    conn = ldap3.Connection(server, 'user', 'changeme', client_strategy=ldap3.SAFE_SYNC, auto_bind=True)
    conn.protocol_version = 3
    conn.bind()
    print(bcolors.OKGREEN + "OK!"+bcolors.ENDC)
    # print(conn)

# get all Members of Group
    print(bcolors.OKBLUE + "Get Group Members of GOG_SSLVPN:"+bcolors.ENDC, end=' ')
    result_id = conn.search(
        search_base='DC=contoso,DC=local',
        search_filter='(&(objectClass=group)(cn=GOG_VPN))',
        search_scope=ldap3.SUBTREE,
        attributes=['member'],
        size_limit=0
        )
    print(bcolors.OKGREEN + "OK!"+bcolors.ENDC)
    # print(result_id[2][0]["attributes"])
    # print(extract_cn(result_id[2][0]["attributes"]['member'][0]))

# Extract username from DN Object
    print(bcolors.OKBLUE + "Extracting Usernames vom DN-Array:"+bcolors.ENDC, end=' ')
    sauber = []
    for i in range(len(result_id[2][0]['attributes']['member'])):
        sauber.append(extract_cn(result_id[2][0]['attributes']['member'][i]).replace("\\", "").replace(", ", "_").replace(" ", "_"))
    # print(sauber)
    print(bcolors.OKGREEN + "OK!"+bcolors.ENDC)

# get list with names of created certs (serials.db)
    print(bcolors.OKBLUE+"create List of issued Certs:"+bcolors.ENDC, end=' ')
    list_of_certs = []
    list_of_certserials = []
    with open(myhome + 'serials.db', 'r') as readobj:
        csv_reader = csv.reader(readobj, delimiter=';')
        for row in csv_reader:
            list_of_certs.append(row[0])
            list_of_certserials.append(row)
    print(bcolors.OKGREEN + "OK!"+bcolors.ENDC)

# Create new Certs
    print(bcolors.OKBLUE+"Creating Certs:"+bcolors.WARNING)
    bar = Bar('verarbeite', max=len(sauber)-len(list_of_certs))
    zaehler = 0
    for item in sauber:
        if (item not in list_of_certs):
            zaehler = zaehler + 1
            # print(item)
            make_ovpn_file(myhome + "rootca/ca.pem", myhome + "rootca/canew.key", item, len(list_of_certs) + zaehler, myhome + "openvpnconfighead", "/srv/http/configs/" + item + ".ovpn", item)
            bar.next()
    bar.finish()

We have now created a working OpenVPN client configuration. But what happens if a user is removed from the group of authorized users for the VPN? The issued certificate must be revoked. This can be done with the help of a CRL (Certificate Revocation List).

The last block in our script does exactly that. A CRL is created with the users who are no longer in the group. This can be checked using the serials.db file:

# Create CRL
    print(bcolors.OKBLUE+"Creating CRL:"+bcolors.ENDC, end=' ')
    # Load the CA cert and private key
    cacert = loadcertfile(myhome + "rootca/ca.pem")
    cakey = loadkeyfile(myhome + "rootca/canew.key")

    # Prepare the revoked certificates list
    revoked_list = []
    # Iterate over the revoked certificates
    with open(myhome + 'revoked.db', 'w') as f:
        for item in list_of_certs:
            if item not in sauber:
                for cert_serial in list_of_certserials:
                    if item in cert_serial:
                        revocation_date = datetime.datetime.now(datetime.UTC)
                        revocation_date_str = revocation_date.strftime('%Y%m%d%H%M%SZ')  # RFC 5280 revocation date format
                        revoked_cert = (
                            x509.RevokedCertificateBuilder()
                            .serial_number(int(cert_serial[1]))  # Ensure serial is int
                            .revocation_date(revocation_date)
                            .add_extension(x509.CRLReason(x509.ReasonFlags("affiliationChanged")), critical=False)  # Add the revocation reason
                            .build()
                        )
                        revoked_list.append(revoked_cert)
    # Create the CRL using the revoked certificates
    crl_builder = x509.CertificateRevocationListBuilder()
    crl = (
        crl_builder
        .issuer_name(cacert.subject)  # Issuer is the CA cert subject
        .last_update(datetime.datetime.now(datetime.UTC))
        .next_update(datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=30))  # Optional, you can set the next update period
    )

    # Add the revoked certificates to the CRL
    for revoked in revoked_list:
        crl = crl.add_revoked_certificate(revoked)
    crl = crl.sign(cakey, algorithm=hashes.SHA256())

    # Export the CRL in PEM format
    with open(myhome + 'crl.pem', 'wb') as f:
        f.write(crl.public_bytes(encoding=serialization.Encoding.PEM))
    with open('/etc/openvpn/server/crl.pem', 'wb') as fo:
        fo.write(crl.public_bytes(encoding=serialization.Encoding.PEM))
    print(bcolors.OKGREEN + "OK!" + bcolors.ENDC)

Previous Post Next Post