129 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			129 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import json
 | 
						|
import os
 | 
						|
import os.path
 | 
						|
from collections import defaultdict
 | 
						|
from typing import List
 | 
						|
import time
 | 
						|
 | 
						|
import netaddr
 | 
						|
import paho.mqtt.client as mqtt
 | 
						|
import radiusd
 | 
						|
 | 
						|
 | 
						|
def log(level, s):
 | 
						|
    """Log function."""
 | 
						|
    radiusd.radlog(level, __name__ + ": " + s)
 | 
						|
 | 
						|
 | 
						|
# The callback for when the client receives a CONNACK response from the server.
 | 
						|
def on_connect(client, userdata, flags, rc):
 | 
						|
    log(radiusd.L_INFO, "Connected with result code " + str(rc))
 | 
						|
 | 
						|
    # Subscribing in on_connect() means that if we lose the connection and
 | 
						|
    # reconnect then subscriptions will be renewed.
 | 
						|
    client.subscribe("router7/#")
 | 
						|
 | 
						|
 | 
						|
def get_leases():
 | 
						|
    with open("/config/leases.json", encoding="utf-8") as file:
 | 
						|
        msgs = defaultdict(lambda: defaultdict(str), json.loads(file.read()))
 | 
						|
 | 
						|
    return msgs
 | 
						|
 | 
						|
 | 
						|
def select_acls(lease):
 | 
						|
    log(radiusd.L_INFO, str(lease))
 | 
						|
    log(radiusd.L_INFO, str(lease["vendor_identifier"]))
 | 
						|
    acls = []
 | 
						|
    if "MSFT" in lease["vendor_identifier"]:
 | 
						|
        acls.append("windows")
 | 
						|
        acls.append("web")
 | 
						|
 | 
						|
    return acls
 | 
						|
 | 
						|
 | 
						|
def ciscoize_acl_names(acls):
 | 
						|
    cisco = []
 | 
						|
    for acl in acls:
 | 
						|
        cisco.append(
 | 
						|
            tuple(
 | 
						|
                [
 | 
						|
                    "Cisco-AVPair",
 | 
						|
                    "+=",
 | 
						|
                    f"ACS:CiscoSecure-Defined-ACL=#ACSACL#-IP-{acl}-588b68cd",
 | 
						|
                ]
 | 
						|
            )
 | 
						|
        )
 | 
						|
    log(radiusd.L_INFO, str(cisco))
 | 
						|
    return cisco
 | 
						|
 | 
						|
 | 
						|
def deciscoize_acl_name(acl_name):
 | 
						|
    return acl_name.split("#ACSACL#-IP-")[1][0:-9]
 | 
						|
 | 
						|
 | 
						|
def get_acl(acl_name):
 | 
						|
    with open(f"acl/{acl_name}.acl", encoding="utf-8") as file:
 | 
						|
        return file.read().splitlines()
 | 
						|
 | 
						|
 | 
						|
def ciscoize_acl(acl: List[str]):
 | 
						|
    cisco = []
 | 
						|
    for i, line in enumerate(acl):
 | 
						|
        cisco.append(
 | 
						|
            tuple(
 | 
						|
                [
 | 
						|
                    "Cisco-AVPair",
 | 
						|
                    "+=",
 | 
						|
                    f"ip:inacl#{i+1}={line}",
 | 
						|
                ]
 | 
						|
            )
 | 
						|
        )
 | 
						|
    return cisco
 | 
						|
 | 
						|
 | 
						|
def authorize(p):
 | 
						|
    os.chdir(os.path.dirname(__file__))
 | 
						|
    log(radiusd.L_INFO, str(p))
 | 
						|
    print("*** authorize ***")
 | 
						|
    print("")
 | 
						|
    radiusd.radlog(radiusd.L_INFO, "*** radlog call in authorize ***")
 | 
						|
    print("")
 | 
						|
    print(p)
 | 
						|
    print("")
 | 
						|
    print(radiusd.config)
 | 
						|
    print("")
 | 
						|
    request = defaultdict(str, p["request"])
 | 
						|
    reply = [
 | 
						|
        ("User-Name", request["User-Name"]),
 | 
						|
    ]
 | 
						|
 | 
						|
    if netaddr.valid_mac(request["User-Name"]):
 | 
						|
        mac = netaddr.EUI(request["User-Name"])
 | 
						|
        mac.dialect = netaddr.mac_unix_expanded
 | 
						|
        leases = get_leases()
 | 
						|
        log(radiusd.L_INFO, str(leases))
 | 
						|
 | 
						|
        reply.extend(ciscoize_acl_names(select_acls(leases[str(mac)])))
 | 
						|
    elif "#ACSACL#" in request["User-Name"]:
 | 
						|
        reply.extend(ciscoize_acl(get_acl(deciscoize_acl_name(request["User-Name"]))))
 | 
						|
 | 
						|
    conf = [
 | 
						|
        ("Auth-Type", "Accept"),
 | 
						|
    ]
 | 
						|
 | 
						|
    log(radiusd.L_INFO, str(reply))
 | 
						|
    log(radiusd.L_INFO, str(conf))
 | 
						|
    return (radiusd.RLM_MODULE_OK, tuple(reply), tuple(conf))
 | 
						|
 | 
						|
 | 
						|
def authenticate(p):
 | 
						|
    os.chdir(os.path.dirname(__file__))
 | 
						|
    log(radiusd.L_INFO, str(p))
 | 
						|
    radiusd.radlog(radiusd.L_INFO, "*** radlog call in authenticate ***")
 | 
						|
    print("")
 | 
						|
    print(p)
 | 
						|
    print("")
 | 
						|
    print(radiusd.config)
 | 
						|
    return radiusd.RLM_MODULE_OK
 |