-
Giovanni Cimolin da Silva authored
This fixes issues with some tools that assume `aud` as a plaing string even though that doesn't follow https://tools.ietf.org/html/rfc7519#section-4.1.3
Giovanni Cimolin da Silva authoredThis fixes issues with some tools that assume `aud` as a plaing string even though that doesn't follow https://tools.ietf.org/html/rfc7519#section-4.1.3
consumer.py 20.76 KiB
"""
LTI 1.3 Consumer implementation
"""
from urllib.parse import urlencode
from . import constants, exceptions
from .constants import (
LTI_1P3_ROLE_MAP,
LTI_BASE_MESSAGE,
LTI_1P3_ACCESS_TOKEN_REQUIRED_CLAIMS,
LTI_1P3_ACCESS_TOKEN_SCOPES,
LTI_1P3_CONTEXT_TYPE,
)
from .key_handlers import ToolKeyHandler, PlatformKeyHandler
from .ags import LtiAgs
from .deep_linking import LtiDeepLinking
class LtiConsumer1p3:
"""
LTI 1.3 Consumer Implementation
"""
def __init__(
self,
iss,
lti_oidc_url,
lti_launch_url,
client_id,
deployment_id,
rsa_key,
rsa_key_id,
tool_key=None,
tool_keyset_url=None,
):
"""
Initialize LTI 1.3 Consumer class
"""
self.iss = iss
self.oidc_url = lti_oidc_url
self.launch_url = lti_launch_url
self.client_id = client_id
self.deployment_id = deployment_id
# Set up platform message signature class
self.key_handler = PlatformKeyHandler(rsa_key, rsa_key_id)
# Set up tool public key verification class
self.tool_jwt = ToolKeyHandler(
public_key=tool_key,
keyset_url=tool_keyset_url
)
# IMS LTI Claim data
self.lti_claim_user_data = None
self.lti_claim_launch_presentation = None
self.lti_claim_context = None
self.lti_claim_custom_parameters = None
# Extra claims - used by LTI Advantage
self.extra_claims = {}
@staticmethod
def _get_user_roles(role):
"""
Converts platform role into LTI compliant roles
Used in roles claim: should return array of URI values
for roles that the user has within the message's context.
Supported roles:
* Core - Administrator
* Institution - Instructor (non-core role)
* Institution - Student
Reference: http://www.imsglobal.org/spec/lti/v1p3/#roles-claim
Role vocabularies: http://www.imsglobal.org/spec/lti/v1p3/#role-vocabularies
"""
lti_user_roles = set()
if role:
# Raise value error if value doesn't exist in map
if role not in LTI_1P3_ROLE_MAP:
raise ValueError("Invalid role list provided.")
# Add roles to list
lti_user_roles.update(LTI_1P3_ROLE_MAP[role])
return list(lti_user_roles)
def prepare_preflight_url(
self,
callback_url,
hint="hint",
lti_hint="lti_hint"
):
"""
Generates OIDC url with parameters
"""
oidc_url = self.oidc_url + "?"
parameters = {
"iss": self.iss,
"client_id": self.client_id,
"lti_deployment_id": self.deployment_id,
"target_link_uri": callback_url,
"login_hint": hint,
"lti_message_hint": lti_hint
}
return oidc_url + urlencode(parameters)
def set_user_data(
self,
user_id,
role,
full_name=None,
email_address=None
):
"""
Set user data/roles and convert to IMS Specification
User Claim doc: http://www.imsglobal.org/spec/lti/v1p3/#user-identity-claims
Roles Claim doc: http://www.imsglobal.org/spec/lti/v1p3/#roles-claim
"""
self.lti_claim_user_data = {
# User identity claims
# sub: locally stable identifier for user that initiated the launch
"sub": user_id,
# Roles claim
# Array of URI values for roles that the user has within the message's context
"https://purl.imsglobal.org/spec/lti/claim/roles": self._get_user_roles(role)
}
# Additonal user identity claims
# Optional user data that can be sent to the tool, if the block is configured to do so
if full_name:
self.lti_claim_user_data.update({
"name": full_name,
})
if email_address:
self.lti_claim_user_data.update({
"email": email_address,
})
def set_launch_presentation_claim(
self,
document_target="iframe"
):
"""
Optional: Set launch presentation claims
http://www.imsglobal.org/spec/lti/v1p3/#launch-presentation-claim
"""
if document_target not in ['iframe', 'frame', 'window']:
raise ValueError("Invalid launch presentation format.")
self.lti_claim_launch_presentation = {
# Launch presentation claim
"https://purl.imsglobal.org/spec/lti/claim/launch_presentation": {
# Can be one of: iframe, frame, window
"document_target": document_target,
# TODO: Add support for `return_url` handler to allow the tool
# to return error messages back to the lms.
# See the spec referenced above for more information.
},
}
def set_context_claim(
self,
context_id,
context_types=None,
context_title=None,
context_label=None
):
"""
Optional: Set context claims
https://www.imsglobal.org/spec/lti/v1p3/#context-claim
Arguments:
context_id (string): Unique value identifying the user
context_types (list): A list of context type values for the claim
context_title (string): Plain text title of the context
context_label (string): Plain text label for the context
"""
# Set basic claim data
context_claim_data = {
"id": context_id,
}
# Default context_types to a list if nothing is passed in
context_types = context_types or []
# Ensure the value of context_types is a list
if not isinstance(context_types, list):
raise TypeError("Invalid type for context_types. It must be a list.")
# Explicitly ignoring any custom context types
context_claim_types = [
context_type.value
for context_type in context_types
if isinstance(context_type, LTI_1P3_CONTEXT_TYPE)
]
if context_claim_types:
context_claim_data["type"] = context_claim_types
if context_title:
context_claim_data["title"] = context_title
if context_label:
context_claim_data["label"] = context_label
self.lti_claim_context = {
# Context claim
"https://purl.imsglobal.org/spec/lti/claim/context": context_claim_data
}
def set_custom_parameters(
self,
custom_parameters
):
"""
Stores custom parameters configured for LTI launch
"""
if not isinstance(custom_parameters, dict):
raise ValueError("Custom parameters must be a key/value dictionary.")
self.lti_claim_custom_parameters = {
"https://purl.imsglobal.org/spec/lti/claim/custom": custom_parameters
}
def get_lti_launch_message(
self,
resource_link,
include_extra_claims=True,
):
"""
Build LTI message from class parameters
This will add all required parameters from the LTI 1.3 spec and any additional ones set in
the configuration and JTW encode the message using the provided key.
"""
# Start from base message
lti_message = LTI_BASE_MESSAGE.copy()
# Add base parameters
lti_message.update({
# Issuer
"iss": self.iss,
# JWT aud and azp
"aud": self.client_id,
"azp": self.client_id,
# LTI Deployment ID Claim:
# String that identifies the platform-tool integration governing the message
# http://www.imsglobal.org/spec/lti/v1p3/#lti-deployment-id-claim
"https://purl.imsglobal.org/spec/lti/claim/deployment_id": self.deployment_id,
# Target Link URI: actual endpoint for the LTI resource to display
# MUST be the same value as the target_link_uri passed by the platform in the OIDC login request
# http://www.imsglobal.org/spec/lti/v1p3/#target-link-uri
"https://purl.imsglobal.org/spec/lti/claim/target_link_uri": self.launch_url,
# Resource link: stable and unique to each deployment_id
# This value MUST change if the link is copied or exported from one system or
# context and imported into another system or context
# http://www.imsglobal.org/spec/lti/v1p3/#resource-link-claim
"https://purl.imsglobal.org/spec/lti/claim/resource_link": {
"id": resource_link,
# Optional claims
# "title": "Introduction Assignment"
# "description": "Assignment to introduce who you are",
},
})
# Check if user data is set, then append it to lti message
# Raise if isn't set, since some user data is required for the launch
if self.lti_claim_user_data:
lti_message.update(self.lti_claim_user_data)
else:
raise ValueError("Required user data isn't set.")
# Only used when doing normal LTI launches
if include_extra_claims:
# Set optional claims
# Launch presentation claim
if self.lti_claim_launch_presentation:
lti_message.update(self.lti_claim_launch_presentation)
# Context claim
if self.lti_claim_context:
lti_message.update(self.lti_claim_context)
# Custom variables claim
if self.lti_claim_custom_parameters:
lti_message.update(self.lti_claim_custom_parameters)
# Extra claims - From LTI Advantage extensions
if self.extra_claims:
lti_message.update(self.extra_claims)
return lti_message
def generate_launch_request(
self,
preflight_response,
resource_link
):
"""
Build LTI message from class parameters
This will add all required parameters from the LTI 1.3 spec and any additional ones set in
the configuration and JTW encode the message using the provided key.
"""
# Validate preflight response
self._validate_preflight_response(preflight_response)
# Get LTI Launch Message
lti_launch_message = self.get_lti_launch_message(resource_link=resource_link)
# Nonce from OIDC preflight launch request
lti_launch_message.update({
"nonce": preflight_response.get("nonce")
})
return {
"state": preflight_response.get("state"),
"id_token": self.key_handler.encode_and_sign(
message=lti_launch_message,
expiration=300
)
}
def get_public_keyset(self):
"""
Export Public JWK
"""
return self.key_handler.get_public_jwk()
def access_token(self, token_request_data):
"""
Validate request and return JWT access token.
This complies to IMS Security Framework and accepts a JWT
as a secret for the client credentials grant.
See this section:
https://www.imsglobal.org/spec/security/v1p0/#securing_web_services
Full spec reference:
https://www.imsglobal.org/spec/security/v1p0/
Parameters:
token_request_data: Dict of parameters sent by LTI tool as form_data.
Returns:
A dict containing the JSON response containing a JWT and some extra
parameters required by LTI tools. This token gives access to all
supported LTI Scopes from this tool.
"""
# Check if all required claims are present
for required_claim in LTI_1P3_ACCESS_TOKEN_REQUIRED_CLAIMS:
if required_claim not in token_request_data.keys():
raise exceptions.MissingRequiredClaim()
# Check that grant type is `client_credentials`
if token_request_data['grant_type'] != 'client_credentials':
raise exceptions.UnsupportedGrantType()
# Validate JWT token
self.tool_jwt.validate_and_decode(
token_request_data['client_assertion']
)
# Check scopes and only return valid and supported ones
valid_scopes = []
requested_scopes = token_request_data['scope'].split(' ')
for scope in requested_scopes:
# TODO: Add additional checks for permitted scopes
# Currently there are no scopes, because there is no use for
# these access tokens until a tool needs to access the LMS.
# LTI Advantage extensions make use of this.
if scope in LTI_1P3_ACCESS_TOKEN_SCOPES:
valid_scopes.append(scope)
# Scopes are space separated as described in
# https://tools.ietf.org/html/rfc6749
scopes_str = " ".join(valid_scopes)
# This response is compliant with RFC 6749
# https://tools.ietf.org/html/rfc6749#section-4.4.3
return {
"access_token": self.key_handler.encode_and_sign(
{
"sub": self.client_id,
"iss": self.iss,
"scopes": scopes_str
},
# Create token valid for 3600 seconds (1h) as per specification
# https://www.imsglobal.org/spec/security/v1p0/#expires_in-values-and-renewing-the-access-token
expiration=3600
),
"token_type": "bearer",
"expires_in": 3600,
"scope": scopes_str
}
def _validate_preflight_response(self, response):
"""
Validates a preflight response to be used in a launch request
Raises ValueError in case of validation failure
:param response: the preflight response to be validated
"""
try:
assert response.get("nonce")
assert response.get("state")
assert response.get("redirect_uri")
assert response.get("client_id") == self.client_id
except AssertionError as err:
raise exceptions.PreflightRequestValidationFailure() from err
def check_token(self, token, allowed_scopes=None):
"""
Check if token has access to allowed scopes.
"""
token_contents = self.key_handler.validate_and_decode(
token,
# The issuer of the token is the platform
iss=self.iss,
)
# Tokens are space separated
token_scopes = token_contents['scopes'].split(' ')
# Check if token has permission for the requested scope,
# and throws exception if not.
# If `allowed_scopes` is empty, return true (just check
# token validity).
if allowed_scopes:
return any(scope in allowed_scopes for scope in token_scopes)
return True
def set_extra_claim(self, claim):
"""
Adds an additional claim to the LTI Launch message
"""
if not isinstance(claim, dict):
raise ValueError('Invalid extra claim: is not a dict.')
self.extra_claims.update(claim)
class LtiAdvantageConsumer(LtiConsumer1p3):
"""
LTI Advantage Consumer Implementation.
Builds on top of the LTI 1.3 consumer and adds support for
the following LTI Advantage Services:
* Assignments and Grades Service (LTI-AGS): Allows tools to
retrieve and send back grades into the platform.
Note: this is a partial implementation with read-only LineItems.
Reference spec: https://www.imsglobal.org/spec/lti-ags/v2p0
"""
def __init__(self, *args, **kwargs):
"""
Override parent class and set up required LTI Advantage variables.
"""
super().__init__(*args, **kwargs)
# LTI Advantage services
self.ags = None
self.dl = None
@property
def lti_ags(self):
"""
Returns LTI AGS class or throw exception if not set up.
"""
if not self.ags:
raise exceptions.LtiAdvantageServiceNotSetUp(
"The LTI AGS service was not set up for this consumer."
)
return self.ags
def enable_ags(
self,
lineitems_url,
lineitem_url=None,
allow_programmatic_grade_interaction=False,
):
"""
Enable LTI Advantage Assignments and Grades Service.
This will include the LTI AGS Claim in the LTI message
and set up the required class.
"""
self.ags = LtiAgs(
lineitems_url=lineitems_url,
lineitem_url=lineitem_url,
allow_creating_lineitems=allow_programmatic_grade_interaction,
results_service_enabled=True,
scores_service_enabled=True,
)
# Include LTI AGS claim inside the LTI Launch message
self.set_extra_claim(self.ags.get_lti_ags_launch_claim())
def enable_deep_linking(
self,
deep_linking_launch_url,
deep_linking_return_url,
):
"""
Enable LTI Advantage Deep Linking Service.
This will include the LTI DL Claim in the LTI message
and set up the required class.
"""
self.dl = LtiDeepLinking(deep_linking_launch_url, deep_linking_return_url)
def generate_launch_request(
self,
preflight_response,
resource_link
):
"""
Build LTI message for Deep linking launches.
Overrides method from LtiConsumer1p3 to allow handling LTI Deep linking messages
"""
# Check if Deep Linking is enabled and that this is a Deep Link Launch
if self.dl and preflight_response.get("lti_message_hint") == "deep_linking_launch":
# Validate preflight response
self._validate_preflight_response(preflight_response)
# Get LTI Launch Message
lti_launch_message = self.get_lti_launch_message(
resource_link=resource_link,
include_extra_claims=False,
)
# Update message type to LtiDeepLinkingRequest,
# replacing the normal launch request.
lti_launch_message.update({
"https://purl.imsglobal.org/spec/lti/claim/message_type": "LtiDeepLinkingRequest",
})
# Include deep linking claim
lti_launch_message.update(
# TODO: Add extra settings
self.dl.get_lti_deep_linking_launch_claim()
)
# Nonce from OIDC preflight launch request
lti_launch_message.update({
"nonce": preflight_response.get("nonce")
})
# Return new lanch message, used by XBlock to present the launch
return {
"state": preflight_response.get("state"),
"id_token": self.key_handler.encode_and_sign(
message=lti_launch_message,
expiration=300
)
}
# Call LTI Launch if Deep Linking is not
# set up or this isn't a Deep Link Launch
return super().generate_launch_request(
preflight_response,
resource_link
)
def check_and_decode_deep_linking_token(self, token):
"""
Check and decode Deep Linking response, return selected content items.
This either returns a content item list or raises an exception.
"""
if not self.dl:
raise exceptions.LtiAdvantageServiceNotSetUp()
# Decode token, check expiration
deep_link_response = self.tool_jwt.validate_and_decode(token)
# Check the response is a Deep Linking response type
message_type = deep_link_response.get("https://purl.imsglobal.org/spec/lti/claim/message_type")
if not message_type == "LtiDeepLinkingResponse":
raise exceptions.InvalidClaimValue("Token isn't a Deep Linking Response message.")
# Check if supported contentitems were returned
content_items = deep_link_response.get(
'https://purl.imsglobal.org/spec/lti-dl/claim/content_items',
# If not found, return empty list
[],
)
if any(
item['type'] not in constants.LTI_DEEP_LINKING_ACCEPTED_TYPES
for item in content_items
):
raise exceptions.LtiDeepLinkingContentTypeNotSupported()
# Return contentitems
return content_items
def set_dl_content_launch_parameters(
self,
url=None,
custom=None,
):
"""
Overrides LTI Consumer settings to do content presentation.
"""
if url:
self.launch_url = url
if custom:
self.set_custom_parameters(custom)