Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Moved "examples" section to a separate sub-page

...

The properties field will list the RDF properties that changed with that event. NODE_REMOVED events contain no properties. The fcrepo component for Camel is configured to recognize these headers and act appropriately.

Examples

LDPath Transformations

If an fcr:transform program has been installed as mytransform, you can generate a JSON representation of a container and send it to a low-latency, highly available document store, such as Riak. The following route determines if a container has been removed or simply added/updated. It then routes the message appropriately to a load-balancer sitting in front of the Riak HTTP endpoint.

Code Block
languagescala
titleCamel route to populate a Riak store, using the Scala DSL
linenumberstrue
val riakKeyProcessor = (exchange: Exchange) => {
	exchange.getIn.setHeader(
		Exchange.HTTP_PATH,
		"/buckets/fcrepo/keys/" + URLEncoder.encode(exchange.getIn.getHeader("org.fcrepo.jms.identifier", classOf[String]))
	)
}
 
"activemq:topic:fedora" ==> {
	choice() {
		when(_.in("org.fcrepo.jms.eventType") == "http://fedora.info/definitions/v4/repository#NODE_REMOVED") {
			setHeader(Exchange.HTTP_METHOD, constant("DELETE"))
			process(riakKeyProcessor)
			to("http4:localhost:8098")
		}
		otherwise() {
		    to("fcrepo:localhost:8080/fedora/rest")
			filter(xpathFilter) {
                to("fcrepo:localhost:8080/fedora/rest?transform=mytransform")
	      	    setHeader(Exchange.HTTP_METHOD, constant("PUT"))
		        process(riakKeyProcessor)
                to("http4:localhost:8098")
			}
		}
	}
}

External Triplestore

Some additional processing must be done to transform an application/n-triples response into a valid application/sparql-update payload before sending to an external triplestore such as Fuseki or Sesame. The fcrepo component contains some processors in org.fcrepo.camel.processor to handle this case. The examples below assume that messages have already been routed based on eventType (see below) and passed to the appropriate queue.

Code Block
languagejava
titlePopulate an external triplestore
linenumberstrue
from("direct:delete")
  .process(new SparqlDescribeProcessor())
  .to("http4:localhost:3030/db/query")
  .process(new SparqlDeleteProcessor())
  .to("http4:localhost:3030/db/update");
 
from("direct:new")
 .to("fcrepo:localhost:8080/rest")
 .process(new SparqlInsertProcessor())
 .to("http4:localhost:3030/db/update");
 
from("direct:update")
  .to("fcrepo:localhost:8080/rest")
  .process(new SparqlUpdateProcessor())
  .to("http4:localhost:3030/db/update");

When using these Sparql* processor classes, it is also possible to apply these operations to named graphs. While Fedora does not support named graphs, it is possible to assign nodes to certain named graphs in an external triplestore. The CamelFcrepoNamedGraph header can be used to apply a sparql-update operation to a particular named graph. For instance, to route all operations to a named graph (in this case the graph URI is defined dynamically as a property placeholder: 

Code Block
languagejava
titleNamed Graph
linenumberstrue
from("direct:update")
  .to("fcrepo:localhost:8080/rest")
  .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
    .simple("{{named.graph}}")
  .process(new SparqlUpdateProcessor())
  .to("http4:localhost:3030/ds/update"); 

Or, to partition the graph based on an existing RDF property:

Code Block
languagejava
titleMultiple Named Graphs
linenumberstrue
from("direct:update")
  .to("fcrepo:localhost:8080/rest")
  .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
    .xpath("/rdf:RDF/rdf:Description/ex:namedGraph/text()", String.class, ns)
  .process(new SparqlUpdateProcessor())
  .to("http4:localhost:3030/ds/update");

Or, you may want (possibly overlapping) "public" and "private" named graphs, defined as constants:

Code Block
languagejava
titleConstant Named Graphs
linenumberstrue
from("direct:update")
  .to("fcrepo:localhost:8080/rest")
  .multicast("direct:public", "direct:private");

from("direct:public")
  .filter(publicPredicate)
    .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
      .constant("http://site/graph/public")
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/ds/update");

from("direct:private")
  .filter(privatePredicate)
    .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
      .constant("http://site/graph/private")
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/ds/update");

In each case the Sparql update operation will include a GRAPH <uri> { ... } clause, where uri is the value of the FCREPO_NAMED_GRAPH header (in the Spring DSL, this header can be accessed as CamelFcrepoNamedGraph). It is important that the value of the named graph header is a properly formed URI.

Event-based Routing

It is often helpful to route messages to different queues based on the eventType value. This example splits messages on eventType values and routes the messages to appropriate queues. Following this example, it would be prudent to aggregate the messages based on org.fcrepo.jms.identifier value after retrieving the messages from the downstream queues.

Code Block
languagexml
titleContent-based Routing
linenumberstrue
<route id="fcrepo-event-splitter">
    <description>
        Retrieve messages from the fedora topic. Event types are comma-delimited, so split them into separate messages before routing them.
    </description>
    <from uri="activemq:topic:fedora"/>
    <setBody>
        <simple>${header.org.fcrepo.jms.eventType}</simple>
    </setBody>
    <split>
        <tokenize token=","/>
        <setHeader headerName="org.fcrepo.jms.eventType">
            <simple>${body}</simple>
        </setHeader>
        <setBody>
            <simple>null</simple>
        </setBody>
        <to uri="seda:fcrepo-event-router"/>
    </split>
</route>
 
<route id="fcrepo-event-router">
    <description>
        Route messages based on the eventType.
    </description>
    <from uri="seda:fcrepo-event-router"/>
    <choice>
        <when>
            <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#NODE_REMOVED"</simple>
            <to uri="activemq:queue:fcrepo.delete"/>
        </when>
        <when>
            <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#NODE_ADDED"</simple>
            <to uri="activemq:queue:fcrepo.add"/>
        </when>
        <when>
            <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_ADDED"</simple>
            <to uri="activemq:queue:fcrepo.update"/>
        </when>
        <when>
            <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_CHANGED"</simple>
            <to uri="activemq:queue:fcrepo.update"/>
        </when>
        <when>
            <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_REMOVED"</simple>
            <to uri="activemq:queue:fcrepo.update"/>
        </when>
        <otherwise>
            <log message="No router for ${header.org.fcrepo.jms.eventType}"/>
        </otherwise>
    </choice>
</route>

Supporting Queues

The default configuration is fine for locally-deployed listeners, but it can be problematic in a distributed context. For instance, if the listener is restarted while a message is sent to the topic, that message may be missed. Furthermore, if there is a networking hiccup between Fedora's local broker and the remote listener, that too can result in lost messages. Instead, in this case, a queue may be better suited.

...