-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathexample-06.py
More file actions
135 lines (102 loc) · 3.77 KB
/
example-06.py
File metadata and controls
135 lines (102 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
This example illustrates how to use an authorization strategy having more than one set
of criteria to handle authorization, and how to decorate methods so they require
authorization.
"""
from __future__ import annotations
import asyncio
from guardpost import (
AuthorizationContext,
AuthorizationError,
AuthorizationStrategy,
Identity,
Policy,
Requirement,
)
from guardpost.common import AuthenticatedRequirement
class MyAppContext:
    """
    Application-defined context object.

    What a "context" contains is up to the application and its use cases; in
    this example it only carries the identity associated with the operation.
    """

    def __init__(self, identity: Identity) -> None:
        """Create a context bound to the given identity."""
        # The identity is exposed as a plain public attribute.
        self.identity = identity
class RoleRequirement(Requirement):
    """
    Require a role to authorize an operation.

    The requirement is satisfied when the identity's "roles" claim contains
    the configured role. The comparison is an exact, case-sensitive match.
    """

    def __init__(self, role: str) -> None:
        """Store the name of the role an identity must hold."""
        self._role = role

    def handle(self, context: AuthorizationContext):
        """
        Evaluate the requirement against the identity in the given context.

        Calls ``context.succeed(self)`` when the role is present, otherwise
        ``context.fail(...)`` with a message naming the missing role.
        """
        # EXAMPLE: implement here the desired notion / requirements for authorization
        #
        try:
            roles = context.identity["roles"]
        except KeyError:
            # Assumes item access on the identity raises KeyError for a missing
            # claim (TODO confirm). An identity without a "roles" claim is
            # treated as having no roles, so the outcome is an authorization
            # failure rather than an unhandled KeyError.
            roles = None

        if isinstance(roles, str):
            # A single role supplied as a plain string: wrap it, otherwise
            # ``in`` would perform a substring test ("ADMIN" in "SUPERADMIN").
            roles = [roles]

        if roles and self._role in roles:
            context.succeed(self)
        else:
            context.fail(f"The user lacks role: {self._role}")

    # NOTE: a Requirement.handle method can also be async!
async def main():
    """
    Run the example scenarios.

    Configures an authorization strategy with a default policy plus two
    role-based policies, decorates three functions with it, and checks which
    calls are rejected with AuthorizationError.
    """

    # The strategy must be told how to obtain the user identity from the
    # arguments of the decorated functions; here it is read from the
    # application context.
    def get_identity(context: MyAppContext):
        return context.identity

    strategy = AuthorizationStrategy()
    auth = strategy.with_default_policy(
        Policy("default", AuthenticatedRequirement())
    )
    auth.identity_getter = get_identity

    auth += Policy("admin", RoleRequirement("ADMIN"))
    auth += Policy("owner", RoleRequirement("OWNER"))

    @auth()
    async def some_method(context: MyAppContext):
        """Example method that requires authorization using the default policy."""

    @auth("owner")
    async def create_user(context: MyAppContext):
        """Example method that requires an OWNER role"""

    @auth("admin")
    async def create_product(context: MyAppContext):
        """Example method that requires an ADMIN role"""

    async def is_denied(operation, context: MyAppContext) -> bool:
        """Await ``operation(context)``; tell whether AuthorizationError was raised."""
        try:
            await operation(context)
        except AuthorizationError:
            return True
        return False

    # Scenario 1: rejected, because the identity is not authenticated and the
    # default policy requires an authenticated user.
    denied = await is_denied(some_method, MyAppContext(Identity()))
    assert denied

    # Scenario 2: accepted, because the identity is created with an
    # authentication mode.
    await some_method(
        MyAppContext(
            Identity({"id": "this is an example"}, authentication_mode="Cookie")
        )
    )

    # Scenario 3: rejected, because the identity's roles do not include the
    # role needed to create a user (OWNER).
    denied = await is_denied(
        create_user,
        MyAppContext(
            Identity(
                {"id": "this is an example", "roles": ["admin"]},
                authentication_mode="Cookie",
            )
        ),
    )
    assert denied

    # Scenario 4: rejected, because the identity's roles do not include the
    # role needed to create a product (ADMIN). The identity carries "admin" in
    # lower case, and RoleRequirement checks membership with an exact string
    # match, so "admin" does not satisfy "ADMIN".
    denied = await is_denied(
        create_product,
        MyAppContext(
            Identity(
                {"id": "this is an example", "roles": ["admin"]},
                authentication_mode="Cookie",
            )
        ),
    )
    assert denied
# Run the example. The call is unconditional (no __main__ guard), so the
# scenarios and their assertions also execute when this module is imported.
asyncio.run(main())