Source code for schema_parser.parser

#  Copyright (c) 3-2023.  OCX Consortium https://3docx.org. See the LICENSE

from collections import defaultdict
from logging import Logger
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

import requests
from lxml.etree import Element
from lxml.etree import QName
from requests import HTTPError

from . import DEFAULT_SCHEMA
from . import PROCESS_SCHEMA_TYPES
from . import SCHEMA_FOLDER
from . import W3C_SCHEMA_BUILT_IN_TYPES
from .data_classes import SchemaSummary
from .data_classes import SchemaType
from .elements import OcxAttribute
from .elements import OcxChildElement
from .elements import OcxGlobalElement
from .helpers import SchemaHelper
from .xparse import LxmlElement
from .xparse import LxmlParser


[docs]class OcxSchema: """The OcxSchema provides functionality for parsing the OCX xsd schema and storing all the elements. Args: logger: The main python logger Attributes: _namespace: The dict of all namespaces on the form (prefix, namespace) key-value pairs resulting from parsing all schema files, `W3C <https://www.w3.org/TR/xml-names/#sec-namespaces>`_. _ocx_global_elements: Hash table as key-value pairs `(tag, OcxSchemaElement)` for all parsed schema elements _is_parsed: True if a schema has been parsed, False otherwise _schema_version: The version of the parsed schema _local_folder: The local folder where any external schemas will be downloaded _schema_changes: A list of all schema changes described by the tag SchemaChange contained in the xsd file. _schema_types: The list of xsd types to be parsed. Only these types will be stored. _default_schema: The default schema to be parsed _builtin_xs_types: W3C primitive data types. `www.w3.org <https://www.w3.org/TR/xmlschema-2/#built-in-primitive-datatypes>`_. Defined in ``config.yaml`` """ def __init__(self, logger: Logger, local_folder: str = SCHEMA_FOLDER): self._parser = LxmlParser(logger) self.log = logger # Default namespace map for the reserved prefix xml. See https://www.w3.org/TR/xml-names/#sec-namespaces self._namespace = {"xml": "http://www.w3.org/XML/1998/namespace"} Path(SCHEMA_FOLDER).mkdir(parents=True, exist_ok=True) self._is_parsed = False self._local_folder = local_folder self._default_schema = DEFAULT_SCHEMA self._all_schema_elements = {} # Hash table with tag as key schema_elements[tag] = lxml.etree.Element self._ocx_global_elements = {} # Hash table with tag as key, value pairs(tag, OcxGlobalElement) self._all_types = defaultdict(list) # Hash table with tag as key: all_types[tag] = lxml.etree.Element self._schema_types = PROCESS_SCHEMA_TYPES self._schema_version = None self._schema_changes = defaultdict(list) # w3c primitive data types ref https://www.w3.org/TR/xmlschema-2/#built-in-primitive-datatypes self._builtin_xs_types = W3C_SCHEMA_BUILT_IN_TYPES def _add_global_ocx_element(self, tag: str, element: OcxGlobalElement): """Add a global OCX element to the hash table Args: tag: The hash key element: The global OCX element to add """ # self.log.debug(f'(Added schema element with tag {tag}') self._ocx_global_elements[tag] = element def _add_schema_element(self, tag: str, element: Element): """Add a new schema element to the hash table Args: tag: The hash key element: The schema ``Element`` to add """ # self.log.debug(f'(Added schema element with tag {tag}') self._all_schema_elements[tag] = element def _add_schema_type(self, schema_type: str, tag: str): """Add a new schema type to the hash table Args: tag: The hash key schema_type: The schema type """ # self.log.debug(f'(Added schema element with type {schema_type} and tag {tag}') self._all_types[schema_type].append(tag)
[docs] def process_schema(self, schema_url: str = DEFAULT_SCHEMA) -> bool: """Process the XSD schema file and create all hash tables of global elements. Returns: True of processed OK, False otherwise. """ if self._parse_schema(schema_url): self._process_ocx_elements() # Sort the hash table # self._sort_schema_elements() ToDo: This function changes the dict to a list. Fix it! return True else: return False
[docs] def get_schema_folder(self) -> str: """Return the local folder where the schemas are stored. The local folder is relative to the project root. Returns: The relative path to the local schema folder. """ return self._local_folder
[docs] def put_schema_folder(self, local_folder: str): """Set the local folder where the schemas are stored. The local folder is relative to the project root.""" self._local_folder = local_folder
[docs] def put_default_schema(self, schema_url: str): """Return the default schema to be parsed. Args: schema_url: The location of the schema """ self._default_schema = schema_url
[docs] def get_default_schema(self) -> str: """Return the default schema to be parsed. Returns: The default schema url. """ return self._default_schema
def _parse_schema(self, schema_url: str = DEFAULT_SCHEMA) -> bool: """Parse the OCX xsd schema. The method will traverse any referenced (using the tag xs:import) schemas and parse these also. If the referenced schema url is not a local file, the method will download the file before the schema is parsed. Args: schema_url: the path or URL to the xsd file Returns: True if all schemas are parsed successfully, else returns False """ if "http" not in schema_url: if not Path(schema_url).exists(): self.log.error(f"The xsd file {schema_url} does not exist") return False else: try: remote_file = Path(schema_url).name file = Path(self._local_folder) / remote_file r = requests.get(schema_url) with open(file, "wb") as f: f.write(r.content) self.log.debug(f'Successfully downloaded remote schema "{schema_url}" ' f'to local folder "{self._local_folder}"') schema_url = file except HTTPError as e: self.log.error(f'Failed to access schema from "{schema_url}: {e}""') return False try: self._is_parsed = self._parser.parse(schema_url) except BaseException as e: self.log.error(e.with_traceback) return False if self._is_parsed: self.log.debug(f'Successfully parsed xsd schema with location "{schema_url}"') root = self._parser.get_root() ns = self._parser.get_namespaces() # Add the ns to the global namespace dict n = self._add_namespace(ns) # The target namespace for the current schema target_ns = self._parser.get_target_namespace() if target_ns not in self._namespace.values(): self.log.error(f'The target _namespace "{target_ns}" is not registered in the _namespace listing {self._namespace}') self._is_parsed = False return False # Retrieve the OCX schema version version = SchemaHelper.get_schema_version(root) if version != "Missing": self._schema_version = version self.log.debug(f'Added {n} new namespaces for schema "{schema_url}"') if LxmlElement.has_child_with_name(root, "SchemaChange"): changes = SchemaHelper.find_schema_changes(root) if len(changes) > 0: self._schema_changes = changes # Build the look-up tables for all global element types for schema_type in self._schema_types: # Only search for selected element types types = LxmlElement.find_all_children_with_name_and_attribute(root, schema_type, "name") for e in types: # Add element to look-up table name = LxmlElement.get_name(e) if name is not None: # add the schema type tag = SchemaHelper.unique_tag(name, target_ns) schema_type = LxmlElement.get_localname(e) # Only process the selected element types and store in hash tables if schema_type in self._schema_types: self._add_schema_element(tag, e) self._add_schema_type(schema_type, tag) # Parse any imported schemas references = self._parser.get_referenced_files() for ns in references: url = references[ns] self._is_parsed = self._parse_schema(url) if self._is_parsed: if ns not in list(self.get_namespaces().values()): self.log.error(f'Mismatched _namespace "{ns}" in xsd with url: "{url}"') else: break return self._is_parsed
[docs] def is_parsed(self) -> bool: return self._is_parsed
def _process_ocx_elements(self): """Process all parsed elements and build the hash table of OcxSchemaElement""" # All schema elements of type element elements = self._get_schema_element_types() for tag in elements: e = self._get_element(tag) qn = QName(tag) name = qn.localname self.log.debug(f"Adding global element {name}") ocx = OcxGlobalElement(e, tag, self.log) # store in look-up table self._add_global_ocx_element(tag, ocx) # Find all parents and add them to the instance self._find_all_my_parents(ocx) # Process all xs:attribute elements including all supertypes self._process_attributes(ocx) # Process ald children including super type children self._process_children(ocx) return def _process_attributes(self, ocx: OcxGlobalElement): """Process all xs:attributes of the global element Args: ocx: The parent OCX element """ # Process all xs:attribute elements including all supertypes attributes = LxmlElement.find_attributes(ocx.get_schema_element()) for a in attributes: ocx.add_attribute(self._process_attribute(a)) # Iterate over parents parents = ocx.get_parents() for t in parents: attributes = LxmlElement.find_attributes(parents[t]) for a in attributes: ocx.add_attribute(self._process_attribute(a)) # Process all xs:attributeGroup elements including all supertypes attributeGroups groups = LxmlElement.find_attribute_groups(ocx.get_schema_element()) for group in groups: # Get the reference ref = LxmlElement.get_reference(group) if ref is not None: tag, at_group = self._get_element_from_type(ref) if at_group is not None: attributes = LxmlElement.find_attributes(at_group) for a in attributes: ocx.add_attribute(self._process_attribute(a)) else: self.log.error(f"Attribute group {ref} is not found in the global look-up table") # Iterate over parents parents = ocx.get_parents() for t in parents: groups = LxmlElement.find_attribute_groups(parents[t]) for group in groups: # Get the reference ref = LxmlElement.get_reference(group) if ref is not None: tag, at_group = self._get_element_from_type(ref) if at_group is not None: attributes = LxmlElement.find_attributes(at_group) for a in attributes: ocx.add_attribute(self._process_attribute(a)) else: self.log.error(f"Attribute group {ref} is not found in the global look-up table") return def _process_children(self, ocx: OcxGlobalElement): """Process all xs:element of the global element Args: ocx: The parent OCX element """ # Process all xs:element elements including all supertypes elements = LxmlElement.find_all_children_with_name(ocx.get_schema_element(), "element") for e in elements: ocx.add_child(self._process_child(e)) # Iterate over parents parents = ocx.get_parents() for t in parents: elements = LxmlElement.find_all_children_with_name(parents[t], "element") for e in elements: ocx.add_child(self._process_child(e)) return def _process_attribute(self, xs_attribute: Element) -> OcxAttribute: """Process an xs:attribute element Returns: An instance of the OcxAttribute """ attribute = OcxAttribute(xs_attribute) reference = LxmlElement.get_reference(xs_attribute) if reference is not None: # Get the referenced element tag, a = self._get_element_from_type(reference) attribute.put_name(LxmlElement.get_name(a)) if attribute.get_description() == "": attribute.put_description(LxmlElement.get_element_text(a)) attribute.put_type(SchemaHelper.get_type(a)) return attribute def _process_child(self, xs_element: Element) -> OcxChildElement: """Process an xs:element child element Returns: An instance of the OcxChildElement """ child = OcxChildElement(xs_element) reference = LxmlElement.get_reference(xs_element) if reference is not None: # Get the referenced element tag, a = self._get_element_from_type(reference) child.put_name(LxmlElement.get_name(a)) if child.get_description() == "": child.put_description(LxmlElement.get_element_text(a)) child.put_type(SchemaHelper.get_type(a)) child.put_reference(tag) return child def _get_element(self, tag: str) -> Union[Element, None]: """Private function to get the ``etree.Element`` with the key 'tag' Returns: The ``OcxGlobalElement`` instance """ if tag in self._builtin_xs_types: self.log.debug(f"{__class__}: The tag {tag} is a built-in type {self._builtin_xs_types[tag]}") return None if tag not in self._all_schema_elements.keys(): self.log.debug(f"{__class__}: The tag {tag} is not in the look-up table") return self._all_schema_elements.get(tag) def _get_element_from_type(self, schema_type: str) -> Tuple[Any, Any]: """Private method to retrieve the schema element ``etree.Element`` with the key 'type' Returns: A tuple of the element unique tag and the element (tag, Element) """ name = LxmlElement.strip_namespace_prefix(schema_type) if LxmlElement.namespace_prefix(schema_type) in self._namespace: namespace = self._namespace[LxmlElement.namespace_prefix(schema_type)] else: self.log.debug(f"The type {schema_type} has an unknown _namespace prefix") return None, None tag = SchemaHelper.unique_tag(name, namespace) if tag in self._builtin_xs_types: self.log.debug(f"The tag {tag} is a built-in type {self._builtin_xs_types[tag]}") return None, None if tag not in self._all_schema_elements: self.log.debug(f"{__class__}: The tag {tag} is not in the look-up table") return None, None else: return tag, self._all_schema_elements.get(tag) def _find_parents(self, child_tag: str, ocx: OcxGlobalElement): """Recursively find all ancestors of the global element ``OxcGlobalElement`` Args: child_tag: The unique tag of a child ocx: The global element (the root to start the search from) """ # Look up the xsd element ocx.get_name() e = self._get_element(child_tag) if e is not None: # The element's type is the parent schema_type = SchemaHelper.get_type(e) if schema_type is not None: # Look up the parent xsd element from its type parent_tag, parent_element = self._get_element_from_type(schema_type) # Add the parent to the global ocx if parent_tag is not None: ocx.put_parent(parent_tag, parent_element) assertion = LxmlElement.find_assertion(parent_element) if assertion is not None: ocx.add_assertion(assertion) self._find_parents(parent_tag, ocx) else: return return def _find_all_my_parents(self, ocx: OcxGlobalElement): """Recursively find all the xsd schema parents of a global xsd element(parent, grandparent ...) The parents found is added to the ocx instance (child) Args: ocx: The global ocx instance to search from """ # Get the unique tag of the global element tag = ocx.get_tag() # Find my parents self._find_parents(tag, ocx)
[docs] def get_ocx_element_from_type(self, schema_type: str) -> Union[OcxGlobalElement, None]: """Method to retrieve the schema ``element etree.Element`` with the key 'type' Args: schema_type: the ocx type on the form ``prefix:name`` Returns: The ``OcxGlobalElement`` instance """ nsprefix = LxmlElement.namespace_prefix(schema_type) name = LxmlElement.strip_namespace_prefix(schema_type) if nsprefix not in self._namespace.values(): for prefix in self._namespace: if prefix == LxmlElement.namespace_prefix(schema_type): namespace = self._namespace[prefix] tag = SchemaHelper.unique_tag(name, namespace) if tag not in self._all_schema_elements: self.log.debug(f"{__class__}: The tag {tag} is not in the look-up table") return None else: return self._ocx_global_elements[tag] else: self.log.debug(f'{__class__}: The _namespace prefix "{nsprefix}" is not defined') return None
def _get_prefix_from_namespace(self, namespace: str) -> str: """Return the namespace prefix Returns: the namespace prefix """ prefix = "None" if namespace not in list(self._namespace.values()): self.log.debug(f"The _namespace {namespace} is not in the global _namespace dict") for item in self._namespace: if namespace == self._namespace[item]: prefix = item return prefix def _add_namespace(self, namespace: dict) -> int: """Add new namespaces to the global namespace dict' Returns: The number of new namespaces added """ ns = self._namespace # Check if any keys exists for prefix in ns: if prefix in namespace.keys(): self.log.debug( f'The _namespace prefix "{prefix}" already exists. ' f"Dropping new _namespace {namespace[prefix]} from the _namespace table" ) self.log.debug(f'The existing _namespace with prefix "{prefix}" is: {self._namespace[prefix]}') del namespace[prefix] self._namespace = {**self._namespace, **namespace} return len(self._namespace) - len(ns)
[docs] def get_namespaces(self) -> Dict: """The parsed namespaces' Returns: The dict of namespaces as (namespace,prefix) key-value pairs """ return self._namespace
def _get_all_schema_elements(self) -> Dict: """All ``lxml.etree.Element`` schema elements Returns: The dict of all global xsd lxml.etree.Element elements with tag as key """ return self._all_schema_elements def _sort_schema_elements(self): """Sorts the schema hash table""" sorted_dict = sorted(self._all_schema_elements.items(), key=lambda kv: kv[0]) self._all_schema_elements = sorted_dict def _get_schema_types(self, schema_type: str) -> List[str]: """Internal function to retrieve a list of tags of ``lxml.etree.Element`` schema elements of a specific type Returns: The sorted list of all tags of ``lxml.etree.Element`` of type ``schema_type`` """ elements = [] for tag in self._all_types[schema_type]: elements.append(tag) return sorted(elements)
[docs] def get_ocx_elements(self) -> List: """All ocx ``OcxGlobalElement`` elements Returns: The list of all parsed ``OcxGlobalElement`` instances """ return list(self._ocx_global_elements.values())
[docs] def get_schema_version(self) -> str: """The OCX schema version Returns: The coded version string of the OCX schema """ return self._schema_version
[docs] def get_schema_changes(self) -> Dict: """The OCX schema change history Returns: The schema changes for all schema versions """ return self._schema_changes
def _get_schema_element_types(self) -> List: """All schema elements of type ``element`` Returns: The list of all etree.Element of type ``element`` """ return self._get_schema_types("element") def _get_schema_complex_types(self) -> List[str]: """All tags for schema elements of type ``complexType`` Returns: The list of tags of all ``etree.Element`` of type ``complexType`` """ return self._get_schema_types("complexType") def _get_schema_simple_types(self) -> List[str]: """Alle schema elements of type ``simpleType`` Returns: The list of tags of all etree.Element of type ``simpleType`` """ return self._get_schema_types("simpleType") def _get_schema_attribute_tyepes(self) -> List[str]: """All schema elements of type ``attribute' Returns: The list of unique tags for all etree.Element of type ``attribute`` """ return self._get_schema_types("attribute") def _get_schema_attribute_group_types(self) -> List[str]: """All schema elements of type ``attributeGroup`` Returns: The list of all etree.Element of type ``attributeGroup`` """ return self._get_schema_types("attributeGroup")
[docs] def tbl_summary(self) -> SchemaSummary: """The summary of the parsed schema and any referenced schemas' Returns: The schema summary """ schema_version = [("Schema Version", self.get_schema_version())] schema_types = [(schema_type, len(self._all_types[schema_type])) for schema_type in self._all_types] namespaces = [(ns, self._namespace[ns]) for ns in self._namespace] return SchemaSummary(schema_version, schema_types, namespaces)
[docs] def tbl_attribute_groups(self) -> Dict: """All parsed ``attributeGroup`` types in the schema and any referenced schemas' Returns: List of ``SchemaType`` data class holding ``attributeGroup`` attributes. """ table = {} elements = self._get_schema_attribute_group_types() for tag in elements: table[tag] = self._get_schema_type_data_class(tag).to_dict() return table
[docs] def tbl_simple_types(self) -> Dict: """The table of all parsed ``simpleType`` elements in the schema and any referenced schemas' Returns: The ``SchemaType`` data class attributes of ``simpleType`` """ table = {} elements = self._get_schema_simple_types() for tag in elements: table[tag] = self._get_schema_type_data_class(tag).to_dict() return table
[docs] def tbl_attribute_types(self) -> Dict: """The table of all parsed attribute elements in the schema and any referenced schemas' Returns: The ``SchemaType`` data class attributes of ``attributeType`` """ table = {} elements = self._get_schema_attribute_tyepes() for tag in elements: table[tag] = self._get_schema_type_data_class(tag).to_dict() return table
[docs] def tbl_element_types(self) -> Dict: """The table of all parsed elements of type element in the schema and any referenced schemas' Returns: The ``SchemaType`` data class attributes of ``element`` """ table = {} elements = self._get_schema_element_types() for tag in elements: table[tag] = self._get_schema_type_data_class(tag).to_dict() return table
[docs] def tbl_complex_types(self) -> Dict: """The table of all parsed complexType elements in the schema and any referenced schemas' Returns: The ``SchemaType`` data class attributes of ``complexType`` """ table = {} elements = self._get_schema_complex_types() for tag in elements: table[tag] = self._get_schema_type_data_class(tag).to_dict() return table
def _get_schema_type_data_class(self, tag: str) -> SchemaType: """Return the ``SchemaType`` dataclass of the schema type with ``tag`` Args: tag: the schema ``tag`` Returns: A ``dataclass`` with the attributes of the element with the ``tag`` '""" e = self._get_element(tag) qn = QName(tag) prefix = self._get_prefix_from_namespace(qn.namespace) if prefix == "None": self.log.error(f"Tag {tag} has an unknown _namespace") return SchemaType(prefix, LxmlElement.get_name(e), tag, LxmlElement.get_source_line(e))