Source code for py2neo.ext.batman.jobs

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Copyright 2011-2016, Nigel Small
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import json

from py2neo.compat import ustr
from py2neo.database import Graph, Cursor, cypher_request
from py2neo.database.status import GraphError
from py2neo.packages.httpstream.packages.urimagic import percent_encode, URI
from py2neo.types import Path, Relatable, remote
from py2neo.util import raise_from
from .util import NodePointer, BatchError


def _create_query(p, unique=False):
    initial_match_clause = []
    path, values, params = [], [], {}

    def append_node(i, node):
        remote_node = remote(node)
        if node is None:
            path.append("(n{0})".format(i))
            values.append("n{0}".format(i))
        elif remote_node:
            path.append("(n{0})".format(i))
            initial_match_clause.append("MATCH (n{0}) WHERE id(n{0})={{i{0}}}".format(i))
            params["i{0}".format(i)] = remote_node._id
            values.append("n{0}".format(i))
        else:
            path.append("(n{0} {{p{0}}})".format(i))
            params["p{0}".format(i)] = dict(node)
            values.append("n{0}".format(i))

    def append_relationship(i, relationship):
        if relationship:
            path.append("-[r{0}:`{1}` {{q{0}}}]->".format(i, relationship.type()))
            params["q{0}".format(i)] = dict(relationship)
            values.append("r{0}".format(i))
        else:
            path.append("-[r{0}:`{1}`]->".format(i, relationship.type()))
            values.append("r{0}".format(i))

    nodes = p.nodes()
    append_node(0, nodes[0])
    for i, relationship in enumerate(p.relationships()):
        append_relationship(i, relationship)
        append_node(i + 1, nodes[i + 1])
    clauses = list(initial_match_clause)
    if unique:
        clauses.append("CREATE UNIQUE p={0}".format("".join(path)))
    else:
        clauses.append("CREATE p={0}".format("".join(path)))
    clauses.append("RETURN p")
    query = "\n".join(clauses)
    return query, params


