Skip to content
Snippets Groups Projects
Commit 5d371542 authored by Stefan Hynek's avatar Stefan Hynek :drooling_face:
Browse files

Merge branch '20-use-tgsearch-from-tgclients-lib' into 'main'

Resolve "use tgsearch from tgclients lib"

Closes #20

See merge request !11
parents 61644a05 86b7989a
No related branches found
No related tags found
1 merge request: !11 Resolve "use tgsearch from tgclients lib"
Pipeline #277334 passed
...@@ -8,13 +8,12 @@ from pprint import pformat ...@@ -8,13 +8,12 @@ from pprint import pformat
from tgclients.auth import TextgridAuth from tgclients.auth import TextgridAuth
from tgclients.config import TextgridConfig from tgclients.config import TextgridConfig
from tgclients.crud import TextgridCRUD from tgclients.crud import TextgridCRUD
from tgclients.search import TextgridSearch
from wsgidav.dav_provider import DAVCollection, DAVNonCollection, DAVProvider from wsgidav.dav_provider import DAVCollection, DAVNonCollection, DAVProvider
from wsgidav.util import join_uri, pop_path from wsgidav.util import join_uri, pop_path
from repdav.stream_tools import FileLikeQueue from repdav.stream_tools import FileLikeQueue
from .tgapi import TextgridSearch
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
...@@ -80,6 +79,8 @@ class TextgridProject(DAVCollection): ...@@ -80,6 +79,8 @@ class TextgridProject(DAVCollection):
_logger.debug("Called TextgridProject.__init__(self, %s, environ).", path) _logger.debug("Called TextgridProject.__init__(self, %s, environ).", path)
DAVCollection.__init__(self, path, environ) DAVCollection.__init__(self, path, environ)
self._sid = environ["wsgidav.auth.user_name"] self._sid = environ["wsgidav.auth.user_name"]
self._tgconfig = TextgridConfig()
self._tgsearch = TextgridSearch(self._tgconfig.search)
def create_empty_resource(self, name): def create_empty_resource(self, name):
pass pass
...@@ -99,14 +100,22 @@ class TextgridProject(DAVCollection): ...@@ -99,14 +100,22 @@ class TextgridProject(DAVCollection):
# return names # return names
# #
# path resolution has to be rewritten before we can work with resource titles # path resolution has to be rewritten before we can work with resource titles
resources = TextgridSearch().get_project_contents( response = self._tgsearch.list_project_root(self.path.split("/")[-1], self._sid)
self._sid, self.path.split("/")[-1] names = []
) for result in response.result:
return resources.keys() names.append(result.object_value.generic.generated.textgrid_uri.value)
return names
def get_member(self, name): def get_member(self, name):
_logger.debug("Called TextgridProject.get_member(self, %s).", name) _logger.debug("Called TextgridProject.get_member(self, %s).", name)
info = TextgridSearch().info(self._sid, name) response = self._tgsearch.info(name, self._sid)
info = {
name: {
"title": response.result[0].object_value.generic.provided.title,
"format": response.result[0].object_value.generic.provided.format,
"extent": response.result[0].object_value.generic.generated.extent,
}
}
_logger.info("INFO: %s", info) _logger.info("INFO: %s", info)
if "aggregation" in info[name]["format"]: if "aggregation" in info[name]["format"]:
return TextgridAggregation(join_uri(self.path, name), self.environ, info) return TextgridAggregation(join_uri(self.path, name), self.environ, info)
...@@ -161,6 +170,8 @@ class TextgridAggregation(DAVCollection): ...@@ -161,6 +170,8 @@ class TextgridAggregation(DAVCollection):
DAVCollection.__init__(self, path, environ) DAVCollection.__init__(self, path, environ)
self._sid = environ["wsgidav.auth.user_name"] self._sid = environ["wsgidav.auth.user_name"]
self._info = info self._info = info
self._tgconfig = TextgridConfig()
self._tgsearch = TextgridSearch(self._tgconfig.search)
def create_empty_resource(self, name): def create_empty_resource(self, name):
pass pass
...@@ -173,15 +184,22 @@ class TextgridAggregation(DAVCollection): ...@@ -173,15 +184,22 @@ class TextgridAggregation(DAVCollection):
def get_member_names(self): def get_member_names(self):
_logger.debug("Called TextgridAggregation.get_member_names(self).") _logger.debug("Called TextgridAggregation.get_member_names(self).")
resources = TextgridSearch().get_aggregation_contents( response = self._tgsearch.list_aggregation(self.path.split("/")[-1], self._sid)
self._sid, self.path.split("/")[-1] names = []
) for result in response.result:
# _logger.debug("RESOURCES: %s", resources) names.append(result.object_value.generic.generated.textgrid_uri.value)
return resources.keys() return names
def get_member(self, name): def get_member(self, name):
_logger.debug("Called TextgridAggregation.get_member(self, %s).", name) _logger.debug("Called TextgridAggregation.get_member(self, %s).", name)
info = TextgridSearch().info(self._sid, name) response = self._tgsearch.info(name, self._sid)
info = {
name: {
"title": response.result[0].object_value.generic.provided.title,
"format": response.result[0].object_value.generic.provided.format,
"extent": response.result[0].object_value.generic.generated.extent,
}
}
_logger.info("INFO: %s", info) _logger.info("INFO: %s", info)
if "aggregation" in info[name]["format"]: if "aggregation" in info[name]["format"]:
return TextgridAggregation(join_uri(self.path, name), self.environ, info) return TextgridAggregation(join_uri(self.path, name), self.environ, info)
...@@ -260,7 +278,7 @@ class TextgridResource(DAVNonCollection): ...@@ -260,7 +278,7 @@ class TextgridResource(DAVNonCollection):
) )
queue = FileLikeQueue(int(self._size)) queue = FileLikeQueue(int(self._size))
config = TextgridConfig("http://textgridlab.org/") config = TextgridConfig()
crud = TextgridCRUD(config.crud) crud = TextgridCRUD(config.crud)
metadata = crud.read_metadata(self.path.split("/")[-1], self._sid).content metadata = crud.read_metadata(self.path.split("/")[-1], self._sid).content
......
"""Communicate with the different Textgrid APIs."""
import io
import xml.etree.ElementTree as ET
import requests
#from typing import List
from zeep import Client
from zeep.exceptions import TransportError
from .config import TextgridConfig
# Declares the markup of this module's docstrings: reStructuredText, in English.
__docformat__ = 'restructuredtext en'
# API version used when a caller does not request a specific one.
DEFAULT_API_VERSION = "1.0"


