Newer
Older
"""
LTI consumer plugin passthrough views
"""

Giovanni Cimolin da Silva
committed
import logging
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied

Giovanni Cimolin da Silva
committed
from django.http import JsonResponse, Http404
from django.db import transaction
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.views.decorators.clickjacking import xframe_options_sameorigin
from django.shortcuts import render

Giovanni Cimolin da Silva
committed
from django_filters.rest_framework import DjangoFilterBackend

Giovanni Cimolin da Silva
committed
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import UsageKey
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.status import HTTP_403_FORBIDDEN

Giovanni Cimolin da Silva
committed

Giovanni Cimolin da Silva
committed
from lti_consumer.exceptions import LtiError
from lti_consumer.models import (
LtiConfiguration,
LtiAgsLineItem,
LtiDlContentItem,
)
from lti_consumer.lti_1p3.exceptions import (
Lti1p3Exception,
LtiDeepLinkingContentTypeNotSupported,
)
from lti_consumer.lti_1p3.extensions.rest_framework.constants import LTI_DL_CONTENT_TYPE_SERIALIZER_MAP
from lti_consumer.lti_1p3.extensions.rest_framework.serializers import (
LtiAgsLineItemSerializer,
LtiAgsScoreSerializer,
LtiAgsResultSerializer,
LtiNrpsContextMembershipBasicSerializer,
LtiNrpsContextMembershipPIISerializer,
)
from lti_consumer.lti_1p3.extensions.rest_framework.permissions import (
LtiAgsPermissions,
LtiNrpsContextMembershipsPermissions,

Giovanni Cimolin da Silva
committed
from lti_consumer.lti_1p3.extensions.rest_framework.authentication import Lti1p3ApiAuthentication
from lti_consumer.lti_1p3.extensions.rest_framework.renderers import (
LineItemsRenderer,
LineItemRenderer,
LineItemScoreRenderer,
LineItemResultsRenderer,
MembershipResultRenderer,
)
from lti_consumer.lti_1p3.extensions.rest_framework.parsers import (
LineItemParser,
LineItemScoreParser,
)
from lti_consumer.lti_1p3.extensions.rest_framework.utils import IgnoreContentNegotiation
from lti_consumer.plugin import compat
from lti_consumer.utils import _, expose_pii_fields

Giovanni Cimolin da Silva
committed
# Module-level logger, named after this module, used by the views below to
# report failed lookups and denied requests.
log = logging.getLogger(__name__)
def user_has_staff_access(user, course_key):
    """
    Tell whether ``user`` holds "staff" access on the course ``course_key``.

    The decision is delegated entirely to ``compat.user_has_access`` and its
    result is returned unchanged.
    """
    staff_access = compat.user_has_access(user, "staff", course_key)
    return staff_access
def has_block_access(user, block, course_key):
    """
    Decide whether ``user`` may load ``block`` inside the course ``course_key``.

    ``has_access`` does not check course enrollment, while
    ``get_course_with_access`` only checks the course itself, so neither can
    answer the question for one specific xblock. This helper therefore
    combines a course-level check (authenticated and enrolled) with a
    block-level 'load' check.

    Args:
        user: User object requesting access.
        block: xblock object the permission is checked for.
        course_key: course key identifying the course run the block lives in.

    Returns:
        The course-level result when it is falsy, otherwise the block-level
        result; truthy only when both checks pass.
    """
    # Course-level check: the user must be authenticated and enrolled.
    enrolled_course_access = compat.user_course_access(
        compat.get_course_by_id(course_key),
        user,
        'load',
        check_if_enrolled=True,
        check_if_authenticated=True,
    )

    # Block-level check: the user must be allowed to load this xblock.
    xblock_access = compat.user_has_access(user, 'load', block, course_key)

    return enrolled_course_access and xblock_access
@require_http_methods(["GET"])
def public_keyset_endpoint(request, usage_id=None):
    """
    Serve the public JWK keyset of the LTI block identified by ``usage_id``.

    The usage id is parsed into a ``UsageKey`` and used to look up the
    matching ``LtiConfiguration``. Only LTI 1.3 configurations have a keyset.
    An unparsable usage id, a missing configuration or a configuration of
    another LTI version all end in ``Http404``.
    """
    try:
        block_location = UsageKey.from_string(usage_id)
        lti_config = LtiConfiguration.objects.get(location=block_location)

        if lti_config.version == lti_config.LTI_1P3:
            # Reading the public JWK generates a new private/public pair
            # when the configuration does not have one yet.
            keyset_response = JsonResponse(lti_config.lti_1p3_public_jwk)
            keyset_response['Content-Disposition'] = 'attachment; filename=keyset.json'
            return keyset_response

        raise LtiError(
            "LTI Error: LTI 1.1 blocks do not have a public keyset endpoint."
        )
    except (LtiError, InvalidKeyError, ObjectDoesNotExist) as exc:
        log.info("Error while retrieving keyset for usage_id %r: %s", usage_id, exc)
        raise Http404 from exc
@require_http_methods(["GET", "POST"])
def launch_gate_endpoint(request, suffix):
    """
    Forward an LTI 1.3 launch to the XBlock ``lti_1p3_launch_callback`` handler.

    The block is located through the ``login_hint`` query-string parameter,
    which is expected to hold a serialized usage key. Any failure, whether
    while parsing the hint or inside the handler, is logged and turned into
    ``Http404``.
    """
    try:
        usage_key_str = request.GET.get('login_hint')
        block_key = UsageKey.from_string(usage_key_str)

        handler_kwargs = {
            'request': request,
            'course_id': str(block_key.course_key),
            'usage_id': str(block_key),
            'handler': 'lti_1p3_launch_callback',
            'suffix': suffix,
        }
        return compat.run_xblock_handler(**handler_kwargs)
    except Exception as exc:
        log.warning("Error preparing LTI 1.3 launch for hint %r: %s", usage_key_str, exc)
        raise Http404 from exc
@csrf_exempt
@require_http_methods(["POST"])
def access_token_endpoint(request, usage_id=None):
    """
    Let a tool obtain an access token for the block identified by ``usage_id``.

    The request is passed through to the XBlock ``lti_1p3_access_token``
    handler using the no-auth handler runner. Any failure, including an
    unparsable usage id, is logged and turned into ``Http404``.
    """
    try:
        block_key = UsageKey.from_string(usage_id)

        handler_kwargs = {
            'request': request,
            'course_id': str(block_key.course_key),
            'usage_id': str(block_key),
            'handler': 'lti_1p3_access_token',
        }
        return compat.run_xblock_handler_noauth(**handler_kwargs)
    except Exception as exc:
        log.warning("Error retrieving an access token for usage_id %r: %s", usage_id, exc)
        raise Http404 from exc

Giovanni Cimolin da Silva
committed
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# Post from external tool that doesn't
# have access to CSRF tokens
@csrf_exempt
# This URL should work inside an iframe
@xframe_options_sameorigin
# Post only, as required by LTI-DL Specification
@require_http_methods(["POST"])
def deep_linking_response_endpoint(request, lti_config_id=None):
    """
    Deep Linking response endpoint where a tool sends back the selected content.

    The ``JWT`` POST parameter is decoded and validated by the LTI consumer of
    the configuration ``lti_config_id``. Inside a single transaction, the
    previously stored content items of that configuration are deleted and
    each returned item is validated and stored.

    Args:
        request: the incoming Django request; ``request.user`` must have
            staff access to the course owning the configuration.
        lti_config_id: primary key of the ``LtiConfiguration``.

    Returns:
        The rendered "saved" page on success, a 400 error page when a content
        type is not supported, or a 403 error page when the user lacks
        permission or the message fails LTI validation.

    Raises:
        Http404: when no ``LtiConfiguration`` has id ``lti_config_id``.
    """
    try:
        # Retrieve LTI configuration
        lti_config = LtiConfiguration.objects.get(id=lti_config_id)

        # First, check if the user has sufficient permissions to
        # save LTI Deep Linking content through the student.auth API.
        course_key = lti_config.location.course_key
        if not user_has_staff_access(request.user, course_key):
            raise PermissionDenied()

        # Get LTI consumer
        lti_consumer = lti_config.get_lti_consumer()

        # Retrieve Deep Linking return message and validate parameters
        content_items = lti_consumer.check_and_decode_deep_linking_token(
            request.POST.get("JWT")
        )

        # On a transaction, clear older DeepLinking selections, then
        # verify and save each content item passed from the tool.
        with transaction.atomic():
            # Erase older deep linking selection
            LtiDlContentItem.objects.filter(lti_configuration=lti_config).delete()

            for content_item in content_items:
                content_type = content_item.get('type')

                # Retrieve serializer (or raise)
                if content_type not in LTI_DL_CONTENT_TYPE_SERIALIZER_MAP:
                    raise LtiDeepLinkingContentTypeNotSupported()
                serializer_cls = LTI_DL_CONTENT_TYPE_SERIALIZER_MAP[content_type]

                # Validate content item data. `raise_exception` must be passed
                # by keyword: newer Django REST Framework releases reject it
                # as a positional argument.
                serializer = serializer_cls(data=content_item)
                serializer.is_valid(raise_exception=True)

                # Save content item
                LtiDlContentItem.objects.create(
                    lti_configuration=lti_config,
                    attributes=serializer.validated_data,
                )

        # Display success page to indicate that LTI DL Content was
        # saved successfully and auto-close after a few seconds.
        return render(request, 'html/lti-dl/dl_response_saved.html')

    # If LtiConfiguration doesn't exist, error with 404 status.
    except LtiConfiguration.DoesNotExist as exc:
        log.info("LtiConfiguration %r does not exist: %s", lti_config_id, exc)
        raise Http404 from exc
    # If the deep linking content type is not supported
    except LtiDeepLinkingContentTypeNotSupported as exc:
        log.info("One of the selected LTI Content Types is not supported: %s", exc)
        return render(
            request,
            'html/lti-dl/dl_response_error.html',
            {"error": _("The selected content type is not supported by Open edX.")},
            status=400
        )
    # Bad JWT message, invalid token, or any other message validation issues
    except (Lti1p3Exception, PermissionDenied) as exc:
        log.warning(
            "Permission on LTI Config %r denied for user %r: %s",
            lti_config,
            request.user,
            exc,
        )
        return render(
            request,
            'html/lti-dl/dl_response_error.html',
            {
                "error": _("You don't have access to save LTI Content Items."),
                "explanation": _("Please check that you have course staff permissions "
                                 "and double check this block's LTI settings."),
            },
            status=403
        )

Giovanni Cimolin da Silva
committed
@xframe_options_sameorigin
def deep_linking_content_endpoint(request, lti_config_id=None):
    """
    Render the Deep Linking content items stored for an LTI configuration.

    Raises ``Http404`` when the configuration does not exist or has no stored
    content items, and ``PermissionDenied`` when the requesting user fails
    the ``has_block_access`` check for the configuration's block.
    """
    # Look the configuration up; an unknown id is a plain 404.
    try:
        lti_config = LtiConfiguration.objects.get(id=lti_config_id)
    except LtiConfiguration.DoesNotExist as exc:
        log.info("LtiConfiguration %r does not exist: %s", lti_config_id, exc)
        raise Http404 from exc

    # The viewer must be allowed to load the block in its course.
    access_granted = has_block_access(
        request.user,
        lti_config.block,
        lti_config.location.course_key,
    )
    if not access_granted:
        log.warning(
            "Permission on LTI Config %r denied for user %r.",
            lti_config_id,
            request.user,
        )
        raise PermissionDenied

    # Collect every content item saved for this configuration.
    stored_items = LtiDlContentItem.objects.filter(lti_configuration=lti_config)
    if not stored_items.exists():
        log.info("There's no Deep linking content for LTI configuration %s.", lti_config)
        raise Http404

    # Hand the items and the block over to the template.
    template_context = {
        'content_items': stored_items,
        'block': lti_config.block,
    }
    return render(request, 'html/lti-dl/render_dl_content.html', template_context)

Giovanni Cimolin da Silva
committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class LtiAgsLineItemViewset(viewsets.ModelViewSet):
    """
    LineItem endpoint implementation from LTI Advantage.

    See full documentation at:
    https://www.imsglobal.org/spec/lti-ags/v2p0#line-item-service
    """
    serializer_class = LtiAgsLineItemSerializer
    pagination_class = None

    # Custom permission classes for LTI APIs
    authentication_classes = [Lti1p3ApiAuthentication]
    permission_classes = [LtiAgsPermissions]

    # Renderer/parser classes to accept LTI AGS content types
    renderer_classes = [
        LineItemsRenderer,
        LineItemRenderer,
    ]
    parser_classes = [LineItemParser]

    # Filters
    filter_backends = [DjangoFilterBackend]
    filterset_fields = [
        'resource_link_id',
        'resource_id',
        'tag'
    ]

    def get_queryset(self):
        """
        Return the line items that belong to the request's LTI configuration.
        """
        lti_configuration = self.request.lti_configuration

        # Return all LineItems related to the LTI configuration.
        # TODO:
        # Note that each configuration currently maps 1:1
        # to each resource link (block), and this filter needs
        # improved once we start reusing LTI configurations.
        return LtiAgsLineItem.objects.filter(
            lti_configuration=lti_configuration
        )

    def perform_create(self, serializer):
        """
        Save a new line item bound to the request's LTI configuration.
        """
        lti_configuration = self.request.lti_configuration
        serializer.save(lti_configuration=lti_configuration)

    @action(
        detail=True,
        methods=['GET'],
        url_path='results/(?P<user_id>[^/.]+)?',
        renderer_classes=[LineItemResultsRenderer],
        content_negotiation_class=IgnoreContentNegotiation,
    )
    def results(self, request, user_id=None, **kwargs):
        """
        Return a Result list for an LtiAgsLineItem

        URL Parameters:
            * user_id (string): String external user id representation.

        Query Parameters:
            * limit (integer): The maximum number of records to return. Records are
                sorted with most recent timestamp first. Must be a non-negative
                integer.

        Returns:
            * An array of Result records, formatted by LtiAgsResultSerializer
                and returned with the media-type for LineItemResultsRenderer
            * A 400 response when ``limit`` is not a non-negative integer.
        """
        line_item = self.get_object()
        scores = line_item.scores.filter(score_given__isnull=False).order_by('-timestamp')

        if user_id:
            scores = scores.filter(user_id=user_id)

        raw_limit = request.query_params.get('limit')
        if raw_limit:
            # `limit` comes straight from the query string: reject anything
            # that is not a non-negative integer instead of letting `int()`
            # or the queryset slice fail with an unhandled exception.
            try:
                limit = int(raw_limit)
            except ValueError:
                limit = -1
            if limit < 0:
                return Response(
                    {
                        "error": "invalid_limit",
                        "explanation": "The 'limit' query parameter must be a non-negative integer.",
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )
            scores = scores[:limit]

        serializer = LtiAgsResultSerializer(
            list(scores),
            context={'request': self.request},
            many=True,
        )

        return Response(serializer.data)

    @action(
        detail=True,
        methods=['POST'],
        parser_classes=[LineItemScoreParser],
        renderer_classes=[LineItemScoreRenderer],
        content_negotiation_class=IgnoreContentNegotiation,
    )
    def scores(self, request, *args, **kwargs):
        """
        Create a Score record for an LtiAgsLineItem

        Data:
            * A JSON object capable of being serialized by LtiAgsScoreSerializer

        Returns:
            * A copy of the saved record, formatted by LtiAgsScoreSerializer
                and returned with the media-type for LineItemScoreRenderer
        """
        line_item = self.get_object()

        user_id = request.data.get('userId')

        # Using `filter` and `first` so that when a score does not exist,
        # `existing_score` is set to `None`. Using `get` will raise `DoesNotExist`
        existing_score = line_item.scores.filter(user_id=user_id).first()

        # Passing the existing score as `instance` makes the serializer
        # update it instead of creating a second record for the same user.
        serializer = LtiAgsScoreSerializer(
            instance=existing_score,
            data=request.data,
            context={'request': self.request},
        )
        serializer.is_valid(raise_exception=True)
        serializer.save(line_item=line_item)
        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data,
            status=status.HTTP_201_CREATED,
            headers=headers
        )
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class LtiNrpsContextMembershipViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Endpoint for the LTI NRPS Context Membership Service.

    Specification:
    http://imsglobal.org/spec/lti-nrps/v2p0
    """
    # LTI-specific authentication and permission checks
    authentication_classes = [Lti1p3ApiAuthentication]
    permission_classes = [LtiNrpsContextMembershipsPermissions]

    # Renderer producing the LTI NRPS content type
    renderer_classes = [
        MembershipResultRenderer,
    ]

    def attach_external_user_ids(self, data):
        """
        Add an ``external_id`` entry to every member in ``data``, in place.

        ``data`` is the mapping produced by ``get_course_members``; it is
        keyed by user id and each value is that member's dictionary.
        """
        member_ids = data.keys()

        # Fetch all users at once and get or create their external ids in a batch.
        user_model = get_user_model()
        member_users = user_model.objects.prefetch_related('profile').filter(id__in=member_ids)
        external_ids = compat.batch_get_or_create_externalids(member_users)

        # Copy each external id onto the matching member entry.
        for member_id, member in data.items():
            member['external_id'] = external_ids[member_id].external_user_id

    def get_serializer_class(self):
        """
        Pick the membership serializer for the current request.

        The PII serializer is returned when ``expose_pii_fields`` allows it
        for the course of the request's LTI configuration; otherwise the
        basic serializer is returned.
        """
        course_key = self.request.lti_configuration.location.course_key
        if not expose_pii_fields(course_key):
            return LtiNrpsContextMembershipBasicSerializer
        return LtiNrpsContextMembershipPIISerializer

    def list(self, *args, **kwargs):
        """
        Return the membership of the course tied to the request's LTI configuration.

        Members come from ``compat.get_course_members`` and are enriched with
        external ids before serialization. An ``LtiError`` raised along the
        way is answered with a 403 "above_response_limit" payload.
        """
        course_key = self.request.lti_configuration.location.course_key

        try:
            members_by_id = compat.get_course_members(course_key)
            self.attach_external_user_ids(members_by_id)

            # Shape the data the way the membership serializers expect it.
            payload = {
                'id': self.request.build_absolute_uri(),
                'context': {
                    'id': course_key
                },
                'members': members_by_id.values(),
            }

            # Serialize and return the NRPS response.
            serializer_cls = self.get_serializer_class()
            return Response(serializer_cls(payload).data)
        except LtiError as ex:
            log.warning("LTI NRPS Error: %s", ex)
            error_payload = {
                "error": "above_response_limit",
                "explanation": "The number of retrieved users is bigger than the maximum allowed in the configuration.",
            }
            return Response(error_payload, status=HTTP_403_FORBIDDEN)