"""FreeRADIUS Python module that assigns Cisco downloadable ACLs.

Two kinds of requests are handled by :func:`authorize`:

* The User-Name is a MAC address: the matching lease is looked up in the
  leases file and the reply names the ACLs the client should get.
* The User-Name contains ``#ACSACL#``: the request is treated as a fetch of
  one ACL's contents, which are read from ``acl/<name>.acl`` next to this
  file and returned line by line.
"""
import json
import os
import os.path
import time
from collections import defaultdict
from typing import List

import netaddr
import paho.mqtt.client as mqtt
import radiusd

# Location of the JSON file mapping MAC address -> lease record.
_LEASES_PATH = "/config/leases.json"
# Text that precedes the ACL name inside a downloadable-ACL User-Name.
_ACL_NAME_MARKER = "#ACSACL#-IP-"
# Fixed version suffix appended to every ACL name handed to the switch.
_ACL_VERSION_SUFFIX = "-588b68cd"


def log(level, s):
    """Write *s* to the radiusd log at *level*, prefixed with the module name."""
    radiusd.radlog(level, f"{__name__}: {s}")


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    """Log the MQTT connect result and subscribe to ``router7/#``.

    NOTE(review): nothing in this file registers this callback with a client;
    it is kept for callers that may do so elsewhere.
    """
    log(radiusd.L_INFO, "Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("router7/#")


def get_leases():
    """Load the leases file and return it as a defaultdict.

    Keys missing from the file yield an empty ``defaultdict(str)``, so a
    lookup for an unknown MAC never raises.

    Raises:
        OSError: the leases file cannot be opened.
        json.JSONDecodeError: the leases file is not valid JSON.
    """
    with open(_LEASES_PATH, encoding="utf-8") as file:
        loaded = json.load(file)
    return defaultdict(lambda: defaultdict(str), loaded)


def select_acls(lease):
    """Return the list of ACL names that apply to *lease*.

    Every client gets ``"web"``; clients whose ``vendor_identifier`` contains
    ``"MSFT"`` additionally get ``"windows"`` (listed first).
    """
    log(radiusd.L_INFO, str(lease))
    # Leases loaded from JSON are plain dicts, so the key may be absent or
    # null; treat both as "no vendor identifier" instead of raising.
    raw_vendor = lease.get("vendor_identifier", "")
    log(radiusd.L_INFO, str(raw_vendor))
    vendor = raw_vendor if isinstance(raw_vendor, str) else ""

    acls = []
    if "MSFT" in vendor:
        acls.append("windows")
    acls.append("web")
    return acls


def ciscoize_acl_names(acls):
    """Turn ACL names into ``Cisco-AVPair`` reply tuples naming each ACL."""
    cisco = [
        (
            "Cisco-AVPair",
            "+=",
            f"ACS:CiscoSecure-Defined-ACL={_ACL_NAME_MARKER}{acl}{_ACL_VERSION_SUFFIX}",
        )
        for acl in acls
    ]
    log(radiusd.L_INFO, str(cisco))
    return cisco


def deciscoize_acl_name(acl_name):
    """Extract the bare ACL name from a downloadable-ACL User-Name.

    Inverse of the naming used by :func:`ciscoize_acl_names`: takes the text
    after ``#ACSACL#-IP-`` and drops the trailing version suffix.

    Raises:
        ValueError: *acl_name* does not contain the ``#ACSACL#-IP-`` marker.
    """
    pieces = acl_name.split(_ACL_NAME_MARKER)
    if len(pieces) < 2:
        raise ValueError(f"not a downloadable ACL name: {acl_name!r}")
    return pieces[1][: -len(_ACL_VERSION_SUFFIX)]


def get_acl(acl_name):
    """Read ``acl/<acl_name>.acl`` next to this file and return its lines.

    Raises:
        ValueError: *acl_name* is empty or contains a path separator.
        OSError: the ACL file cannot be opened.
    """
    # The name originates from the request's User-Name, so refuse anything
    # that could escape the acl/ directory.
    if not acl_name or any(ch in acl_name for ch in ("/", "\\", "\0")):
        raise ValueError(f"invalid ACL name: {acl_name!r}")

    # Anchor to this module's directory so the lookup does not depend on the
    # server process's current working directory.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    acl_path = os.path.join(module_dir, "acl", f"{acl_name}.acl")
    with open(acl_path, encoding="utf-8") as file:
        return file.read().splitlines()


def ciscoize_acl(acl: List[str]):
    """Turn ACL lines into numbered ``ip:inacl#N=`` ``Cisco-AVPair`` tuples.

    Numbering starts at 1 and follows the order of *acl*.
    """
    return [
        ("Cisco-AVPair", "+=", f"ip:inacl#{number}={line}")
        for number, line in enumerate(acl, start=1)
    ]


def authorize(p):
    """Build the reply for an authorize call.

    Args:
        p: request data; ``p["request"]`` must be usable to build a dict of
            attribute name -> value (a mapping or an iterable of pairs).

    Returns:
        ``(radiusd.RLM_MODULE_OK, reply, config)`` where *reply* always echoes
        User-Name plus any ``Cisco-AVPair`` entries, and *config* sets
        ``Auth-Type`` to ``Accept``.

    Raises:
        ValueError, OSError: an ACL fetch names an invalid or missing ACL.
    """
    log(radiusd.L_INFO, str(p))
    radiusd.radlog(radiusd.L_INFO, "*** radlog call in authorize ***")
    log(radiusd.L_DBG, str(radiusd.config))

    request = defaultdict(str, p["request"])
    user_name = request["User-Name"]
    reply = [
        ("User-Name", user_name),
    ]

    if netaddr.valid_mac(user_name):
        # Normalise the MAC to the colon-separated, zero-padded form that is
        # used to key the leases lookup.
        mac = netaddr.EUI(user_name)
        mac.dialect = netaddr.mac_unix_expanded

        leases = get_leases()
        log(radiusd.L_INFO, str(leases))
        reply.extend(ciscoize_acl_names(select_acls(leases[str(mac)])))
    elif "#ACSACL#" in user_name:
        acl_lines = get_acl(deciscoize_acl_name(user_name))
        reply.extend(ciscoize_acl(acl_lines))

    conf = [
        ("Auth-Type", "Accept"),
    ]

    log(radiusd.L_INFO, str(reply))
    log(radiusd.L_INFO, str(conf))
    return (radiusd.RLM_MODULE_OK, tuple(reply), tuple(conf))


def authenticate(p):
    """Log the request and return ``radiusd.RLM_MODULE_OK`` unconditionally."""
    log(radiusd.L_INFO, str(p))
    radiusd.radlog(radiusd.L_INFO, "*** radlog call in authenticate ***")
    log(radiusd.L_DBG, str(radiusd.config))
    return radiusd.RLM_MODULE_OK