def lookup_api_path(internal_name: str, api_version: str = DEFAULT_API_VERSION) -> str:
    """Return the URI fragment of a named Textgrid API endpoint.

    :param internal_name: Internal API name; one of ``search``, ``info``,
        ``navigation``, ``agg``, ``toplevel``, ``facet``, ``relation``
        or ``crud``
    :type internal_name: str
    :param api_version: API version, defaults to DEFAULT_API_VERSION
    :type api_version: str, optional
    :raises NotImplementedError: Raises if an unsupported API version is
        specified (only ``"1.0"`` is supported)
    :raises KeyError: Raises if ``internal_name`` is not a known endpoint
    :return: Uri fragment
    :rtype: str
    """
    # The version is validated first, so an unsupported version is reported
    # even when the endpoint name is unknown as well.
    if api_version != "1.0":
        raise NotImplementedError(
            "Unsupported Textgrid API version: {!r}".format(api_version)
        )

    endpoints = {
        "search": "/tgsearch/search/",
        "info": "/tgsearch/info/",
        "navigation": "/tgsearch/navigation/",
        "agg": "/tgsearch/navigation/agg/",
        "toplevel": "/tgsearch/navigation/toplevel/",
        "facet": "/tgsearch/facet/",
        "relation": "/tgsearch/relation/",
        "crud": "/tgcrud/rest/",
    }
    return endpoints[internal_name]