[docs] class Target(object): """ A callable target for a batch job. This may refer directly to a URI or to an object that can be resolved to a URI, such as a :class:`py2neo.Node`. """ #: The entity behind this target. entity = None #: Additional path segments to append to the resolved URI. segments = None def __init__(self, entity, *segments): self.entity = entity self.segments = segments @property def uri_string(self): """ The fully resolved URI string for this target. :rtype: string """ if isinstance(self.entity, int): uri_string = "{%d}" % self.entity elif isinstance(self.entity, NodePointer): uri_string = "{%d}" % self.entity.address else: remote_entity = remote(self.entity) if remote_entity: uri_string = remote_entity.ref else: uri_string = ustr(self.entity) if self.segments: if not uri_string.endswith("/"): uri_string += "/" uri_string += "/".join(map(percent_encode, self.segments)) return uri_string
[docs] class Job(Relatable): """ A single request for inclusion within a :class:`.Batch`. """ #: The graph for which this job is intended (optional). graph = None #: Indicates whether or not the result should be #: interpreted as raw data. raw_result = False #: The HTTP method for the request. method = None #: A :class:`.Target` object used to determine the destination URI. target = None #: The request payload. body = None #: Indicates whether the job has been submitted. finished = False def __init__(self, method, target, body=None): self.method = method self.target = target self.body = body def __repr__(self): parts = [self.method, self.target.uri_string] if self.body is not None: parts.append(json.dumps(self.body, separators=",:")) return " ".join(parts) def __eq__(self, other): return self is other def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash(id(self)) def __iter__(self): yield "method", self.method yield "to", self.target.uri_string if self.body is not None: yield "body", self.body
[docs] class JobResult(object): """ The result returned from the server for a single :class:`.Job` following a :class:`.Batch` submission. """ @classmethod def hydrate(cls, data, batch): graph = getattr(batch, "graph", None) job_id = data["id"] uri = data["from"] status_code = data.get("status") location = data.get("location") if graph is None or batch[job_id].raw_result: body = data.get("body") else: body = None try: body = graph._hydrate(data.get("body")) except GraphError as error: message = "Batch job %s failed with %s\n%s" % ( job_id, error.__class__.__name__, ustr(error)) raise_from(BatchError(message, batch, job_id, status_code, uri, location), error) else: # If Cypher results, reduce to single row or single value if possible if isinstance(body, Cursor): if body.forward(): record = body.current() width = len(record) if width == 1: body = record[0] else: body = record else: body = None return cls(batch, job_id, uri, status_code, location, body) #: The :class:`.Batch` from which this result was returned. batch = None #: The unique ID for this job within the batch. job_id = None #: The URI destination of the original job. uri = None #: The status code returned for this job. status_code = None #: The ``Location`` header returned for this job (if included). location = None #: The response content for this job. content = None def __init__(self, batch, job_id, uri, status_code=None, location=None, content=None): self.batch = batch self.job_id = job_id self.uri = URI(uri) self.status_code = status_code or 200 self.location = URI(location) self.content = content def __repr__(self): parts = ["{" + ustr(self.job_id) + "}", ustr(self.status_code)] if self.content is not None: parts.append(repr(self.content)) return " ".join(parts) @property def graph(self): """ The corresponding graph for this result. :rtype: :class:`py2neo.Graph` """ return self.batch.graph @property def job(self): """ The original job behind this result. :rtype: :class:`.Job` """ return self.batch[self.job_id]
[docs] class CypherJob(Job): target = Target("transaction/commit") def __init__(self, statement, parameters=None): Job.__init__(self, "POST", self.target, {"statements": [cypher_request(statement, parameters)]})
[docs] class PullPropertiesJob(Job): raw_result = True def __init__(self, entity): Job.__init__(self, "GET", Target(entity, "properties"))
[docs] class PullNodeLabelsJob(Job): raw_result = True def __init__(self, node): Job.__init__(self, "GET", Target(node, "labels"))
[docs] class PullRelationshipJob(Job): raw_result = True def __init__(self, relationship): Job.__init__(self, "GET", Target(relationship))
[docs] class PushPropertyJob(Job): def __init__(self, entity, key, value): Job.__init__(self, "PUT", Target(entity, "properties", key), value)
[docs] class PushPropertiesJob(Job): def __init__(self, entity, properties): Job.__init__(self, "PUT", Target(entity, "properties"), dict(properties))
[docs] class PushNodeLabelsJob(Job): def __init__(self, node, labels): Job.__init__(self, "PUT", Target(node, "labels"), set(labels))
[docs] class CreateNodeJob(Job): target = Target("node") def __init__(self, **properties): Job.__init__(self, "POST", self.target, properties)
[docs] class CreateRelationshipJob(Job): def __init__(self, start_node, type, end_node, **properties): body = {"type": type, "to": Target(end_node).uri_string} if properties: body["data"] = properties Job.__init__(self, "POST", Target(start_node, "relationships"), body)
[docs] class CreatePathJob(CypherJob): def __init__(self, *entities): # Fudge to allow graph to be passed in so Cypher syntax # detection can occur. Can be removed when only 2.0+ is # supported. if isinstance(entities[0], Graph): self.graph, entities = entities[0], entities[1:] path = Path(*(entity or {} for entity in entities)) CypherJob.__init__(self, *_create_query(path))
[docs] class CreateUniquePathJob(CypherJob): def __init__(self, *entities): # Fudge to allow graph to be passed in so Cypher syntax # detection can occur. Can be removed when only 2.0+ is # supported. if isinstance(entities[0], Graph): self.graph, entities = entities[0], entities[1:] path = Path(*(entity or {} for entity in entities)) CypherJob.__init__(self, *_create_query(path, unique=True))
[docs] class DeleteEntityJob(Job): def __init__(self, entity): Job.__init__(self, "DELETE", Target(entity))
[docs] class DeletePropertyJob(Job): def __init__(self, entity, key): Job.__init__(self, "DELETE", Target(entity, "properties", key))
[docs] class DeletePropertiesJob(Job): def __init__(self, entity): Job.__init__(self, "DELETE", Target(entity, "properties"))
[docs] class AddNodeLabelsJob(Job): def __init__(self, node, *labels): Job.__init__(self, "POST", Target(node, "labels"), set(labels))
[docs] class RemoveNodeLabelJob(Job): def __init__(self, entity, label): Job.__init__(self, "DELETE", Target(entity, "labels", label))