129 lines
3.1 KiB
Python
129 lines
3.1 KiB
Python
import json
|
|
import os
|
|
import os.path
|
|
from collections import defaultdict
|
|
from typing import List
|
|
import time
|
|
|
|
import netaddr
|
|
import paho.mqtt.client as mqtt
|
|
import radiusd
|
|
|
|
|
|
def log(level, s):
    """Forward a message to ``radiusd.radlog``, prefixed with this module's name.

    Args:
        level: Log level passed through unchanged to ``radiusd.radlog``
            (callers in this file use ``radiusd.L_INFO``).
        s: Message text. It is concatenated onto the prefix, so it must
            already be a ``str``.
    """
    prefix = __name__ + ": "
    radiusd.radlog(level, prefix + s)
|
|
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """Callback for when the client receives a CONNACK response from the server.

    Logs the result code and subscribes to ``router7/#``. Subscribing here
    (rather than once at start-up) means the subscription is renewed whenever
    the connection is lost and re-established.

    Args:
        client: Client object; only its ``subscribe`` method is used.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; logged via ``str()``.
    """
    log(radiusd.L_INFO, f"Connected with result code {rc!s}")
    client.subscribe("router7/#")
|
|
|
|
|
|
def get_leases(path="/config/leases.json"):
    """Load the lease table from a JSON file.

    Args:
        path: Location of the JSON file. Defaults to ``/config/leases.json``,
            the path this function has always read.

    Returns:
        A ``defaultdict`` built from the top-level JSON value. Looking up a
        key that is not in the file yields an empty ``defaultdict(str)``.
        Every loaded entry that is a JSON object is wrapped in
        ``defaultdict(str)`` too, so a missing field reads as ``""`` whether
        or not the outer key was present in the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(path, encoding="utf-8") as file:
        raw = json.load(file)

    leases = defaultdict(lambda: defaultdict(str), raw)
    # Entries parsed from JSON are plain dicts; without this wrapping, a known
    # lease that lacks a field raises KeyError while an unknown one returns "".
    for key in list(leases):
        entry = leases[key]
        if isinstance(entry, dict):
            leases[key] = defaultdict(str, entry)
    return leases
|
|
|
|
|
|
def select_acls(lease):
    """Choose the ACL names that apply to a lease.

    Args:
        lease: Mapping describing one lease. Only its ``vendor_identifier``
            entry is consulted; the whole mapping is logged.

    Returns:
        A list of ACL names: ``"windows"`` first when the vendor identifier
        is a string containing ``"MSFT"``, followed always by ``"web"``.
    """
    log(radiusd.L_INFO, str(lease))
    # Use .get so that a plain-dict lease without the field is treated like
    # an empty vendor identifier instead of raising KeyError.
    vendor_identifier = lease.get("vendor_identifier", "")
    log(radiusd.L_INFO, str(vendor_identifier))

    acls = []
    # A null/non-string value cannot contain "MSFT"; skip the substring test
    # rather than letting ``in`` raise TypeError.
    if isinstance(vendor_identifier, str) and "MSFT" in vendor_identifier:
        acls.append("windows")
    acls.append("web")

    return acls
|
|
|
|
|
|
def ciscoize_acl_names(acls):
    """Build one ``Cisco-AVPair`` reply entry per ACL name.

    Args:
        acls: Iterable of ACL names.

    Returns:
        A list of ``("Cisco-AVPair", "+=", value)`` tuples, where ``value``
        embeds the ACL name between the ``#ACSACL#-IP-`` marker and the
        ``-588b68cd`` suffix. The list is logged before it is returned.
    """
    avpairs = [
        (
            "Cisco-AVPair",
            "+=",
            f"ACS:CiscoSecure-Defined-ACL=#ACSACL#-IP-{name}-588b68cd",
        )
        for name in acls
    ]
    log(radiusd.L_INFO, str(avpairs))
    return avpairs
|
|
|
|
|
|
def deciscoize_acl_name(acl_name):
    """Recover the bare ACL name from its ``#ACSACL#-IP-...`` form.

    Takes the text between the first ``#ACSACL#-IP-`` marker and the next
    one (or the end of the string) and drops its last nine characters,
    which is the length of the ``-588b68cd`` suffix used by
    ``ciscoize_acl_names``.

    Raises:
        IndexError: If ``acl_name`` does not contain the marker.
    """
    segments = acl_name.split("#ACSACL#-IP-")
    after_marker = segments[1]
    return after_marker[:-9]
|
|
|
|
|
|
def get_acl(acl_name):
    """Read the lines of an ACL definition file.

    The file is ``acl/<acl_name>.acl``, resolved relative to the current
    working directory.

    Args:
        acl_name: Bare ACL name, without directory or extension.

    Returns:
        The file's lines as a list of strings, without line terminators.

    Raises:
        ValueError: If ``acl_name`` contains a path separator.
        OSError: If the file cannot be opened.
    """
    # authorize() derives this name from the request's User-Name, so refuse
    # anything that could step outside the acl/ directory.
    if "/" in acl_name or os.sep in acl_name:
        raise ValueError(f"invalid ACL name: {acl_name!r}")

    with open(f"acl/{acl_name}.acl", encoding="utf-8") as file:
        return file.read().splitlines()
|
|
|
|
|
|
def ciscoize_acl(acl: List[str]):
    """Turn ACL lines into numbered ``Cisco-AVPair`` reply entries.

    Args:
        acl: The ACL's lines, in order.

    Returns:
        A list of ``("Cisco-AVPair", "+=", "ip:inacl#<n>=<line>")`` tuples,
        with ``<n>`` counting from 1 in input order.
    """
    return [
        ("Cisco-AVPair", "+=", f"ip:inacl#{number}={entry}")
        for number, entry in enumerate(acl, start=1)
    ]
|
|
|
|
|
|
def authorize(p):
    """Build the reply and control attributes for an authorize call.

    The working directory is switched to this module's directory first, so
    the relative ``acl/`` path used by ``get_acl`` resolves next to this file.

    The reply always echoes ``User-Name``. On top of that:

    * if ``User-Name`` is a valid MAC address, the matching lease selects
      ACL names, which are added as ``Cisco-AVPair`` entries;
    * otherwise, if ``User-Name`` contains ``#ACSACL#``, the named ACL file
      is read and its lines are added as ``ip:inacl`` entries.

    Args:
        p: Mapping whose ``"request"`` entry can be passed to ``dict`` to
            obtain the request attributes.

    Returns:
        ``(radiusd.RLM_MODULE_OK, reply, conf)`` where ``reply`` and ``conf``
        are tuples of attribute tuples; ``conf`` is always
        ``(("Auth-Type", "Accept"),)``.
    """
    module_dir = os.path.dirname(__file__)
    os.chdir(module_dir)
    log(radiusd.L_INFO, str(p))

    # Debug dump on stdout, interleaved with one direct radlog call.
    print("*** authorize ***")
    print()
    radiusd.radlog(radiusd.L_INFO, "*** radlog call in authorize ***")
    print()
    print(p)
    print()
    print(radiusd.config)
    print()

    # Missing attributes read as "" thanks to defaultdict(str).
    request = defaultdict(str, p["request"])
    user_name = request["User-Name"]
    reply = [("User-Name", user_name)]

    if netaddr.valid_mac(user_name):
        mac = netaddr.EUI(user_name)
        mac.dialect = netaddr.mac_unix_expanded
        leases = get_leases()
        log(radiusd.L_INFO, str(leases))

        lease = leases[str(mac)]
        acl_names = select_acls(lease)
        reply.extend(ciscoize_acl_names(acl_names))
    elif "#ACSACL#" in user_name:
        acl_name = deciscoize_acl_name(user_name)
        acl_lines = get_acl(acl_name)
        reply.extend(ciscoize_acl(acl_lines))

    conf = [("Auth-Type", "Accept")]

    log(radiusd.L_INFO, str(reply))
    log(radiusd.L_INFO, str(conf))
    return (radiusd.RLM_MODULE_OK, tuple(reply), tuple(conf))
|
|
|
|
|
|
def authenticate(p):
    """Log the incoming data and report success.

    Switches the working directory to this module's directory, logs ``p``,
    prints ``p`` and ``radiusd.config`` to stdout, and returns
    ``radiusd.RLM_MODULE_OK`` unconditionally.

    Args:
        p: The data handed to the module; only logged and printed.

    Returns:
        ``radiusd.RLM_MODULE_OK``.
    """
    module_dir = os.path.dirname(__file__)
    os.chdir(module_dir)
    log(radiusd.L_INFO, str(p))
    radiusd.radlog(radiusd.L_INFO, "*** radlog call in authenticate ***")

    # Debug dump on stdout.
    print()
    print(p)
    print()
    print(radiusd.config)
    return radiusd.RLM_MODULE_OK
|