class TextgridAuth:
    """Client for the Textgrid Authorization Service."""

    def __init__(self):
        self._config = TextgridConfig()

    def _connect(self) -> Client:
        """Build a SOAP client set up for the Textgrid Auth service.

        The client is created from the configured WSDL and then pointed at
        the configured service address.

        :return: A SOAP client
        :rtype: zeep.Client
        """
        soap_client = Client(self._config.auth_wsdl)
        # HACK: overrides the endpoint address through a private zeep
        # attribute; should be remediated.
        endpoint = self._config.auth_address
        soap_client.service._binding_options["address"] = endpoint
        return soap_client

    # replace ":" with " -> List | None:" when switching to python3.10
    def assigned_projects(self, sid: str):
        """Fetch the projects assigned to a session.

        :param sid: Session ID
        :type sid: str
        :return: A list of project id strings, or None if the SOAP call
            fails with a transport error
        :rtype: List | None
        """
        soap_client = self._connect()
        try:
            projects = soap_client.service.tgAssignedProjects(sid)
        except TransportError:
            projects = None
        return projects
class TextgridSearch:
    """Provide access to the Textgrid Search Service."""

    def __init__(self):
        self._config = TextgridConfig()

    def info(self, sid: str, textgrid_uri: str) -> dict:
        """Get info from tg-uri.

        :param sid: Session ID
        :type sid: str
        :param textgrid_uri: Textgrid URI
        :type textgrid_uri: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: A dictionary with keys of Textgrid URIs and values of
            metadata dictionaries (``title``, ``format``, ``extent``)
        :rtype: dict
        """
        base = self._config.host + DEFAULT_API_VERSION + lookup_api_path("info")
        return self._fetch(base + textgrid_uri, sid)

    def get_project_contents(self, sid: str, project_id: str) -> dict:
        """Get objects belonging to a project, filtered by objects that are in
        an aggregation in the same project.

        :param sid: Session ID
        :type sid: str
        :param project_id: Project ID
        :type project_id: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: A dictionary with keys of Textgrid URIs and values of metadata dictionaries
        :rtype: dict
        """
        return self._fetch(self._config.nav_address + project_id, sid)

    def get_aggregation_contents(self, sid: str, textgrid_uri: str) -> dict:
        """Get child resources of an aggregation.

        :param sid: Session ID
        :type sid: str
        :param textgrid_uri: Textgrid URI
        :type textgrid_uri: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: A dictionary with keys of Textgrid URIs and values of metadata dictionaries
        :rtype: dict
        """
        return self._fetch(self._config.nav_address + "agg/" + textgrid_uri, sid)

    def _fetch(self, url: str, sid: str) -> dict:
        """Request ``url`` with the session ID and parse the answer.

        :param url: Full URL of the search endpoint, without query string
        :type url: str
        :param sid: Session ID
        :type sid: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: A dictionary with keys of Textgrid URIs and values of metadata dictionaries
        :rtype: dict
        """
        # Passing the session ID via ``params`` lets requests URL-encode it
        # instead of splicing it verbatim into the query string.
        response = requests.get(url, params={"sid": sid})
        # Fail loudly on 4xx/5xx rather than trying to parse an error page.
        response.raise_for_status()
        return self._process_response(response)

    def _process_response(self, response: "requests.Response") -> dict:
        """Process the response of a Textgrid Search Service into a dictionary.

        Only ``tgs:result`` elements that are direct children of the document
        root are evaluated.

        :param response: Response of the Textgrid Search Service
        :type response: requests.Response
        :return: A dictionary with keys of Textgrid URIs and values of metadata dictionaries
        :rtype: dict
        """
        # Read the body once; it is needed for both parsing passes.
        xml_text = response.text
        namespaces = self._register_namespaces(xml_text)
        root = ET.fromstring(xml_text)

        result_dict = {}
        for result in root.findall("tgs:result", namespaces):
            textgrid_uri = result.find(
                "./object/generic/generated/textgridUri", namespaces
            ).text
            result_dict[textgrid_uri] = {
                # title^=name, format^=content_type, extent^=content_length
                "title": result.find("./object/generic/provided/title", namespaces).text,
                "format": result.find("./object/generic/provided/format", namespaces).text,
                "extent": int(result.find("./object/generic/generated/extent", namespaces).text),
            }
        return result_dict

    @staticmethod
    def _register_namespaces(xml: str) -> dict:
        """Register namespaces to the global ElementTree and return a
        dictionary of them.

        :param xml: XML Document
        :type xml: str
        :return: Namespaces dictionary mapping each declared prefix to its URI
        :rtype: dict
        """
        # ``iterparse`` restricted to "start-ns" events yields
        # ("start-ns", (prefix, uri)) pairs, one per namespace declaration.
        # see https://medium.datadriveninvestor.com/getting-started-using-pythons-elementtree-to-navigate-xml-files-dc9bc720eaa6
        namespaces = {
            prefix: uri
            for _, (prefix, uri) in ET.iterparse(io.StringIO(xml), events=["start-ns"])
        }
        # NOTE(review): the "_" suffix presumably keeps the registered prefix
        # clear of ElementTree's reserved "ns<number>" pattern - confirm.
        for prefix, uri in namespaces.items():
            ET.register_namespace(prefix + "_", uri)
        return namespaces
