ReindexingRouter.java

/**
 * Copyright 2015 DuraSpace, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.fcrepo.camel.reindexing;

import static org.fcrepo.camel.FcrepoHeaders.FCREPO_BASE_URL;
import static org.slf4j.LoggerFactory.getLogger;

import javax.xml.transform.stream.StreamSource;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PropertyInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.xml.Namespaces;
import org.apache.camel.builder.xml.XPathBuilder;
import org.fcrepo.camel.HttpMethods;
import org.fcrepo.camel.RdfNamespaces;
import org.slf4j.Logger;

/**
 * A content router for handling JMS events.
 *
 * @author Aaron Coburn
 */
public class ReindexingRouter extends RouteBuilder {

    private static final Logger LOGGER = getLogger(ReindexingRouter.class);
    private static final int BAD_REQUEST = 400;

    @PropertyInject(value = "rest.port", defaultValue = "9080")
    private String port;

    /**
     * Configure the message route workflow.
     */
    public void configure() throws Exception {

        restConfiguration().component("restlet").port(
                System.getProperty("fcrepo.dynamic.reindexing.port", port));

        final Namespaces ns = new Namespaces("rdf", RdfNamespaces.RDF);
        ns.add("ldp", RdfNamespaces.LDP);

        final XPathBuilder children = new XPathBuilder("/rdf:RDF/rdf:Description/ldp:contains");
        children.namespaces(ns);

        /**
         * A generic error handler (specific to this RouteBuilder)
         */
        onException(Exception.class)
            .maximumRedeliveries("{{error.maxRedeliveries}}")
            .log("Index Routing Error: ${routeId}");

        /**
         * Expose a RESTful endpoint for re-indexing
         */
        rest("{{rest.prefix}}")
            .get().to("direct:usage")
            .post().consumes("application/json").to("direct:reindex");

        from("direct:usage")
            .routeId("FcrepoReindexingUsage")
            .setHeader(ReindexingHeaders.REST_PREFIX).simple("{{rest.prefix}}")
            .setHeader(ReindexingHeaders.REST_PORT).simple("{{rest.port}}")
            .setHeader(FCREPO_BASE_URL).simple("{{fcrepo.baseUrl}}")
            .process(new UsageProcessor());

        /**
         * A Re-indexing endpoint, setting where in the fcrepo hierarchy
         * a re-indexing operation should begin.
         */
        from("direct:reindex")
            .routeId("FcrepoReindexingReindex")
            .setHeader(ReindexingHeaders.REST_PREFIX).simple("{{rest.prefix}}")
            .setHeader(FCREPO_BASE_URL).simple("{{fcrepo.baseUrl}}")
            .process(new RestProcessor())
            .choice()
                .when(header(Exchange.HTTP_RESPONSE_CODE).isGreaterThanOrEqualTo(BAD_REQUEST))
                    .endChoice()
                .when(header(ReindexingHeaders.RECIPIENTS).isEqualTo(""))
                    .transform().simple("No endpoints configured for indexing")
                    .endChoice()
                .otherwise()
                    .log(LoggingLevel.INFO, LOGGER, "Initial indexing path: ${headers[CamelFcrepoIdentifier]}")
                    .inOnly("{{reindexing.stream}}?disableTimeToLive=true")
                    .setHeader(Exchange.CONTENT_TYPE).constant("text/plain")
                    .transform().simple("Indexing started at ${headers[CamelFcrepoIdentifier]}");

        /**
         *  A route that traverses through a fedora heirarchy
         *  indexing nodes, as appropriate.
         */
        from("{{reindexing.stream}}?asyncConsumer=true")
            .routeId("FcrepoReindexingTraverse")
            .streamCaching()
            .inOnly("direct:recipients")
            .removeHeaders("CamelHttp*")
            .setHeader(Exchange.HTTP_METHOD).constant(HttpMethods.GET)
            .to("fcrepo:{{fcrepo.baseUrl}}?preferInclude=PreferContainment&preferOmit=ServerManaged")
            .convertBodyTo(StreamSource.class)
            .split(children).streaming()
                .transform().xpath("/ldp:contains/@rdf:resource", String.class, ns)
                .process(new PathProcessor())
                .inOnly("{{reindexing.stream}}?disableTimeToLive=true");

        /**
         *  Send the message to all of the pre-determined endpoints
         */
        from("direct:recipients")
            .routeId("FcrepoReindexingRecipients")
            .recipientList(header(ReindexingHeaders.RECIPIENTS))
            .ignoreInvalidEndpoints();
    }
}