Source code for pybatfish.datamodel.primitives

#   Copyright 2018 The Batfish Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
from abc import ABCMeta, abstractmethod
from enum import Enum
from typing import Any, Dict, List  # noqa: F401

import attr
from pandas.core.indexes.frozen import FrozenList

from pybatfish.util import escape_html, escape_name, get_html

__all__ = [
    "Assertion",
    "AssertionType",
    "VariableType",
    "AutoCompleteSuggestion",
    "Edge",
    "FileLines",
    "Interface",
    "Issue",
    "IssueType",
    "ListWrapper",
]


@attr.s
class DataModelElement(object):
    __metaclass__ = ABCMeta

    def dict(self) -> Dict[str, Any]:
        return attr.asdict(self, recurse=True)

    @classmethod
    @abstractmethod
    def from_dict(cls, json_dict):
        raise NotImplementedError("Datamodel elements must implement from_dict")

    def _repr_html_(self) -> str:
        """Override this method to enable custom HTML formatting of the dataclass."""
        return escape_html(repr(self))


class AssertionType(str, Enum):
    """Assertion type."""

    COUNT_EQUALS = "countequals"  #: Number of results equals
    COUNT_LESSTHAN = "countlessthan"  #: Number of results is less than
    COUNT_MORETHAN = "countmorethan"  #: Number of results is more than
    EQUALS = "equals"  #: Result equals to value (list of rows). **Experimental**


@attr.s(frozen=True)
class Assertion(DataModelElement):
    """A Batfish assertion.

    Assertions are combined with a :py:class:`~pybatfish.question.question.Question`
    to create a Batfish check. An assertion can be on the number of results
    return by the question, or on the value of the answer itself.

    :ivar type: an :py:class:`AssertionType`
    :ivar expect: the expected value (a.k.a as right-hand side)
        for the assertion to return True.
    """

    type = attr.ib(type=AssertionType)
    expect = attr.ib(type=Any)

    @classmethod
    def from_dict(cls, json_dict):
        # type: (Dict) -> Assertion
        return Assertion(AssertionType[json_dict["type"]], json_dict["expect"])

    def dict(self):
        return dict(type=self.type.value, expect=self.expect)


