freeradius/freepydius.py
2022-04-26 15:54:25 -07:00

126 lines
3.1 KiB
Python

import json
import os
import os.path
from collections import defaultdict
import netaddr
import paho.mqtt.client as mqtt
import radiusd
def log(level, s):
"""Log function."""
radiusd.radlog(level, __name__ + ": " + s)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("router7/#")
def get_leases():
msgs = defaultdict(lambda: defaultdict(str))
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
lease = json.loads(msg.payload)
lease["identifier"] = os.path.basename(msg.topic)
msgs[lease["identifier"]] = defaultdict(str, lease)
client = mqtt.Client()
client.username_pw_set(os.getenv("MQTT_USERNAME"), os.getenv("MQTT_PASSWORD"))
client.on_connect = on_connect
client.on_message = on_message
client.connect("hassio.narnian.us", 1883, 60)
cont = True
count = 0
while cont:
client.loop(timeout=0.5)
client.loop(timeout=0.5)
client.loop(timeout=0.5)
client.loop(timeout=0.5)
if count == len(msgs):
break
count = len(msgs)
return msgs
def select_acls(lease):
acls = []
if lease["vendor_identifier"].contains("PS4"):
acls.append("DENY")
acls.append("WEB")
return acls
def ciscoize_acl_names(acls):
cisco = []
for acl in acls:
cisco.append(
tuple(
"Cisco-AVPair",
"+=",
f"ACS:CiscoSecure-Defined-ACL=#ACSACL#-${acl}-fuckcisc",
)
)
return cisco
def deciscoize_acl_name(acl_name):
return acl_name.split("#ACSACL#-")[1][0:-9]
def get_acl(acl_name):
with open(acl_name, encoding="utf-8") as f:
return f.read()
def authorize(p):
os.chdir(os.path.dirname(__file__))
log(radiusd.L_INFO, str(p))
print("*** authorize ***")
print("")
radiusd.radlog(radiusd.L_INFO, "*** radlog call in authorize ***")
print("")
print(p)
print("")
print(radiusd.config)
print("")
request = defaultdict(str, p["request"])
reply = [
("User-Name", request["User-Name"]),
]
if netaddr.valid_mac(request["User-Name"]):
leases = get_leases()
reply.extend(ciscoize_acl_names(select_acls(leases[request["User-Name"]])))
elif "#ACSACL#" in request["User-Name"]:
deciscoize_acl_name(request["User-Name"])
conf = [
("Auth-Type", "Accept"),
]
log(radiusd.L_INFO, str(reply))
log(radiusd.L_INFO, str(conf))
return (radiusd.RLM_MODULE_OK, tuple(reply), tuple(conf))
def authenticate(p):
os.chdir(os.path.dirname(__file__))
log(radiusd.L_INFO, str(p))
radiusd.radlog(radiusd.L_INFO, "*** radlog call in authenticate ***")
print("")
print(p)
print("")
print(radiusd.config)
return radiusd.RLM_MODULE_OK