Source code for py2neo.ext.batman.batch

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Copyright 2011-2016, Nigel Small
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

from py2neo.compat import integer
from py2neo.database import GraphError, Resource
from py2neo.packages.httpstream.packages.urimagic import percent_encode
from py2neo.types import Node, Relationship, cast as core_cast, cast_node, cast_relationship, remote
from .jobs import Job, JobResult, Target, CreateNodeJob, CreateRelationshipJob, CreatePathJob, \
    CreateUniquePathJob, DeleteEntityJob, PushPropertyJob, PushPropertiesJob, DeletePropertyJob, \
    DeletePropertiesJob, AddNodeLabelsJob, RemoveNodeLabelJob, PushNodeLabelsJob
from .util import NodePointer


log = logging.getLogger("py2neo.ext.batch")


def cast(obj):
    if isinstance(obj, integer):
        obj = NodePointer(obj)
    elif isinstance(obj, tuple):
        obj = tuple(NodePointer(x) if isinstance(x, integer) else x for x in obj)
    return core_cast(obj)


def pendulate(collection):
    count = len(collection)
    for i in range(count):
        if i % 2 == 0:
            index = i // 2
        else:
            index = count - ((i + 1) // 2)
        yield index, collection[index]


class BatchFinished(GraphError):
    """ Raised when actions are attempted against a :class:`.Batch`
    that is no longer available for use.
    """

    def __init__(self, obj):
        self.obj = obj

    def __repr__(self):
        return "%s finished" % self.obj.__class__.__name__


[docs] class BatchRunner(object): """ Resource for batch execution. """ def __init__(self, uri): self.resource = Resource(uri) def post(self, batch): num_jobs = len(batch) plural = "" if num_jobs == 1 else "s" log.info("> Sending batch request with %s job%s", num_jobs, plural) data = [] for i, job in enumerate(batch): if job.finished: raise BatchFinished(job) else: job.finished = True log.info("> {%s} %s", i, job) data.append(dict(job, id=i)) response = self.resource.post(data) log.info("< Received batch response for %s job%s", num_jobs, plural) return response
[docs] def run(self, batch): """ Execute a collection of jobs and return all results. :param batch: A :class:`.Batch` of jobs. :rtype: :class:`list` """ response = self.post(batch) try: results = [] for result_data in response.content: result = JobResult.hydrate(result_data, batch) log.info("< %s", result) results.append(result) return results except ValueError: # Here, we're looking to gracefully handle a Neo4j server bug # whereby a response is received with no content and # 'Content-Type: application/json'. Given that correct JSON # technically needs to contain {} at minimum, the JSON # parser fails with a ValueError. if response.content_length == 0: from sys import exc_info from traceback import extract_tb type_, value, traceback = exc_info() for filename, line_number, function_name, text in extract_tb(traceback): if "json" in filename and "decode" in function_name: return [] raise finally: response.close()
[docs] class Batch(object): """ A collection of :class:`.Job` objects that can be submitted to a :class:`.BatchRunner`. References to previous jobs are only valid **within the same batch** and will not work across batches. """ #: The graph with which this batch is associated graph = None def __init__(self, graph): self.graph = graph self.runner = BatchRunner(remote(self.graph).metadata["batch"]) self.jobs = [] def __getitem__(self, index): return self.jobs[index] def __len__(self): return len(self.jobs) def __bool__(self): return bool(self.jobs) def __nonzero__(self): return bool(self.jobs) def __iter__(self): return iter(self.jobs)
[docs] def resolve(self, node): """ Convert any references to previous jobs within the same batch into NodePointer objects. """ if isinstance(node, Job): return NodePointer(self.find(node)) else: return node
[docs] def append(self, job): """ Add a job to this batch. :param job: A :class:`.Job` object to add to this batch. :rtype: :class:`.Job` """ self.jobs.append(job) return job
[docs] def find(self, job): """ Find the position of a job within this batch. """ for i, candidate_job in pendulate(self.jobs): if candidate_job == job: return i raise ValueError("Job not found in batch")
class ReadBatch(Batch): """ Generic batch execution facility for data read requests, """ def __init__(self, graph): Batch.__init__(self, graph) def run(self): return [result.content for result in self.graph.batch.run(self)] class WriteBatch(Batch): """ Generic batch execution facility for data write requests. Most methods return a :class:`.BatchRequest` object that can be used as a reference in other methods. See the :meth:`.WriteBatch.create` method for an example of this. """ def __init__(self, graph): Batch.__init__(self, graph) def run(self): return [result.content for result in self.runner.run(self)] def create(self, abstract): """ Create a node or relationship based on the abstract entity provided. :param abstract: node or relationship :return: :class:`.Job` """ entity = cast(abstract) if isinstance(entity, Node): return self.append(CreateNodeJob(**entity)) elif isinstance(entity, Relationship): start_node = self.resolve(entity.start_node()) end_node = self.resolve(entity.end_node()) return self.append(CreateRelationshipJob(start_node, entity.type(), end_node, **entity)) else: raise TypeError(entity) def create_path(self, node, *rels_and_nodes): """ Construct a path across a specified set of nodes and relationships. Nodes may be existing concrete node instances, abstract nodes or :py:const:`None` but references to other requests are not supported. :param node: start node :param rels_and_nodes: alternating relationships and nodes :return: :class:`.Job` """ return self.append(CreatePathJob(self.graph, node, *rels_and_nodes)) def get_or_create_path(self, node, *rels_and_nodes): """ Construct a unique path across a specified set of nodes and relationships, adding only parts that are missing. Nodes may be existing concrete node instances, abstract nodes or :py:const:`None` but references to other requests are not supported. :param node: start node :param rels_and_nodes: alternating relationships and nodes :return: :class:`.Job` """ return self.append(CreateUniquePathJob(self.graph, node, *rels_and_nodes)) def delete(self, entity): """ Delete a node or relationship from the graph. :param entity: node or relationship to delete :return: :class:`.Job` """ return self.append(DeleteEntityJob(self.resolve(entity))) def set_property(self, entity, key, value): """ Set a single property on a node or relationship. :param entity: node or relationship on which to set property :param key: property key :param value: property value :return: :class:`.Job` """ return self.append(PushPropertyJob(self.resolve(entity), key, value)) def set_properties(self, entity, properties): """ Replace all properties on a node or relationship. :param entity: node or relationship on which to set properties :param properties: properties :return: :class:`.Job` """ return self.append(PushPropertiesJob(self.resolve(entity), properties)) def delete_property(self, entity, key): """ Delete a single property from a node or relationship. :param entity: node or relationship from which to delete property :param key: property key :return: :class:`.Job` """ return self.append(DeletePropertyJob(self.resolve(entity), key)) def delete_properties(self, entity): """ Delete all properties from a node or relationship. :param entity: node or relationship from which to delete properties :return: :class:`.Job` """ return self.append(DeletePropertiesJob(self.resolve(entity))) def add_labels(self, node, *labels): """ Add labels to a node. :param node: node to which to add labels :param labels: text labels :return: :class:`.Job` """ return self.append(AddNodeLabelsJob(self.resolve(node), *labels)) def remove_label(self, node, label): """ Remove a label from a node. :param node: node from which to remove labels (can be a reference to another request within the same batch) :param label: text label :return: :class:`.Job` """ return self.append(RemoveNodeLabelJob(self.resolve(node), label)) def set_labels(self, node, *labels): """ Replace all labels on a node. :param node: node on which to replace labels (can be a reference to another request within the same batch) :param labels: text labels :return: :class:`.Job` """ return self.append(PushNodeLabelsJob(self.resolve(node), labels))
[docs] class ManualIndexReadBatch(ReadBatch): """ Generic batch execution facility for data read requests, """ def append_get(self, uri): return self.append(Job("GET", Target(uri))) def _index(self, content_type, index): """ Fetch an Index object. """ from py2neo.ext.batman import ManualIndexManager, ManualIndex if isinstance(index, ManualIndex): if content_type == index._content_type: return index else: raise TypeError("Index is not for {0}s".format(content_type)) else: return ManualIndexManager(self.graph).get_or_create_index(content_type, str(index))
[docs] def get_indexed_nodes(self, index, key, value): """ Fetch all nodes indexed under a given key-value pair. :param index: index name or instance :param key: key under which nodes are indexed :param value: value under which nodes are indexed :return: :class:`.Job` """ index = self._index(Node, index) uri = index._searcher_stem_for_key(key) + percent_encode(value) return self.append_get(uri)
[docs] class ManualIndexWriteBatch(WriteBatch): """ Generic batch execution facility for data write requests. Most methods return a :class:`.BatchRequest` object that can be used as a reference in other methods. """ def append_post(self, uri, body=None): return self.append(Job("POST", Target(uri), body)) def append_delete(self, uri): return self.append(Job("DELETE", Target(uri))) def _uri_for(self, resource, *segments, **kwargs): """ Return a relative URI in string format for the entity specified plus extra path segments. """ if isinstance(resource, int): uri = "{{{0}}}".format(resource) elif isinstance(resource, NodePointer): uri = "{{{0}}}".format(resource.address) elif isinstance(resource, Job): uri = "{{{0}}}".format(self.find(resource)) else: remote_resource = remote(resource) remote_graph = remote(remote_resource.graph) graph_uri = remote_graph.uri.string entity_uri = remote_resource.uri.string uri = entity_uri[len(graph_uri):] if segments: if not uri.endswith("/"): uri += "/" uri += "/".join(map(percent_encode, segments)) query = kwargs.get("query") if query is not None: uri += "?" + query return uri def _index(self, content_type, index): """ Fetch an Index object. """ from py2neo.ext.batman import ManualIndexManager, ManualIndex if isinstance(index, ManualIndex): if content_type == index._content_type: return index else: raise TypeError("Index is not for {0}s".format(content_type)) else: return ManualIndexManager(self.graph).get_or_create_index(content_type, str(index)) def __init__(self, graph): super(ManualIndexWriteBatch, self).__init__(graph) self.__new_uniqueness_modes = None ### ADD TO INDEX ### def _add_to_index(self, cls, index, key, value, entity, query=None): uri = self._uri_for(self._index(cls, index), query=query) return self.append_post(uri, { "key": key, "value": value, "uri": self._uri_for(entity), })
[docs] def add_to_index(self, cls, index, key, value, entity): """ Add an existing node or relationship to an index. :param cls: the type of indexed entity :param index: index or index name :param key: index entry key :param value: index entry value :param entity: node or relationship to add to the index :return: :class:`.Job` """ return self._add_to_index(cls, index, key, value, entity)
[docs] def add_to_index_or_fail(self, cls, index, key, value, entity): """ Add an existing node or relationship uniquely to an index, failing the entire batch if such an entry already exists. .. warning:: Uniqueness modes for legacy indexes have been broken in recent server versions and therefore this method may not work as expected. :param cls: the type of indexed entity :param index: index or index name :param key: index entry key :param value: index entry value :param entity: node or relationship to add to the index :return: :class:`.Job` """ query = "uniqueness=create_or_fail" return self._add_to_index(cls, index, key, value, entity, query)
[docs] def get_or_add_to_index(self, cls, index, key, value, entity): """ Fetch a uniquely indexed node or relationship if one exists, otherwise add an existing entity to the index. :param cls: the type of indexed entity :param index: index or index name :param key: index entry key :param value: index entry value :param entity: node or relationship to add to the index :return: :class:`.Job` """ query = "uniqueness=get_or_create" return self._add_to_index(cls, index, key, value, entity, query)
### CREATE IN INDEX ### def _create_in_index(self, cls, index, key, value, abstract, query=None): uri = self._uri_for(self._index(cls, index), query=query) if cls is Node: a = cast_node(abstract) return self.append_post(uri, { "key": key, "value": value, "properties": dict(a), }) elif cls is Relationship: r = cast_relationship(abstract) return self.append_post(uri, { "key": key, "value": value, "start": self._uri_for(abstract.start_node()), "type": str(abstract.type()), "end": self._uri_for(abstract.end_node()), "properties": dict(r), }) else: raise TypeError(cls) # Removed create_in_index as parameter combination not supported by server
[docs] def create_in_index_or_fail(self, cls, index, key, value, abstract=None): """ Create a new node or relationship and add it uniquely to an index, failing the entire batch if such an entry already exists. .. warning:: Uniqueness modes for legacy indexes have been broken in recent server versions and therefore this method may not work as expected. :param cls: the type of indexed entity :param index: index or index name :param key: index entry key :param value: index entry value :param abstract: abstract node or relationship to create :return: :class:`.Job` """ query = "uniqueness=create_or_fail" return self._create_in_index(cls, index, key, value, abstract, query)
[docs] def get_or_create_in_index(self, cls, index, key, value, abstract=None): """ Fetch a uniquely indexed node or relationship if one exists, otherwise create a new entity and add that to the index. :param cls: the type of indexed entity :param index: index or index name :param key: index entry key :param value: index entry value :param abstract: abstract node or relationship to create :return: :class:`.Job` """ query = "uniqueness=get_or_create" if cls is Node: return self._create_in_index(cls, index, key, value, cast_node(abstract), query) elif cls is Relationship: return self._create_in_index(cls, index, key, value, cast_relationship(abstract), query) else: raise TypeError("Unindexable class")
### REMOVE FROM INDEX ###
[docs] def remove_from_index(self, cls, index, key=None, value=None, entity=None): """ Remove any nodes or relationships from an index that match a particular set of criteria. Allowed parameter combinations are: `key`, `value`, `entity` remove a specific node or relationship indexed under a given key-value pair `key`, `entity` remove a specific node or relationship indexed against a given key and with any value `entity` remove all occurrences of a specific node or relationship regardless of key or value :param cls: the type of indexed entity :param index: index or index name :param key: index entry key :param value: index entry value :param entity: node or relationship to remove from the index :return: :class:`.Job` """ index = self._index(cls, index) if key and value and entity: uri = self._uri_for(index, key, value, remote(entity)._id) elif key and entity: uri = self._uri_for(index, key, remote(entity)._id) elif entity: uri = self._uri_for(index, remote(entity)._id) else: raise TypeError("Illegal parameter combination for index removal") return self.append_delete(uri)