class VariableType(str, Enum):
    """Auto completion type."""

    # Should be in sync with org.batfish.datamodel.questions.Variable.Type

    ADDRESS_GROUP_NAME = "addressGroupName"  #: address group name
    ANSWER_ELEMENT = "answerElement"  #: answer elements
    APPLICATION_SPEC = "applicationSpec"  #: application specifier
    BGP_PEER_PROPERTY_SPEC = "bgpPeerPropertySpec"  #: bgp peer properties
    BGP_PROCESS_PROPERTY_SPEC = "bgpProcessPropertySpec"  #: bgp process properties
    BGP_ROUTES = "bgpRoutes"  #: bgp routes
    BGP_ROUTE_CONSTRAINTS = "bgpRouteConstraints"  # bgp route constraints
    BGP_ROUTE_STATUS_SPEC = "bgpRouteStatusSpec"  # bgp route status
    BGP_SESSION_COMPAT_STATUS_SPEC = (
        "bgpSessionCompatStatusSpec"  #: bgp session compatibility statuses
    )
    BGP_SESSION_STATUS_SPEC = "bgpSessionStatusSpec"  #: bgp session statuses
    BGP_SESSION_TYPE_SPEC = "bgpSessionTypeSpec"  #: bgp session types
    BOOLEAN = "boolean"  #: boolean values
    COMPARATOR = "comparator"  #: comparators (<, <=, ==, >=, >, !=)
    DOUBLE = "double"  #: double values
    DISPOSITION_SPEC = "dispositionSpec"  #: dispositions
    FILTER = "filter"  #: names or regex of filters
    FILTER_NAME = "filter"  #: name of filters
    FILTER_SPEC = "filterSpec"  #: filter specifier
    FLOAT = "float"  #: float values
    HEADER_CONSTRAINT = "headerConstraint"  #: packet header constraints
    INTEGER = "integer"  #: integer values
    INTEGER_SPACE = "integerSpace"  #: integer spaces
    INTERFACE = "interface"  #: names of interfaces
    INTERFACE_GROUP_NAME = "interfaceGroupName"  #: interface group name
    INTERFACE_NAME = "interfaceName"  #: name of interfaces
    INTERFACE_PROPERTY_SPEC = "interfacePropertySpec"  #: interface properties
    INTERFACES_SPEC = "interfacesSpec"  #: interfaces specifier
    IP = "ip"  #: ips
    IP_PROTOCOL = "ipProtocol"  #: ip protocols
    IP_PROTOCOL_SPEC = "ipProtocolSpec"  #: ip protocol specifier
    IP_SPACE_SPEC = "ipSpaceSpec"  #: ip space specifier
    IP_WILDCARD = "ipWildcard"  #: ip protocols
    IPSEC_SESSION_STATUS_SPEC = "ipsecSessionStatusSpec"  #: ipsec session statuses
    JAVA_REGEX = "javaRegex"  #: java regex
    JSON_PATH = "jsonPath"  #: json path
    JSON_PATH_REGEX = "jsonPathRegex"  #: json path regex
    LOCATION_SPEC = "locationSpec"  #: location specifier
    LONG = "long"  #: long values
    MLAG_ID = "mlagId"  #: mlag id
    MLAG_ID_SPEC = "mlagIdSpec"  #: mlag id specifier
    NAMED_STRUCTURE_SPEC = "namedStructureSpec"  #: named structure type
    NODE_NAME = "nodeName"  #: name of nodes
    NODE_PROPERTY_SPEC = "nodePropertySpec"  #: node properties
    NODE_ROLE_DIMENSION_NAME = "nodeRoleDimensionName"  #: names of node role dimension
    NODE_ROLE_NAME = "nodeRoleName"  #: node role name
    NODE_SPEC = "nodeSpec"  #: node specifier
    OSPF_INTERFACE_PROPERTY_SPEC = (
        "ospfInterfacePropertySpec"  #: ospf interface properties
    )
    OSPF_PROCESS_PROPERTY_SPEC = "ospfProcessPropertySpec"  #: ospf process properties
    OSPF_SESSION_STATUS_SPEC = "ospfSessionStatusSpec"  #: ospf session statuses
    PATH_CONSTRAINT = "pathConstraint"  #: path constraints
    PREFIX = "prefix"  #: prefixes
    PREFIX_RANGE = "prefixRange"  #: prefix ranges
    PROTOCOL = "protocol"  #: application-level protocols
    REFERENCE_BOOK_NAME = "referenceBookName"  #: reference book name
    QUESTION = "question"  #: questions
    ROUTING_POLICY_SPEC = "routingPolicySpec"  #: routing policies
    ROUTING_PROTOCOL_SPEC = "routingProtocolSpec"  #: routing protocols
    STRING = "string"  #: string values
    STRUCTURE_NAME = "structureName"  #: names of structures
    SUBRANGE = "subrange"  #: subranges
    VRF = "vrf"  #: names of vrfs
    VXLAN_VNI_PROPERTY_SPEC = "vxlanVniPropertySpec"  #: vxlan vni properties
    ZONE = "zone"  #: names of zones


@attr.s(frozen=True)
class AutoCompleteSuggestion(DataModelElement):
    """Represent one auto complete suggestion.

    Auto complete suggestions are returned by Batfish for auto complete queries.

    :ivar description: Describes the type of the suggestion  (optional)
    :ivar hint: A hint as to why the suggestion is being made (optional)
    :ivar insertion_index: Index in original input string where suggested text should be inserted
    :ivar is_partial: Whether this suggestion represents partial or full text
    :ivar rank: Batfish may assign a rank to the suggestion
    :ivar text: The actual suggested text
    """

    description = attr.ib(type=str)
    hint = attr.ib(type=str)
    insertion_index = attr.ib(type=int)
    is_partial = attr.ib(type=bool)
    rank = attr.ib(type=int)
    text = attr.ib(type=str)

    DEFAULT_RANK = 0x7FFFFFFF

    @classmethod
    def from_dict(cls, json_dict):
        # type: (Dict) -> AutoCompleteSuggestion
        return AutoCompleteSuggestion(
            json_dict.get("description", None),
            json_dict.get("hint", None),
            json_dict.get("insertionIndex", 0),
            json_dict["isPartial"],
            json_dict["rank"],
            json_dict["text"],
        )

    def dict(self):
        return dict(
            description=self.description,
            hint=self.hint,
            insertion_index=self.insertion_index,
            is_partial=self.is_partial,
            rank=self.rank,
            text=self.text,
        )


[docs]@attr.s(frozen=True) class Interface(DataModelElement): """A network interface --- a combination of node and interface names. :ivar hostname: Node hostname to which this interface belongs :ivar interface: Interface name """ hostname = attr.ib(type=str) interface = attr.ib(type=str) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> Interface return Interface(json_dict["hostname"], json_dict["interface"]) def __str__(self): # type: () -> str return "{}[{}]".format(escape_name(self.hostname), escape_name(self.interface)) def _repr_html_(self): # type: () -> str return "{}".format(escape_html(self.__str__()))
def _interface_converter(val): # type: (Any) -> str if isinstance(val, Interface): return str(val.interface) else: return str(val)
[docs]@attr.s(frozen=True) class Edge(DataModelElement): """A network edge (i.e., a link between two node/interface pairs). :ivar node1: First node name :ivar node1interface: First node's interface name :ivar node2: Second node name :ivar node2interface: Second node's interface name """ node1 = attr.ib(type=str) node1interface = attr.ib(type=str, converter=_interface_converter) node2 = attr.ib(type=str) node2interface = attr.ib(type=str, converter=_interface_converter) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> Edge return Edge( node1=json_dict["node1"], node1interface=json_dict["node1interface"], node2=json_dict["node2"], node2interface=json_dict["node2interface"], ) def __str__(self): # type: () -> str return "{}:{} -> {}:{}".format( self.node1, self.node1interface, self.node2, self.node2interface ) def _repr_html_(self): # type: () -> str return "{}:{} &rarr; {}:{}".format( self.node1, self.node1interface, self.node2, self.node2interface )
[docs]@attr.s(frozen=True) class FileLines(DataModelElement): """A class that represents a set of lines in a file. :ivar filename: The filename referenced :ivar lines: A list of lines referenced """ filename = attr.ib(type=str) lines = attr.ib(type=List[int], factory=list) def __str__(self): # type: () -> str return "{filename}:{lines}".format(filename=self.filename, lines=self.lines) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> FileLines return FileLines( filename=json_dict["filename"], lines=json_dict.get("lines", []) )
@attr.s(frozen=True) class IssueType(DataModelElement): """Details about a particular :py:class:`Issue` type. :ivar major: Primary type of the issue :ivar minor: Additional subcategory of the issue """ major = attr.ib(type=str) minor = attr.ib(type=str) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> IssueType return IssueType(json_dict["major"], json_dict["minor"]) @attr.s(frozen=True) class Issue(DataModelElement): """Information about a bug/issue that Batfish has discovered. :ivar severity: The integer severity of the issue :ivar explanation: An explanation for the issue :ivar type: An :py:class:`IssueType` containing more information about the issue """ severity = attr.ib(type=int, converter=int) explanation = attr.ib(type=str) type = attr.ib(type=IssueType) @explanation.default def _default_explanation(self): # type: () -> str return "No explanation" @type.default def _default_type(self): # type: () -> IssueType return IssueType(major="Unknown", minor="Unknown") @classmethod def from_dict(cls, json_dict): # type: (Dict) -> Issue if "severity" not in json_dict: raise ValueError("'severity' not present in the Issue object") return Issue( json_dict["severity"], json_dict.get("explanation", "No explanation"), IssueType(**json_dict.get("type", dict(major="Unknown", minor="Unknown"))), ) def __str__(self): # type: () -> str return "[{}] {}".format(self.severity, self.explanation) class ListWrapper(FrozenList): """Helper list class that implements _repr_html_().""" def _repr_html_(self): # type: () -> str result = "<br><br>".join([get_html(element) for element in self]) # type: str return result