freeradius/freepydius.py
lordwelch abd70a0d72 Move mqtt to a separate service
mqtt does not want to run under freeradius
2022-05-01 19:33:49 -07:00

129 lines
3.1 KiB
Python

import json
import os
import os.path
from collections import defaultdict
from typing import List
import time
import netaddr
import paho.mqtt.client as mqtt
import radiusd
def log(level, s):
"""Log function."""
radiusd.radlog(level, __name__ + ": " + s)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
log(radiusd.L_INFO, "Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("router7/#")
def get_leases():
with open("/config/leases.json", encoding="utf-8") as file:
msgs = defaultdict(lambda: defaultdict(str), json.loads(file.read()))
return msgs
def select_acls(lease):
log(radiusd.L_INFO, str(lease))
log(radiusd.L_INFO, str(lease["vendor_identifier"]))
acls = []
if "MSFT" in lease["vendor_identifier"]:
acls.append("windows")
acls.append("web")
return acls
def ciscoize_acl_names(acls):
cisco = []
for acl in acls:
cisco.append(
tuple(
[
"Cisco-AVPair",
"+=",
f"ACS:CiscoSecure-Defined-ACL=#ACSACL#-IP-{acl}-588b68cd",
]
)
)
log(radiusd.L_INFO, str(cisco))
return cisco
def deciscoize_acl_name(acl_name):
return acl_name.split("#ACSACL#-IP-")[1][0:-9]
def get_acl(acl_name):
with open(f"acl/{acl_name}.acl", encoding="utf-8") as file:
return file.read().splitlines()
def ciscoize_acl(acl: List[str]):
cisco = []
for i, line in enumerate(acl):
cisco.append(
tuple(
[
"Cisco-AVPair",
"+=",
f"ip:inacl#{i+1}={line}",
]
)
)
return cisco
def authorize(p):
os.chdir(os.path.dirname(__file__))
log(radiusd.L_INFO, str(p))
print("*** authorize ***")
print("")
radiusd.radlog(radiusd.L_INFO, "*** radlog call in authorize ***")
print("")
print(p)
print("")
print(radiusd.config)
print("")
request = defaultdict(str, p["request"])
reply = [
("User-Name", request["User-Name"]),
]
if netaddr.valid_mac(request["User-Name"]):
mac = netaddr.EUI(request["User-Name"])
mac.dialect = netaddr.mac_unix_expanded
leases = get_leases()
log(radiusd.L_INFO, str(leases))
reply.extend(ciscoize_acl_names(select_acls(leases[str(mac)])))
elif "#ACSACL#" in request["User-Name"]:
reply.extend(ciscoize_acl(get_acl(deciscoize_acl_name(request["User-Name"]))))
conf = [
("Auth-Type", "Accept"),
]
log(radiusd.L_INFO, str(reply))
log(radiusd.L_INFO, str(conf))
return (radiusd.RLM_MODULE_OK, tuple(reply), tuple(conf))
def authenticate(p):
os.chdir(os.path.dirname(__file__))
log(radiusd.L_INFO, str(p))
radiusd.radlog(radiusd.L_INFO, "*** radlog call in authenticate ***")
print("")
print(p)
print("")
print(radiusd.config)
return radiusd.RLM_MODULE_OK