Skip to content
Snippets Groups Projects
Unverified Commit 2a6450d9 authored by Kshitij Sobti's avatar Kshitij Sobti
Browse files

Add LTI 1.1 config on model

parent 1c8cb856
No related branches found
No related tags found
No related merge requests found
Showing
with 183 additions and 87 deletions
......@@ -11,7 +11,7 @@ class LtiConfigurationAdmin(admin.ModelAdmin):
Makes the location field read-only to avoid issues.
"""
readonly_fields = ('location', )
readonly_fields = ('location', 'config_id')
admin.site.register(LtiConfiguration, LtiConfigurationAdmin)
......
......@@ -68,10 +68,10 @@ def parse_result_json(json_str):
"""
try:
json_obj = json.loads(json_str)
except (ValueError, TypeError):
except (ValueError, TypeError) as err:
msg = "Supplied JSON string in request body could not be decoded: {}".format(json_str)
log.error("[LTI] %s", msg)
raise Lti1p1Error(msg)
raise Lti1p1Error(msg) from err
# The JSON object must be a dict. If a non-empty list is passed in,
# use the first element, but only if it is a dict
......@@ -112,7 +112,7 @@ def parse_result_json(json_str):
except (TypeError, ValueError) as err:
msg = "Could not convert resultScore to float: {}".format(str(err))
log.error("[LTI] %s", msg)
raise Lti1p1Error(msg)
raise Lti1p1Error(msg) from err
return score, json_obj.get('comment', "")
......@@ -387,4 +387,4 @@ class LtiConsumer1p1:
return verify_oauth_body_signature(request, self.oauth_secret, outcome_service_url)
except (ValueError, Lti1p1Error) as err:
log.error("[LTI]: v2.0 result service -- OAuth body verification failed: %s", str(err))
raise Lti1p1Error(str(err))
raise Lti1p1Error(str(err)) from err
......@@ -11,9 +11,6 @@ from ..consumer import LtiConsumer1p1
def lti_embed(
*,
html_element_id,
lti_launch_url,
oauth_key,
oauth_secret,
resource_link_id,
user_id,
roles,
......@@ -21,6 +18,10 @@ def lti_embed(
context_title,
context_label,
result_sourcedid,
lti_consumer=None,
lti_launch_url=None,
oauth_key=None,
oauth_secret=None,
person_sourcedid=None,
person_contact_email_primary=None,
outcome_service_url=None,
......@@ -49,9 +50,6 @@ def lti_embed(
Arguments:
html_element_id (string): Value to use as the HTML element id in the HTML form
lti_launch_url (string): The URL to send the LTI Launch request to
oauth_key (string): The OAuth consumer key
oauth_secret (string): The OAuth consumer secret
resource_link_id (string): Opaque identifier guaranteed to be unique
for every placement of the link
user_id (string): Unique value identifying the user
......@@ -62,6 +60,11 @@ def lti_embed(
context_label (string): Plain text label for the context
result_sourcedid (string): Indicates the LIS Result Identifier (if any)
and uniquely identifies a row and column within the Tool Consumer gradebook
lti_consumer (LtiConsumer1p1): A pre-configured LtiConsumer1p1 object
as an alternative to providing the launch url, oauth key and oauth secret
lti_launch_url (string): The URL to send the LTI Launch request to
oauth_key (string): The OAuth consumer key
oauth_secret (string): The OAuth consumer secret
person_sourcedid (string): LIS identifier for the user account performing the launch
person_contact_email_primary (string): Primary contact email address of the user
outcome_service_url (string): URL pointing to the outcome service. This
......@@ -78,7 +81,10 @@ def lti_embed(
unicode: HTML template with the form and JavaScript to automatically
launch the LTI embedding
"""
lti_consumer = LtiConsumer1p1(lti_launch_url, oauth_key, oauth_secret)
if lti_consumer is None:
lti_consumer = LtiConsumer1p1(lti_launch_url, oauth_key, oauth_secret)
else:
lti_launch_url = lti_consumer.lti_launch_url
# Set LTI parameters from kwargs
lti_consumer.set_user_data(
......
......@@ -15,7 +15,7 @@ class TestLtiEmbed(TestCase):
"""
def setUp(self):
super(TestLtiEmbed, self).setUp()
super().setUp()
self.html_element_id = 'html_element_id'
self.lti_launch_url = 'lti_launch_url'
self.oauth_key = 'oauth_key'
......
......@@ -56,8 +56,8 @@ def get_oauth_request_signature(key, secret, url, headers, body):
body=body,
headers=headers
)
except ValueError: # Scheme not in url.
raise Lti1p1Error("Failed to sign oauth request")
except ValueError as err: # Scheme not in url.
raise Lti1p1Error("Failed to sign oauth request") from err
return headers['Authorization']
......
......@@ -119,7 +119,7 @@ class TestLtiConsumer1p1(unittest.TestCase):
"""
def setUp(self):
super(TestLtiConsumer1p1, self).setUp()
super().setUp()
self.lti_launch_url = 'lti_launch_url'
self.oauth_key = 'fake_consumer_key'
self.oauth_secret = 'fake_signature'
......
......@@ -404,8 +404,8 @@ class LtiConsumer1p3:
assert response.get("state")
assert response.get("client_id") == self.client_id
assert response.get("redirect_uri") == self.launch_url
except AssertionError:
raise exceptions.PreflightRequestValidationFailure()
except AssertionError as err:
raise exceptions.PreflightRequestValidationFailure() from err
def check_token(self, token, allowed_scopes=None):
"""
......@@ -455,7 +455,7 @@ class LtiAdvantageConsumer(LtiConsumer1p3):
"""
Override parent class and set up required LTI Advantage variables.
"""
super(LtiAdvantageConsumer, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
# LTI AGS Variables
self.ags = None
......
......@@ -52,18 +52,18 @@ class Lti1p3ApiAuthentication(authentication.BaseAuthentication):
try:
lti_configuration = LtiConfiguration.objects.get(pk=lti_config_id)
lti_consumer = lti_configuration.get_lti_consumer()
except Exception:
except Exception as err:
msg = _('LTI configuration not found.')
raise exceptions.AuthenticationFailed(msg)
raise exceptions.AuthenticationFailed(msg) from err
# Verify token validity
# This doesn't validate specific permissions, just checks if the token
# is valid or not.
try:
lti_consumer.check_token(auth[1])
except Exception:
except Exception as err:
msg = _('Invalid token signature.')
raise exceptions.AuthenticationFailed(msg)
raise exceptions.AuthenticationFailed(msg) from err
# Passing parameters back to the view through the request in order
# to avoid implementing a separate authentication backend or
......
......@@ -31,8 +31,8 @@ class UsageKeyField(serializers.Field):
"""
try:
return UsageKey.from_string(data)
except InvalidKeyError:
raise serializers.ValidationError("Invalid usage key: {}".format(data))
except InvalidKeyError as err:
raise serializers.ValidationError("Invalid usage key: {}".format(data)) from err
class LtiAgsLineItemSerializer(serializers.ModelSerializer):
......
......@@ -55,8 +55,8 @@ class ToolKeyHandler:
# Import Key and save to internal state
new_key.load_key(RSA.import_key(raw_key))
self.public_key = new_key
except ValueError:
raise exceptions.InvalidRsaKey()
except ValueError as err:
raise exceptions.InvalidRsaKey() from err
def _get_keyset(self, kid=None):
"""
......@@ -113,12 +113,12 @@ class ToolKeyHandler:
# Else returns decoded message
return message
except NoSuitableSigningKeys:
raise exceptions.NoSuitableKeys()
except BadSyntax:
raise exceptions.MalformedJwtToken()
except WrongNumberOfParts:
raise exceptions.MalformedJwtToken()
except NoSuitableSigningKeys as err:
raise exceptions.NoSuitableKeys() from err
except BadSyntax as err:
raise exceptions.MalformedJwtToken() from err
except WrongNumberOfParts as err:
raise exceptions.MalformedJwtToken() from err
class PlatformKeyHandler:
......@@ -145,8 +145,8 @@ class PlatformKeyHandler:
kid=kid,
key=RSA.import_key(key_pem)
)
except ValueError:
raise exceptions.InvalidRsaKey()
except ValueError as err:
raise exceptions.InvalidRsaKey() from err
def encode_and_sign(self, message, expiration=None):
"""
......@@ -211,7 +211,7 @@ class PlatformKeyHandler:
# Else return token contents
return message
except NoSuitableSigningKeys:
raise exceptions.NoSuitableKeys()
except BadSyntax:
raise exceptions.MalformedJwtToken()
except NoSuitableSigningKeys as err:
raise exceptions.NoSuitableKeys() from err
except BadSyntax as err:
raise exceptions.MalformedJwtToken() from err
......@@ -34,7 +34,7 @@ class TestLtiAuthentication(TestCase):
Unit tests for Lti1p3ApiAuthentication class
"""
def setUp(self):
super(TestLtiAuthentication, self).setUp()
super().setUp()
# Set up consumer
self.lti_consumer = LtiConsumer1p3(
......
......@@ -33,7 +33,7 @@ class TestLtiAuthentication(TestCase):
Unit tests for Lti1p3ApiAuthentication class
"""
def setUp(self):
super(TestLtiAuthentication, self).setUp()
super().setUp()
# Set up consumer
self.lti_consumer = LtiConsumer1p3(
......
......@@ -40,7 +40,7 @@ class TestLti1p3Consumer(TestCase):
Unit tests for LtiConsumer1p3
"""
def setUp(self):
super(TestLti1p3Consumer, self).setUp()
super().setUp()
# Set up consumer
self.lti_consumer = LtiConsumer1p3(
......@@ -557,7 +557,7 @@ class TestLtiAdvantageConsumer(TestCase):
Unit tests for LtiAdvantageConsumer
"""
def setUp(self):
super(TestLtiAdvantageConsumer, self).setUp()
super().setUp()
# Set up consumer
self.lti_consumer = LtiAdvantageConsumer(
......
......@@ -24,7 +24,7 @@ class TestPlatformKeyHandler(TestCase):
Unit tests for PlatformKeyHandler
"""
def setUp(self):
super(TestPlatformKeyHandler, self).setUp()
super().setUp()
self.rsa_key_id = "1"
self.rsa_key = RSA.generate(2048).export_key('PEM')
......@@ -187,7 +187,7 @@ class TestToolKeyHandler(TestCase):
Unit tests for ToolKeyHandler
"""
def setUp(self):
super(TestToolKeyHandler, self).setUp()
super().setUp()
self.rsa_key_id = "1"
......
......@@ -648,10 +648,10 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
if not key:
raise ValueError
key = ':'.join(key)
except ValueError:
except ValueError as err:
msg = 'Could not parse LTI passport: {lti_passport!r}. Should be "id:key:secret" string.'
msg = self.ugettext(msg).format(lti_passport=lti_passport)
raise LtiError(msg)
raise LtiError(msg) from err
if lti_id == self.lti_id.strip():
return key, secret
......@@ -783,11 +783,11 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
for custom_parameter in self.custom_parameters:
try:
param_name, param_value = [p.strip() for p in custom_parameter.split('=', 1)]
except ValueError:
except ValueError as err:
_ = self.runtime.service(self, "i18n").ugettext
msg = 'Could not parse custom parameter: {custom_parameter!r}. Should be "x=y" string.'
msg = self.ugettext(msg).format(custom_parameter=custom_parameter)
raise LtiError(msg)
raise LtiError(msg) from err
# LTI specs: 'custom_' should be prepended before each custom parameter, as pointed in link above.
if param_name not in LTI_PARAMETERS:
......@@ -870,7 +870,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
Get Studio View fragment
"""
loader = ResourceLoader(__name__)
fragment = super(LtiConsumerXBlock, self).studio_view(context)
fragment = super().studio_view(context)
fragment.add_javascript(loader.load_unicode("static/js/xblock_studio_view.js"))
fragment.initialize_js('LtiConsumerXBlockInitStudio')
......@@ -929,7 +929,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
return fragment
@XBlock.handler
def lti_launch_handler(self, request, suffix=''):
def lti_launch_handler(self, request, suffix=''): # pylint: disable=unused-argument
"""
XBlock handler for launching LTI 1.1 tools.
......@@ -984,7 +984,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
return Response(template, content_type='text/html')
@XBlock.handler
def lti_1p3_launch_handler(self, request, suffix=''):
def lti_1p3_launch_handler(self, request, suffix=''): # pylint: disable=unused-argument
"""
XBlock handler for launching the LTI 1.3 tools.
......@@ -1010,7 +1010,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
return Response(template, content_type='text/html')
@XBlock.handler
def lti_1p3_launch_callback(self, request, suffix=''):
def lti_1p3_launch_callback(self, request, suffix=''): # pylint: disable=unused-argument
"""
XBlock handler for launching the LTI 1.3 tool.
......@@ -1071,7 +1071,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
return Response(template, status=400, content_type='text/html')
@XBlock.handler
def lti_1p3_access_token(self, request, suffix=''):
def lti_1p3_access_token(self, request, suffix=''): # pylint: disable=unused-argument
"""
XBlock handler for creating access tokens for the LTI 1.3 tool.
......@@ -1130,7 +1130,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
)
@XBlock.handler
def outcome_service_handler(self, request, suffix=''):
def outcome_service_handler(self, request, suffix=''): # pylint: disable=unused-argument
"""
XBlock handler for LTI Outcome Service requests.
......@@ -1421,7 +1421,7 @@ class LtiConsumerXBlock(StudioEditableXBlockMixin, XBlock):
# return key/value fields in a Python dict object
# values may be numeric / string or dict
# default implementation is an empty dict
xblock_body = super(LtiConsumerXBlock, self).index_dictionary()
xblock_body = super().index_dictionary()
index_body = {
"display_name": self.display_name,
......
# Generated by Django 2.2.17 on 2020-12-21 13:39
from django.db import migrations, models
import jsonfield.encoder
import jsonfield.fields
import uuid
class Migration(migrations.Migration):
dependencies = [
('lti_consumer', '0005_migrate_keyset_to_model'),
]
operations = [
migrations.AddField(
model_name='lticonfiguration',
name='config_id',
field=models.UUIDField(default=uuid.uuid4, editable=False, unique=True),
),
migrations.AddField(
model_name='lticonfiguration',
name='lti_1p1_client_key',
field=models.CharField(blank=True, help_text='Client key provided by the LTI tool provider.', max_length=255),
),
migrations.AddField(
model_name='lticonfiguration',
name='lti_1p1_client_secret',
field=models.CharField(blank=True, help_text='Client secret provided by the LTI tool provider.', max_length=255),
),
migrations.AddField(
model_name='lticonfiguration',
name='lti_1p1_launch_url',
field=models.CharField(blank=True, help_text='The URL of the external tool that initiates the launch.', max_length=255),
),
migrations.AddField(
model_name='lticonfiguration',
name='lti_config',
field=jsonfield.fields.JSONField(blank=True, default=dict, dump_kwargs={'cls': jsonfield.encoder.JSONEncoder, 'separators': (',', ':')}, help_text='LTI configuration data.', load_kwargs={}),
),
migrations.AlterField(
model_name='lticonfiguration',
name='config_store',
field=models.CharField(choices=[('CONFIG_ON_XBLOCK', 'Configuration Stored on XBlock fields'), ('CONFIG_ON_DB', 'Configuration Stored on this model')], default='CONFIG_ON_XBLOCK', max_length=255),
),
]
......@@ -6,9 +6,11 @@ import json
from django.db import models
from django.core.validators import MinValueValidator
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext_lazy as _
from opaque_keys.edx.django.models import UsageKeyField
from Cryptodome.PublicKey import RSA
from opaque_keys.edx.django.models import UsageKeyField
from jsonfield import JSONField
# LTI 1.1
from lti_consumer.lti_1p1.consumer import LtiConsumer1p1
......@@ -60,8 +62,10 @@ class LtiConfiguration(models.Model):
# stored on the block, but should be expanded to
# enable storing LTI configuration in this model.
CONFIG_ON_XBLOCK = 'CONFIG_ON_XBLOCK'
CONFIG_ON_DB = 'CONFIG_ON_DB'
CONFIG_STORE_CHOICES = [
(CONFIG_ON_XBLOCK, 'Configuration Stored on XBlock fields'),
(CONFIG_ON_XBLOCK, _('Configuration Stored on XBlock fields')),
(CONFIG_ON_DB, _('Configuration Stored on this model')),
]
config_store = models.CharField(
max_length=255,
......@@ -69,9 +73,10 @@ class LtiConfiguration(models.Model):
default=CONFIG_ON_XBLOCK,
)
# A secondary ID for this configuration that can be used in URLs without leaking primary id.
config_id = models.UUIDField(unique=True, default=uuid.uuid4, editable=False)
# Block location where the configuration is stored.
# In the future, the LTI configuration will be
# stored in this model in a JSON field.
location = UsageKeyField(
max_length=255,
db_index=True,
......@@ -79,33 +84,71 @@ class LtiConfiguration(models.Model):
blank=True,
)
# This is where the configuration is stored in the model if stored on this model.
lti_config = JSONField(
null=False,
blank=True,
default=dict,
help_text=_("LTI configuration data.")
)
# LTI 1.1 Related variables
lti_1p1_launch_url = models.CharField(
max_length=255,
blank=True,
help_text=_("The URL of the external tool that initiates the launch.")
)
lti_1p1_client_key = models.CharField(
max_length=255,
blank=True,
help_text=_("Client key provided by the LTI tool provider.")
)
lti_1p1_client_secret = models.CharField(
max_length=255,
blank=True,
help_text=_("Client secret provided by the LTI tool provider.")
)
# LTI 1.3 Related variables
lti_1p3_internal_private_key = models.TextField(
blank=True,
help_text="Platform's generated Private key. Keep this value secret.",
help_text=_("Platform's generated Private key. Keep this value secret."),
)
lti_1p3_internal_private_key_id = models.CharField(
max_length=255,
blank=True,
help_text="Platform's generated Private key ID",
help_text=_("Platform's generated Private key ID"),
)
lti_1p3_internal_public_jwk = models.TextField(
blank=True,
help_text="Platform's generated JWK keyset.",
help_text=_("Platform's generated JWK keyset."),
)
lti_1p3_client_id = models.CharField(
max_length=255,
default=generate_client_id,
help_text="Client ID used by LTI tool",
help_text=_("Client ID used by LTI tool"),
)
# Empty variable that'll hold the block once it's retrieved
# from the modulestore or preloaded
_block = None
def clean(self):
if self.config_store == self.CONFIG_ON_XBLOCK and self.location is None:
raise ValidationError({
"config_store": _("LTI Configuration stores on XBlock needs a block location set."),
})
try:
consumer = self.get_lti_consumer()
except NotImplementedError:
consumer = None
if consumer is None:
raise ValidationError(_("Invalid LTI configuration."))
@property
def block(self):
"""
......@@ -114,7 +157,7 @@ class LtiConfiguration(models.Model):
block = getattr(self, '_block', None)
if block is None:
if self.location is None:
raise ValueError("Block location not set, it's not possible to retrieve the block.")
raise ValueError(_("Block location not set, it's not possible to retrieve the block."))
block = self._block = compat.load_block_as_anonymous_user(self.location)
return block
......@@ -188,12 +231,13 @@ class LtiConfiguration(models.Model):
# If LTI configuration is stored in the XBlock.
if self.config_store == self.CONFIG_ON_XBLOCK:
key, secret = self.block.lti_provider_key_secret
launch_url = self.block.launch_url
else:
key = self.lti_1p1_client_key
secret = self.lti_1p1_client_secret
launch_url = self.lti_1p1_launch_url
return LtiConsumer1p1(self.block.launch_url, key, secret)
# There's no configuration stored locally, so throw
# NotImplemented.
raise NotImplementedError
return LtiConsumer1p1(launch_url, key, secret)
def _get_lti_1p3_consumer(self):
"""
......@@ -399,7 +443,7 @@ class LtiAgsScore(models.Model):
if self.score_given and self.score_maximum is None:
raise ValidationError({'score_maximum': 'cannot be unset when score_given is set'})
def save(self, *args, **kwargs): # pylint: disable=arguments-differ
def save(self, *args, **kwargs): # pylint: disable=signature-differs
self.full_clean()
super().save(*args, **kwargs)
......
......@@ -46,32 +46,32 @@ def parse_grade_xml_body(body):
parser = etree.XMLParser(ns_clean=True, recover=True, encoding='utf-8')
root = etree.fromstring(data, parser=parser)
except etree.XMLSyntaxError as ex:
raise LtiError(str(ex) or 'Body is not valid XML')
raise LtiError(str(ex) or 'Body is not valid XML') from ex
try:
imsx_message_identifier = root.xpath("//def:imsx_messageIdentifier", namespaces=namespaces)[0].text or ''
except IndexError:
raise LtiError('Failed to parse imsx_messageIdentifier from XML request body')
except IndexError as ex:
raise LtiError('Failed to parse imsx_messageIdentifier from XML request body') from ex
try:
body = root.xpath("//def:imsx_POXBody", namespaces=namespaces)[0]
except IndexError:
raise LtiError('Failed to parse imsx_POXBody from XML request body')
except IndexError as ex:
raise LtiError('Failed to parse imsx_POXBody from XML request body') from ex
try:
action = body.getchildren()[0].tag.replace('{' + lti_spec_namespace + '}', '')
except IndexError:
raise LtiError('Failed to parse action from XML request body')
except IndexError as ex:
raise LtiError('Failed to parse action from XML request body') from ex
try:
sourced_id = root.xpath("//def:sourcedId", namespaces=namespaces)[0].text
except IndexError:
raise LtiError('Failed to parse sourcedId from XML request body')
except IndexError as ex:
raise LtiError('Failed to parse sourcedId from XML request body') from ex
try:
score = root.xpath("//def:textString", namespaces=namespaces)[0].text
except IndexError:
raise LtiError('Failed to parse score textString from XML request body')
except IndexError as ex:
raise LtiError('Failed to parse score textString from XML request body') from ex
# Raise exception if score is not float or not in range 0.0-1.0 regarding spec.
score = float(score)
......
......@@ -161,7 +161,7 @@ class LtiAgsLineItemViewset(viewsets.ModelViewSet):
url_path='results/(?P<user_id>[^/.]+)?',
renderer_classes=[LineItemResultsRenderer]
)
def results(self, request, user_id=None, **kwargs):
def results(self, request, user_id=None, **kwargs): # pylint: disable=unused-argument
"""
Return a Result list for an LtiAgsLineItem
......@@ -199,7 +199,7 @@ class LtiAgsLineItemViewset(viewsets.ModelViewSet):
parser_classes=[LineItemScoreParser],
renderer_classes=[LineItemScoreRenderer]
)
def scores(self, request, *args, **kwargs):
def scores(self, request, *args, **kwargs): # pylint: disable=unused-argument
"""
Create a Score record for an LtiAgsLineItem
......
......@@ -16,7 +16,7 @@ class TestLti1p3KeysetEndpoint(TestCase):
Test `public_keyset_endpoint` method.
"""
def setUp(self):
super(TestLti1p3KeysetEndpoint, self).setUp()
super().setUp()
self.location = 'block-v1:course+test+2020+type@problem+block@test'
self.url = '/lti_consumer/v1/public_keysets/{}'.format(self.location)
......@@ -70,7 +70,7 @@ class TestLti1p3LaunchGateEndpoint(TestCase):
Test `launch_gate_endpoint` method.
"""
def setUp(self):
super(TestLti1p3LaunchGateEndpoint, self).setUp()
super().setUp()
self.location = 'block-v1:course+test+2020+type@problem+block@test'
self.url = '/lti_consumer/v1/launch/'
......@@ -113,7 +113,7 @@ class TestLti1p3AccessTokenEndpoint(TestCase):
Test `access_token_endpoint` method.
"""
def setUp(self):
super(TestLti1p3AccessTokenEndpoint, self).setUp()
super().setUp()
self.location = 'block-v1:course+test+2020+type@problem+block@test'
self.url = '/lti_consumer/v1/token/{}'.format(self.location)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment