Source code for py2neo.database.status
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Copyright 2011-2016, Nigel Small
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from py2neo.compat import xstr
class GraphError(Exception):
    """ Base class for errors reported by the database server.

    Instances are normally created through :meth:`hydrate`, which picks
    the most specific subclass for a status code and stores the code and
    message on the instance as :attr:`code` and :attr:`message`.
    """

    # NOTE(review): presumably a default for interpreters whose exceptions
    # do not define ``__cause__`` themselves -- confirm it is still needed.
    __cause__ = None

    # Status code string taken from the error payload (set by `hydrate`).
    code = None

    # Error message taken from the error payload (set by `hydrate`).
    message = None

    @classmethod
    def hydrate(cls, data):
        """ Build an error instance from an error payload.

        The second dot-separated component of the status code (the
        classification) selects the error class. Codes that do not carry
        a recognised classification, including malformed codes with too
        few components, fall back to the class this method was called on
        instead of raising.

        :param data: mapping with a ``"code"`` and a ``"message"`` entry
        :return: error instance with ``code`` and ``message`` populated
        :raise KeyError: if `data` lacks ``"code"`` or ``"message"``
        """
        code = data["code"]
        message = data["message"]

        # Only the classification is needed; tolerate codes that do not
        # have exactly four components rather than failing to unpack.
        parts = code.split(".")
        classification = parts[1] if len(parts) > 1 else None

        if classification == "ClientError":
            try:
                error_cls = client_errors[code]
            except KeyError:
                # No dedicated class for this code: use the generic client
                # error and keep the code visible in the message.
                error_cls = ClientError
                message += " [%s]" % code
        elif classification == "DatabaseError":
            error_cls = DatabaseError
        elif classification == "TransientError":
            error_cls = TransientError
        else:
            error_cls = cls

        inst = error_cls(message)
        inst.code = code
        inst.message = message
        return inst

    def __new__(cls, *args, **kwargs):
        """ Create the instance, optionally as a dynamically named subclass.

        When an ``exception`` keyword argument is supplied, a new subclass
        of `cls` named after that value is created on the fly and the
        instance is built from it; otherwise `cls` itself is used.
        """
        if "exception" in kwargs:
            error_cls = type(xstr(kwargs["exception"]), (cls,), {})
        else:
            error_cls = cls
        return Exception.__new__(error_cls, *args)

    def __init__(self, *args, **kwargs):
        """ Initialise the exception and store keyword arguments.

        Positional arguments are passed to :class:`Exception`; every
        keyword argument becomes an instance attribute whose name is the
        lower-cased keyword.
        """
        Exception.__init__(self, *args)
        for key, value in kwargs.items():
            setattr(self, key.lower(), value)
class ClientError(GraphError):
    """ The client sent a bad request - changing the request might yield
    a successful outcome.
    """
class DatabaseError(GraphError):
    """ The database failed to service the request.
    """
class TransientError(GraphError):
    """ The database cannot service the request right now, retrying later
    might yield a successful outcome.
    """
class ConstraintError(ClientError):
    """ Client error for status codes that report a constraint violation
    or a failed constraint validation/verification.
    """
class CypherSyntaxError(ClientError):
    """ Client error for status codes that report invalid statement
    syntax.
    """
class CypherTypeError(ClientError):
    """ Client error for status codes that report a type error in a
    statement or procedure.
    """
class Forbidden(ClientError):
    """ Client error for status codes that report an operation which is
    not permitted, such as writing to a read-only database.
    """
class Unauthorized(ClientError):
    """ Client error for the security status codes that report failed
    authorization.
    """
# Maps full client error status codes to the dedicated exception class
# raised for them; client error codes not listed here are handled by
# the generic ClientError class.
client_errors = {

    # ConstraintError
    "Neo.ClientError.Schema.ConstraintValidationFailed": ConstraintError,
    "Neo.ClientError.Schema.ConstraintViolation": ConstraintError,
    "Neo.ClientError.Statement.ConstraintVerificationFailed": ConstraintError,
    "Neo.ClientError.Statement.ConstraintViolation": ConstraintError,

    # CypherSyntaxError
    "Neo.ClientError.Statement.InvalidSyntax": CypherSyntaxError,
    "Neo.ClientError.Statement.SyntaxError": CypherSyntaxError,

    # CypherTypeError
    "Neo.ClientError.Procedure.TypeError": CypherTypeError,
    "Neo.ClientError.Statement.InvalidType": CypherTypeError,
    "Neo.ClientError.Statement.TypeError": CypherTypeError,

    # Forbidden
    "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase": Forbidden,
    "Neo.ClientError.General.ReadOnly": Forbidden,
    "Neo.ClientError.Schema.ForbiddenOnConstraintIndex": Forbidden,
    "Neo.ClientError.Schema.IndexBelongsToConstraint": Forbidden,
    # Misspelled variant of the key above, kept so that existing lookups
    # by this exact string continue to work.
    "Neo.ClientError.Schema.IndexBelongsToConstrain": Forbidden,
    "Neo.ClientError.Security.Forbidden": Forbidden,
    "Neo.ClientError.Transaction.ForbiddenDueToTransactionType": Forbidden,

    # Unauthorized
    "Neo.ClientError.Security.AuthorizationFailed": Unauthorized,
    "Neo.ClientError.Security.Unauthorized": Unauthorized,

}