class TextgridCRUD:
    """Provide access to the Textgrid CRUD Service."""

    def __init__(self):
        self._config = TextgridConfig()

    def get_data(self, sid: str, textgrid_uri: str) -> bytes:
        """Get resource data.

        :param sid: Session ID
        :type sid: str
        :param textgrid_uri: Textgrid URI
        :type textgrid_uri: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: Response content
        :rtype: bytes
        """
        response = self._get_resource(sid, textgrid_uri)
        return response.content

    def get_metadata(self, sid: str, textgrid_uri: str):
        """Get content type and content length of a resource from the
        response headers, without downloading the body.

        Currently unused.

        :param sid: Session ID
        :type sid: str
        :param textgrid_uri: Textgrid URI
        :type textgrid_uri: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: A dictionary with the keys ``content-type`` and
            ``content-length``
        :rtype: dict
        """
        response = self._get_resource(sid, textgrid_uri)
        try:
            return self._process_response(response)
        finally:
            # The body of a streamed response is never read here, so the
            # connection has to be released explicitly.
            response.close()

    def _get_resource(self, sid: str, textgrid_uri: str) -> "requests.Response":
        """Helper function to get the response headers of a requested resource
        and defer the download of the message body.

        :param sid: Session ID
        :type sid: str
        :param textgrid_uri: Textgrid URI
        :type textgrid_uri: str
        :raises requests.HTTPError: Raises if the service answers with an
            HTTP error status
        :return: Response object
        :rtype: requests.Response
        """
        uri = self._config.host + DEFAULT_API_VERSION + lookup_api_path("crud")
        # defer downloading the response body until accessing Response.content;
        # ``params`` lets requests URL-encode the session ID.
        response = requests.get(
            uri + textgrid_uri + "/data",
            params={"sessionId": sid},
            stream=True,
        )
        try:
            response.raise_for_status()
        except requests.HTTPError:
            # Release the streamed connection before propagating the error.
            response.close()
            raise
        return response

    @staticmethod
    def _process_response(response: "requests.Response") -> dict:
        """Extract content type and content length from the response headers.

        Only called by `get_metadata` => currently unused.

        :param response: Response of the Textgrid CRUD Service
        :type response: requests.Response
        :return: A dictionary with ``content-type`` (None if the header is
            missing) and ``content-length`` (0 if the header is missing or
            empty)
        :rtype: dict
        """
        headers = response.headers
        return {
            "content-type": headers.get("content-type"),
            "content-length": int(headers.get("content-length") or 0),
        }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment