Browse Source

black

radius_api
chapeau 5 years ago
parent
commit
118f5af269
  1. 4
      freeradius_utils/auth.py
  2. 5
      radius/api/serializers.py
  3. 21
      radius/api/urls.py
  4. 85
      radius/api/views.py
  5. 2
      radius/urls.py

4
freeradius_utils/auth.py

@ -96,7 +96,7 @@ def radius_event(fun):
""" """
def new_f(auth_data): def new_f(auth_data):
"""The function transforming the tuples as dict """ """ The function transforming the tuples as dict """
if isinstance(auth_data, dict): if isinstance(auth_data, dict):
data = auth_data data = auth_data
else: else:
@ -161,7 +161,7 @@ def authorize(data):
@radius_event @radius_event
def post_auth(data): def post_auth(data):
""" Function called after the user is authenticated""" """Function called after the user is authenticated"""
nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None)) nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
nas_instance = find_nas_from_request(nas) nas_instance = find_nas_from_request(nas)

5
radius/api/serializers.py

@ -38,8 +38,7 @@ class InterfaceSerializer(Serializer):
user_pk = serializers.CharField(source="machine.user.pk") user_pk = serializers.CharField(source="machine.user.pk")
machine_short_name = serializers.CharField(source="machine.short_name") machine_short_name = serializers.CharField(source="machine.short_name")
is_ban = serializers.BooleanField(source="machine.user.is_ban") is_ban = serializers.BooleanField(source="machine.user.is_ban")
vlan_id = serializers.IntegerField( vlan_id = serializers.IntegerField(source="machine_type.ip_type.vlan.vlan_id")
source="machine_type.ip_type.vlan.vlan_id")
class NasSerializer(Serializer): class NasSerializer(Serializer):
@ -108,6 +107,7 @@ class AuthorizeResponseSerializer(Serializer):
"""Serializer for AuthorizeResponse objects """Serializer for AuthorizeResponse objects
See views.py for the declaration of AuthorizeResponse See views.py for the declaration of AuthorizeResponse
""" """
nas = NasSerializer(read_only=True) nas = NasSerializer(read_only=True)
user = UserSerializer(read_only=True) user = UserSerializer(read_only=True)
user_interface = InterfaceSerializer(read_only=True) user_interface = InterfaceSerializer(read_only=True)
@ -117,6 +117,7 @@ class PostAuthResponseSerializer(Serializer):
"""Serializer for PostAuthResponse objects """Serializer for PostAuthResponse objects
See views.py for the declaration of PostAuthResponse See views.py for the declaration of PostAuthResponse
""" """
nas = NasSerializer(read_only=True) nas = NasSerializer(read_only=True)
room_users = UserSerializer(many=True) room_users = UserSerializer(many=True)
port = PortSerializer() port = PortSerializer()

21
radius/api/urls.py

@ -22,11 +22,20 @@
from . import views from . import views
urls_functional_view = [ urls_functional_view = [
(r"radius/authorize/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", (
views.authorize, None), r"radius/authorize/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
(r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$", views.authorize,
views.post_auth, None), None,
(r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", ),
views.autoregister_machine, None), (
r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$",
views.post_auth,
None,
),
(
r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.autoregister_machine,
None,
),
(r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", views.assign_ip, None), (r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", views.assign_ip, None),
] ]

85
radius/api/views.py

@ -35,8 +35,7 @@ from re2o.acl import can_view_all_api, can_edit_all_api, can_create_api
class AuthorizeResponse: class AuthorizeResponse:
"""Contains objects the radius needs for the Authorize step """Contains objects the radius needs for the Authorize step"""
"""
def __init__(self, nas, user, user_interface): def __init__(self, nas, user, user_interface):
self.nas = nas self.nas = nas
@ -44,12 +43,11 @@ class AuthorizeResponse:
self.user_interface = user_interface self.user_interface = user_interface
def can_view(self, user): def can_view(self, user):
"""Method to bypass api permissions, because we are using ACL decorators """Method to bypass api permissions, because we are using ACL decorators"""
"""
return (True, None, None) return (True, None, None)
@api_view(['GET']) @api_view(["GET"])
@login_required @login_required
@can_view_all_api(Interface, Domain, IpList, Nas, User) @can_view_all_api(Interface, Domain, IpList, Nas, User)
def authorize(request, nas_id, username, mac_address): def authorize(request, nas_id, username, mac_address):
@ -66,13 +64,11 @@ def authorize(request, nas_id, username, mac_address):
# get the Nas object which made the request (if exists) # get the Nas object which made the request (if exists)
nas_interface = Interface.objects.filter( nas_interface = Interface.objects.filter(
Q(domain__name=nas_id) Q(domain__name=nas_id) | Q(ipv4__ipv4=nas_id)
| Q(ipv4__ipv4=nas_id)
).first() ).first()
nas_type = None nas_type = None
if nas_interface: if nas_interface:
nas_type = Nas.objects.filter( nas_type = Nas.objects.filter(nas_type=nas_interface.machine_type).first()
nas_type=nas_interface.machine_type).first()
# get the User corresponding to the username in the URL # get the User corresponding to the username in the URL
# If no username was provided (wired connection), username="None" # If no username was provided (wired connection), username="None"
@ -82,16 +78,28 @@ def authorize(request, nas_id, username, mac_address):
user_interface = Interface.objects.filter(mac_address=mac_address).first() user_interface = Interface.objects.filter(mac_address=mac_address).first()
serialized = serializers.AuthorizeResponseSerializer( serialized = serializers.AuthorizeResponseSerializer(
AuthorizeResponse(nas_type, user, user_interface)) AuthorizeResponse(nas_type, user, user_interface)
)
return Response(data=serialized.data) return Response(data=serialized.data)
class PostAuthResponse: class PostAuthResponse:
"""Contains objects the radius needs for the Post-Auth step """Contains objects the radius needs for the Post-Auth step"""
"""
def __init__(
def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE): self,
nas,
room_users,
port,
port_profile,
switch,
user_interface,
radius_option,
EMAIL_STATE_UNVERIFIED,
RADIUS_OPTION_REJECT,
USER_STATE_ACTIVE,
):
self.nas = nas self.nas = nas
self.room_users = room_users self.room_users = room_users
self.port = port self.port = port
@ -104,12 +112,11 @@ class PostAuthResponse:
self.USER_STATE_ACTIVE = USER_STATE_ACTIVE self.USER_STATE_ACTIVE = USER_STATE_ACTIVE
def can_view(self, user): def can_view(self, user):
"""Method to bypass api permissions, because we are using ACL decorators """Method to bypass api permissions, because we are using ACL decorators"""
"""
return (True, None, None) return (True, None, None)
@api_view(['GET']) @api_view(["GET"])
@login_required @login_required
@can_view_all_api(Interface, Domain, IpList, Nas, Switch, Port, User) @can_view_all_api(Interface, Domain, IpList, Nas, Switch, Port, User)
def post_auth(request, nas_id, nas_port, user_mac): def post_auth(request, nas_id, nas_port, user_mac):
@ -125,20 +132,19 @@ def post_auth(request, nas_id, nas_port, user_mac):
""" """
# get the Nas object which made the request (if exists) # get the Nas object which made the request (if exists)
nas_interface = Interface.objects.prefetch_related("machine__switch__stack").filter( nas_interface = (
Q(domain__name=nas_id) Interface.objects.prefetch_related("machine__switch__stack")
| Q(ipv4__ipv4=nas_id) .filter(Q(domain__name=nas_id) | Q(ipv4__ipv4=nas_id))
).first() .first()
)
nas_type = None nas_type = None
if nas_interface: if nas_interface:
nas_type = Nas.objects.filter( nas_type = Nas.objects.filter(nas_type=nas_interface.machine_type).first()
nas_type=nas_interface.machine_type).first()
# get the switch (if wired connection) # get the switch (if wired connection)
switch = None switch = None
if nas_interface: if nas_interface:
switch = Switch.objects.filter( switch = Switch.objects.filter(machine_ptr=nas_interface.machine).first()
machine_ptr=nas_interface.machine).first()
# If the switch is part of a stack, get the correct object # If the switch is part of a stack, get the correct object
if hasattr(nas_interface.machine, "switch"): if hasattr(nas_interface.machine, "switch"):
@ -187,12 +193,24 @@ def post_auth(request, nas_id, nas_port, user_mac):
USER_STATE_ACTIVE = User.STATE_ACTIVE USER_STATE_ACTIVE = User.STATE_ACTIVE
serialized = serializers.PostAuthResponseSerializer( serialized = serializers.PostAuthResponseSerializer(
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE)) PostAuthResponse(
nas_type,
room_users,
port,
port_profile,
switch,
user_interface,
radius_option,
EMAIL_STATE_UNVERIFIED,
RADIUS_OPTION_REJECT,
USER_STATE_ACTIVE,
)
)
return Response(data=serialized.data) return Response(data=serialized.data)
@api_view(['GET']) @api_view(["GET"])
@login_required @login_required
@can_view_all_api(Interface, Domain, IpList, Nas, User) @can_view_all_api(Interface, Domain, IpList, Nas, User)
@can_edit_all_api(User, Domain, Machine, Interface) @can_edit_all_api(User, Domain, Machine, Interface)
@ -209,13 +227,11 @@ def autoregister_machine(request, nas_id, username, mac_address):
400 if it failed, and the reason why 400 if it failed, and the reason why
""" """
nas_interface = Interface.objects.filter( nas_interface = Interface.objects.filter(
Q(domain__name=nas_id) Q(domain__name=nas_id) | Q(ipv4__ipv4=nas_id)
| Q(ipv4__ipv4=nas_id)
).first() ).first()
nas_type = None nas_type = None
if nas_interface: if nas_interface:
nas_type = Nas.objects.filter( nas_type = Nas.objects.filter(nas_type=nas_interface.machine_type).first()
nas_type=nas_interface.machine_type).first()
user = User.objects.filter(pseudo__iexact=username).first() user = User.objects.filter(pseudo__iexact=username).first()
@ -225,7 +241,7 @@ def autoregister_machine(request, nas_id, username, mac_address):
return Response(reason, status=400) return Response(reason, status=400)
@api_view(['GET']) @api_view(["GET"])
@can_view_all_api(Interface) @can_view_all_api(Interface)
@can_edit_all_api(Interface) @can_edit_all_api(Interface)
def assign_ip(request, mac_address): def assign_ip(request, mac_address):
@ -238,10 +254,7 @@ def assign_ip(request, mac_address):
200 if it worked 200 if it worked
400 if it failed, and the reason why 400 if it failed, and the reason why
""" """
interface = ( interface = Interface.objects.filter(mac_address=mac_address).first()
Interface.objects.filter(mac_address=mac_address)
.first()
)
try: try:
interface.assign_ipv4() interface.assign_ipv4()

2
radius/urls.py

@ -23,4 +23,4 @@ from . import views
urlpatterns = [] urlpatterns = []
app_name = "radius" app_name = "radius"

Loading…
Cancel
Save