// RestProcessor.java
/**
* Copyright 2015 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.reindexing;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.URL;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.fcrepo.camel.FcrepoHeaders;
import org.slf4j.Logger;
/**
* A processor that converts the REST uri into the
* identifying path for an fcrepo node.
*
* This assumes that the `rest.prefix` value is stored
* in the CamelFcrepoRestPrefix header.
*
* @author Aaron Coburn
*/
public class RestProcessor implements Processor {
private static final Logger LOGGER = getLogger(RestProcessor.class);
private static final int BAD_REQUEST = 400;
/**
* Convert the incoming REST request into the correct
* Fcrepo header fields.
*
* @param exchange the current message exchange
*/
public void process(final Exchange exchange) throws Exception {
final Message in = exchange.getIn();
final URL url = new URL(in.getHeader(Exchange.HTTP_URI, String.class));
final String prefix = in.getHeader(ReindexingHeaders.REST_PREFIX, "", String.class);
final String contentType = in.getHeader(Exchange.CONTENT_TYPE, "", String.class);
final String body = in.getBody(String.class);
final Set<String> endpoints = new HashSet<>();
for (final String s : in.getHeader(ReindexingHeaders.RECIPIENTS, "", String.class).split(",")) {
endpoints.add(s.trim());
}
in.removeHeaders("CamelHttp*");
in.removeHeaders("CamelRestlet*");
in.removeHeaders("org.restlet*");
in.removeHeader("JMSCorrelationID");
in.setBody(null);
if (contentType.equals("application/json") && body != null && !body.trim().isEmpty()) {
final ObjectMapper mapper = new ObjectMapper();
try {
final JsonNode root = mapper.readTree(body);
final Iterator<JsonNode> ite = root.elements();
while (ite.hasNext()) {
final JsonNode n = ite.next();
endpoints.add(n.asText());
}
} catch (JsonProcessingException e) {
LOGGER.debug("Invalid JSON", e);
in.setHeader(Exchange.HTTP_RESPONSE_CODE, BAD_REQUEST);
in.setBody("Invalid JSON");
}
}
in.setHeader(FcrepoHeaders.FCREPO_IDENTIFIER, url.getPath().substring(prefix.length()));
in.setHeader(ReindexingHeaders.RECIPIENTS, String.join(",", endpoints));
}
}