Skip to content
Snippets Groups Projects
views.py 17.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • """
    LTI consumer plugin passthrough views
    """
    
    from django.contrib.auth import get_user_model
    
    from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
    
    from django.db import transaction
    
    from django.views.decorators.csrf import csrf_exempt
    from django.views.decorators.http import require_http_methods
    
    from django.views.decorators.clickjacking import xframe_options_sameorigin
    
    from django.shortcuts import render
    
    from django_filters.rest_framework import DjangoFilterBackend
    
    from opaque_keys.edx.keys import UsageKey
    
    from rest_framework import viewsets, status
    from rest_framework.decorators import action
    from rest_framework.response import Response
    
    from rest_framework.status import HTTP_403_FORBIDDEN
    
    from lti_consumer.models import (
        LtiConfiguration,
        LtiAgsLineItem,
        LtiDlContentItem,
    )
    
    
    from lti_consumer.lti_1p3.exceptions import (
        Lti1p3Exception,
        LtiDeepLinkingContentTypeNotSupported,
    )
    
    from lti_consumer.lti_1p3.extensions.rest_framework.constants import LTI_DL_CONTENT_TYPE_SERIALIZER_MAP
    
    from lti_consumer.lti_1p3.extensions.rest_framework.serializers import (
        LtiAgsLineItemSerializer,
        LtiAgsScoreSerializer,
        LtiAgsResultSerializer,
    
        LtiNrpsContextMembershipBasicSerializer,
        LtiNrpsContextMembershipPIISerializer,
    )
    from lti_consumer.lti_1p3.extensions.rest_framework.permissions import (
        LtiAgsPermissions,
        LtiNrpsContextMembershipsPermissions,
    
    from lti_consumer.lti_1p3.extensions.rest_framework.authentication import Lti1p3ApiAuthentication
    
    from lti_consumer.lti_1p3.extensions.rest_framework.renderers import (
        LineItemsRenderer,
        LineItemRenderer,
        LineItemScoreRenderer,
    
        LineItemResultsRenderer,
        MembershipResultRenderer,
    
    )
    from lti_consumer.lti_1p3.extensions.rest_framework.parsers import (
        LineItemParser,
        LineItemScoreParser,
    )
    
    from lti_consumer.lti_1p3.extensions.rest_framework.utils import IgnoreContentNegotiation
    
    from lti_consumer.plugin import compat
    from lti_consumer.utils import _, expose_pii_fields
    
    def user_has_staff_access(user, course_key):
        """
        Check if an user has write permissions to a given course.
        """
    
        return compat.user_has_access(user, "staff", course_key)
    
    
    
    def has_block_access(user, block, course_key):
        """
        Checks if a user has access to given xblock.
    
        ``has_access`` doesn't checks for course enrollment. On the otherhand, ``get_course_with_access``
        only checks for the course itself. There is no way to check access for specific xblock. This function
        has been created to perform a combination of check for both enrollment and access for specific xblock.
    
        Args:
            user: User Object
            block: xblock Object to check permission for
            course_key: A course_key specifying which course run this access is for.
    
        Returns:
            bool: True if user has access, False otherwise.
        """
        # Get the course
    
        course = compat.get_course_by_id(course_key)
    
    
        # Check if user is authenticated & enrolled
    
        course_access = compat.user_course_access(course, user, 'load', check_if_enrolled=True, check_if_authenticated=True)
    
    
        # Check if user has access to xblock
    
        block_access = compat.user_has_access(user, 'load', block, course_key)
    
    
        # Return True if the user has access to xblock and is enrolled in that specific course.
        return course_access and block_access
    
    
    
    @require_http_methods(["GET"])
    
    def public_keyset_endpoint(request, usage_id=None):
        """
        Gate endpoint to fetch public keysets from a problem
    
        This is basically a passthrough function that uses the
        OIDC response parameter `login_hint` to locate the block
        and run the proper handler.
        """
        try:
    
            lti_config = LtiConfiguration.objects.get(
                location=UsageKey.from_string(usage_id)
    
    
            if lti_config.version != lti_config.LTI_1P3:
                raise LtiError(
                    "LTI Error: LTI 1.1 blocks do not have a public keyset endpoint."
                )
    
            # Retrieve block's Public JWK
            # The underlying method will generate a new Private-Public Pair if one does
            # not exist, and retrieve the values.
            response = JsonResponse(lti_config.lti_1p3_public_jwk)
            response['Content-Disposition'] = 'attachment; filename=keyset.json'
            return response
    
        except (LtiError, InvalidKeyError, ObjectDoesNotExist) as exc:
            log.info("Error while retrieving keyset for usage_id %r: %s", usage_id, exc)
            raise Http404 from exc
    
    @require_http_methods(["GET", "POST"])
    
    def launch_gate_endpoint(request, suffix):
        """
        Gate endpoint that triggers LTI launch endpoint XBlock handler
    
        This is basically a passthrough function that uses the
        OIDC response parameter `login_hint` to locate the block
        and run the proper handler.
        """
        try:
    
            usage_key_str = request.GET.get('login_hint')
            usage_key = UsageKey.from_string(usage_key_str)
    
            return compat.run_xblock_handler(
    
                request=request,
                course_id=str(usage_key.course_key),
                usage_id=str(usage_key),
                handler='lti_1p3_launch_callback',
                suffix=suffix
            )
    
        except Exception as exc:
            log.warning("Error preparing LTI 1.3 launch for hint %r: %s", usage_key_str, exc)
            raise Http404 from exc
    
    
    
    @csrf_exempt
    @require_http_methods(["POST"])
    def access_token_endpoint(request, usage_id=None):
        """
        Gate endpoint to enable tools to retrieve access tokens
        """
        try:
            usage_key = UsageKey.from_string(usage_id)
    
    
            return compat.run_xblock_handler_noauth(
    
                request=request,
                course_id=str(usage_key.course_key),
                usage_id=str(usage_key),
                handler='lti_1p3_access_token'
            )
    
        except Exception as exc:
            log.warning("Error retrieving an access token for usage_id %r: %s", usage_id, exc)
            raise Http404 from exc
    
    # Post from external tool that doesn't
    # have access to CSRF tokens
    @csrf_exempt
    # This URL should work inside an iframe
    @xframe_options_sameorigin
    # Post only, as required by LTI-DL Specification
    @require_http_methods(["POST"])
    def deep_linking_response_endpoint(request, lti_config_id=None):
        """
        Deep Linking response endpoint where tool can send back
        """
        try:
            # Retrieve LTI configuration
            lti_config = LtiConfiguration.objects.get(id=lti_config_id)
    
            # First, check if the user has sufficient permissions to
            # save LTI Deep Linking content through the student.auth API.
            course_key = lti_config.location.course_key
            if not user_has_staff_access(request.user, course_key):
                raise PermissionDenied()
    
            # Get LTI consumer
            lti_consumer = lti_config.get_lti_consumer()
    
            # Retrieve Deep Linking return message and validate parameters
            content_items = lti_consumer.check_and_decode_deep_linking_token(
                request.POST.get("JWT")
            )
    
            # On a transaction, clear older DeepLinking selections, then
            # verify and save each content item passed from the tool.
            with transaction.atomic():
                # Erase older deep linking selection
                LtiDlContentItem.objects.filter(lti_configuration=lti_config).delete()
    
                for content_item in content_items:
    
                    content_type = content_item.get('type')
    
    
                    # Retrieve serializer (or raise)
                    if content_type not in LTI_DL_CONTENT_TYPE_SERIALIZER_MAP.keys():
                        raise LtiDeepLinkingContentTypeNotSupported()
    
                    serializer_cls = LTI_DL_CONTENT_TYPE_SERIALIZER_MAP[content_type]
    
    
                    # Validate content item data
                    serializer = serializer_cls(data=content_item)
                    serializer.is_valid(True)
    
                    # Save content item
                    LtiDlContentItem.objects.create(
                        lti_configuration=lti_config,
    
                        content_type=content_type,
    
                        attributes=serializer.validated_data,
                    )
    
    
            # Display sucess page to indicate that LTI DL Content was
            # saved successfully and auto-close after a few seconds.
            return render(request, 'html/lti-dl/dl_response_saved.html')
    
    
        # If LtiConfiguration doesn't exist, error with 404 status.
    
        except LtiConfiguration.DoesNotExist as exc:
            log.info("LtiConfiguration %r does not exist: %s", lti_config_id, exc)
            raise Http404 from exc
        # If the deep linking content type is not supported
        except LtiDeepLinkingContentTypeNotSupported as exc:
            log.info("One of the selected LTI Content Types is not supported: %s", exc)
            return render(
                request,
                'html/lti-dl/dl_response_error.html',
                {"error": _("The selected content type is not supported by Open edX.")},
                status=400
            )
        # Bad JWT message, invalid token, or any other message validation issues
        except (Lti1p3Exception, PermissionDenied) as exc:
            log.warning(
                "Permission on LTI Config %r denied for user %r: %s",
                lti_config,
                request.user,
                exc,
            )
            return render(
                request,
                'html/lti-dl/dl_response_error.html',
                {
                    "error": _("You don't have access to save LTI Content Items."),
                    "explanation": _("Please check that you have course staff permissions "
                                     "and double check this block's LTI settings."),
                },
                status=403
            )
    
    @require_http_methods(['GET'])
    
    def deep_linking_content_endpoint(request, lti_config_id=None):
        """
        Deep Linking endpoint for rendering Deep Linking Content Items.
        """
        try:
            # Get LTI Configuration
            lti_config = LtiConfiguration.objects.get(id=lti_config_id)
    
        except LtiConfiguration.DoesNotExist as exc:
            log.info("LtiConfiguration %r does not exist: %s", lti_config_id, exc)
            raise Http404 from exc
    
    
        # check if user has proper access
        if not has_block_access(request.user, lti_config.block, lti_config.location.course_key):
    
            log.warning(
                "Permission on LTI Config %r denied for user %r.",
                lti_config_id,
                request.user,
            )
            raise PermissionDenied
    
    
        # Get all LTI-DL contents
        content_items = LtiDlContentItem.objects.filter(lti_configuration=lti_config)
    
        # If no LTI-DL contents found for current configuration, throw 404 error
        if not content_items.exists():
    
            log.info("There's no Deep linking content for LTI configuration %s.", lti_config)
            raise Http404
    
    
        # Render LTI-DL contents
        return render(request, 'html/lti-dl/render_dl_content.html', {
            'content_items': content_items,
            'block': lti_config.block,
        })
    
    
    
    class LtiAgsLineItemViewset(viewsets.ModelViewSet):
        """
        LineItem endpoint implementation from LTI Advantage.
    
        See full documentation at:
        https://www.imsglobal.org/spec/lti-ags/v2p0#line-item-service
        """
        serializer_class = LtiAgsLineItemSerializer
        pagination_class = None
    
        # Custom permission classes for LTI APIs
        authentication_classes = [Lti1p3ApiAuthentication]
        permission_classes = [LtiAgsPermissions]
    
        # Renderer/parser classes to accept LTI AGS content types
        renderer_classes = [
            LineItemsRenderer,
            LineItemRenderer,
        ]
        parser_classes = [LineItemParser]
    
        # Filters
        filter_backends = [DjangoFilterBackend]
        filterset_fields = [
            'resource_link_id',
            'resource_id',
            'tag'
        ]
    
        def get_queryset(self):
            lti_configuration = self.request.lti_configuration
    
            # Return all LineItems related to the LTI configuration.
            # TODO:
            # Note that each configuration currently maps 1:1
            # to each resource link (block), and this filter needs
            # improved once we start reusing LTI configurations.
            return LtiAgsLineItem.objects.filter(
                lti_configuration=lti_configuration
            )
    
        def perform_create(self, serializer):
            lti_configuration = self.request.lti_configuration
            serializer.save(lti_configuration=lti_configuration)
    
    
        @action(
            detail=True,
            methods=['GET'],
            url_path='results/(?P<user_id>[^/.]+)?',
    
            renderer_classes=[LineItemResultsRenderer],
            content_negotiation_class=IgnoreContentNegotiation,
    
        def results(self, request, user_id=None, **kwargs):
    
            """
            Return a Result list for an LtiAgsLineItem
    
            URL Parameters:
              * user_id (string): String external user id representation.
    
            Query Parameters:
              * limit (integer): The maximum number of records to return. Records are
                    sorted with most recent timestamp first
    
            Returns:
              * An array of Result records, formatted by LtiAgsResultSerializer
                    and returned with the media-type for LineItemResultsRenderer
            """
            line_item = self.get_object()
            scores = line_item.scores.filter(score_given__isnull=False).order_by('-timestamp')
    
            if user_id:
                scores = scores.filter(user_id=user_id)
    
            if request.query_params.get('limit'):
                scores = scores[:int(request.query_params.get('limit'))]
    
            serializer = LtiAgsResultSerializer(
                list(scores),
                context={'request': self.request},
                many=True,
            )
    
            return Response(serializer.data)
    
        @action(
            detail=True,
            methods=['POST'],
            parser_classes=[LineItemScoreParser],
    
            renderer_classes=[LineItemScoreRenderer],
            content_negotiation_class=IgnoreContentNegotiation,
    
        def scores(self, request, *args, **kwargs):
    
            """
            Create a Score record for an LtiAgsLineItem
    
            Data:
              * A JSON object capable of being serialized by LtiAgsScoreSerializer
    
            Returns:
              * An copy of the saved record, formatted by LtiAgsScoreSerializer
                    and returned with the media-type for LineItemScoreRenderer
            """
            line_item = self.get_object()
    
            user_id = request.data.get('userId')
    
            # Using `filter` and `first` so that when a score does not exist,
            # `existing_score` is set to `None`. Using `get` will raise `DoesNotExist`
            existing_score = line_item.scores.filter(user_id=user_id).first()
    
            serializer = LtiAgsScoreSerializer(
                instance=existing_score,
                data=request.data,
                context={'request': self.request},
            )
            serializer.is_valid(raise_exception=True)
            serializer.save(line_item=line_item)
            headers = self.get_success_headers(serializer.data)
            return Response(
                serializer.data,
                status=status.HTTP_201_CREATED,
                headers=headers
            )
    
    
    
    class LtiNrpsContextMembershipViewSet(viewsets.ReadOnlyModelViewSet):
        """
        LTI NRPS Context Membership Service endpoint.
    
        See full documentation at:
        http://imsglobal.org/spec/lti-nrps/v2p0
        """
    
        # Custom permission classes for LTI APIs
        authentication_classes = [Lti1p3ApiAuthentication]
        permission_classes = [LtiNrpsContextMembershipsPermissions]
    
        # Renderer classes to accept LTI NRPS content types
        renderer_classes = [
            MembershipResultRenderer,
        ]
    
        def attach_external_user_ids(self, data):
            """
            Preprocess the output of `get_membership` method amd appends external ids to each user.
            """
    
            # batch get or create external ids for all users
            user_ids = data.keys()
            users = get_user_model().objects.prefetch_related('profile').filter(id__in=user_ids)
    
            # get external ids
            external_ids = compat.batch_get_or_create_externalids(users)
    
            for userid in user_ids:
                # append external ids to user
                data[userid]['external_id'] = external_ids[userid].external_user_id
    
        def get_serializer_class(self):
            """
            Overrides ModelViewSet's `get_serializer_class` method.
            Checks if PII fields can be exposed and returns appropiate serializer.
            """
            if expose_pii_fields(self.request.lti_configuration.location.course_key):
                return LtiNrpsContextMembershipPIISerializer
            else:
                return LtiNrpsContextMembershipBasicSerializer
    
        def list(self, *args, **kwargs):
            """
            Overrides default list method of ModelViewSet. Calls LMS `get_course_members`
            API and returns result.
            """
    
            # get course key
            course_key = self.request.lti_configuration.location.course_key
    
            try:
                data = compat.get_course_members(course_key)
                self.attach_external_user_ids(data)
    
                # build correct format for the serializer
                result = {
                    'id': self.request.build_absolute_uri(),
                    'context': {
                        'id': course_key
                    },
                    'members': data.values(),
                }
    
                # Serialize and return data NRPS reponse.
                serializer = self.get_serializer_class()(result)
                return Response(serializer.data)
    
            except LtiError as ex:
                log.warning("LTI NRPS Error: %s", ex)
                return Response({
                    "error": "above_response_limit",
                    "explanation": "The number of retrieved users is bigger than the maximum allowed in the configuration.",
                }, status=HTTP_403_FORBIDDEN)