...
The properties field lists the RDF properties that changed with that event. NODE_REMOVED events contain no properties. The fcrepo component for Camel is configured to recognize these headers and act appropriately.
Examples
LDPath Transformations
If an fcr:transform program has been installed as mytransform, you can generate a JSON representation of a container and send it to a low-latency, highly available document store, such as Riak. The following route determines whether a container has been removed or simply added or updated. It then routes the message to a load balancer sitting in front of the Riak HTTP endpoint.
Code block: Camel route to populate a Riak store, using the Scala DSL

```scala
import java.net.URLEncoder
import org.apache.camel.Exchange

// Point the HTTP request at the Riak key derived from the Fedora identifier.
val riakKeyProcessor = (exchange: Exchange) => {
  exchange.getIn.setHeader(
    Exchange.HTTP_PATH,
    "/buckets/fcrepo/keys/" + URLEncoder.encode(
      exchange.getIn.getHeader("org.fcrepo.jms.identifier", classOf[String]), "UTF-8")
  )
}

"activemq:topic:fedora" ==> {
  choice() {
    // Removed containers are deleted from Riak.
    when(_.in("org.fcrepo.jms.eventType") == "http://fedora.info/definitions/v4/repository#NODE_REMOVED") {
      setHeader(Exchange.HTTP_METHOD, constant("DELETE"))
      process(riakKeyProcessor)
      to("http4:localhost:8098")
    }
    // Added or updated containers are fetched, transformed to JSON, and stored.
    otherwise() {
      to("fcrepo:localhost:8080/fedora/rest")
      // xpathFilter is not shown here; it is assumed to be defined elsewhere.
      filter(xpathFilter) {
        to("fcrepo:localhost:8080/fedora/rest?transform=mytransform")
        setHeader(Exchange.HTTP_METHOD, constant("PUT"))
        process(riakKeyProcessor)
        to("http4:localhost:8098")
      }
    }
  }
}
```
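The key construction in riakKeyProcessor can be looked at in isolation. The following is a minimal sketch in plain Java (no Camel dependency); the class and method names are hypothetical and exist only for illustration, while the /buckets/fcrepo/keys/ prefix is taken from the route above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: builds the Riak HTTP path for a Fedora identifier.
final class RiakKeyPath {
    // Same prefix the route above sets on Exchange.HTTP_PATH.
    static final String KEY_PREFIX = "/buckets/fcrepo/keys/";

    static String keyPath(String identifier) {
        try {
            // URL-encode the identifier so that its slashes do not add path segments.
            return KEY_PREFIX + URLEncoder.encode(identifier, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a required charset on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints /buckets/fcrepo/keys/%2Fpath%2Fto%2Fresource
        System.out.println(keyPath("/path/to/resource"));
    }
}
```

Because the whole identifier is encoded, each Fedora path maps to a single Riak key rather than a nested path.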
External Triplestore
Some additional processing must be done to transform an application/n-triples response into a valid application/sparql-update payload before sending it to an external triplestore such as Fuseki or Sesame. The fcrepo component contains some processors in org.fcrepo.camel.processor to handle this case. The examples below assume that messages have already been routed based on eventType (see Event-based Routing below) and passed to the appropriate queue.
Code block: Populate an external triplestore

```java
from("direct:delete")
    .process(new SparqlDescribeProcessor())
    .to("http4:localhost:3030/db/query")
    .process(new SparqlDeleteProcessor())
    .to("http4:localhost:3030/db/update");

from("direct:new")
    .to("fcrepo:localhost:8080/rest")
    .process(new SparqlInsertProcessor())
    .to("http4:localhost:3030/db/update");

from("direct:update")
    .to("fcrepo:localhost:8080/rest")
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/db/update");
```
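To make the n-triples to sparql-update transformation more concrete, here is a rough sketch in plain Java. It is not the implementation of the Sparql* processors in org.fcrepo.camel.processor; the class and method names are hypothetical, and the statements shown are only one plausible shape for the payloads, relying on the fact that N-Triples statements are also valid inside a SPARQL INSERT DATA block.

```java
// Hypothetical illustration of wrapping N-Triples in SPARQL Update statements.
final class SparqlUpdateSketch {

    // Wrap a serialized N-Triples document in an INSERT DATA statement.
    static String insertData(String nTriples) {
        return "INSERT DATA {\n" + nTriples.trim() + "\n}";
    }

    // Remove every triple whose subject is the given resource URI.
    static String deleteSubject(String subjectUri) {
        return "DELETE WHERE { <" + subjectUri + "> ?p ?o }";
    }

    // An update can be expressed as a delete followed by an insert.
    static String replaceSubject(String subjectUri, String nTriples) {
        return deleteSubject(subjectUri) + ";\n" + insertData(nTriples);
    }

    public static void main(String[] args) {
        String subject = "http://localhost:8080/rest/a";
        String triples = "<" + subject + "> <http://purl.org/dc/elements/1.1/title> \"A\" .\n";
        System.out.println(replaceSubject(subject, triples));
    }
}
```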
When using these Sparql* processor classes, it is also possible to apply these operations to named graphs. While Fedora does not support named graphs, it is possible to assign nodes to certain named graphs in an external triplestore. The CamelFcrepoNamedGraph header can be used to apply a sparql-update operation to a particular named graph. For instance, to route all operations to a named graph (in this case the graph URI is defined dynamically as a property placeholder):
Code block: Named Graph

```java
from("direct:update")
    .to("fcrepo:localhost:8080/rest")
    .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
        .simple("{{named.graph}}")
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/ds/update");
```
Or, to partition the graph based on an existing RDF property:
Code block: Multiple Named Graphs

```java
// ns is not shown here; it is assumed to be a Namespaces instance,
// defined elsewhere, that binds the rdf and ex prefixes.
from("direct:update")
    .to("fcrepo:localhost:8080/rest")
    .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
        .xpath("/rdf:RDF/rdf:Description/ex:namedGraph/text()", String.class, ns)
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/ds/update");
```
Or, you may want (possibly overlapping) "public" and "private" named graphs, defined as constants:
Code block: Constant Named Graphs

```java
// publicPredicate and privatePredicate are not shown here; they are
// assumed to be Camel Predicate instances defined elsewhere.
from("direct:update")
    .to("fcrepo:localhost:8080/rest")
    .multicast()
        .to("direct:public", "direct:private");

from("direct:public")
    .filter(publicPredicate)
    .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
        .constant("http://site/graph/public")
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/ds/update");

from("direct:private")
    .filter(privatePredicate)
    .setHeader(FcrepoHeaders.FCREPO_NAMED_GRAPH)
        .constant("http://site/graph/private")
    .process(new SparqlUpdateProcessor())
    .to("http4:localhost:3030/ds/update");
```
In each case the SPARQL update operation will include a GRAPH <uri> { ... } clause, where uri is the value of the FCREPO_NAMED_GRAPH header (in the Spring DSL, this header can be accessed as CamelFcrepoNamedGraph). The value of the named graph header must be a properly formed URI.
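The effect of the named graph header, and the reason a well-formed URI matters, can be sketched in plain Java. This is an illustration only and not the code of the Sparql* processors: the class and method names are hypothetical, and rejecting relative URIs is an assumption made for the sketch.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical illustration of a GRAPH clause built from a named graph URI.
final class NamedGraphSketch {

    // Wrap N-Triples in INSERT DATA { GRAPH <uri> { ... } } after checking the URI.
    static String insertIntoGraph(String graphUri, String nTriples) {
        final URI uri;
        try {
            uri = new URI(graphUri);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed graph URI: " + graphUri, e);
        }
        // Assumption for this sketch: only absolute URIs name a graph.
        if (!uri.isAbsolute()) {
            throw new IllegalArgumentException("Graph URI must be absolute: " + graphUri);
        }
        return "INSERT DATA { GRAPH <" + uri + "> {\n" + nTriples.trim() + "\n} }";
    }

    public static void main(String[] args) {
        String triples = "<http://localhost:8080/rest/a> <http://purl.org/dc/elements/1.1/title> \"A\" .";
        System.out.println(insertIntoGraph("http://site/graph/public", triples));
    }
}
```

Checking the header value before the update is sent gives a clear error in the route instead of a rejected request at the triplestore.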
Event-based Routing
It is often helpful to route messages to different queues based on the eventType value. This example splits messages on eventType values and routes them to the appropriate queues. Following this example, it would be prudent to aggregate the messages based on the org.fcrepo.jms.identifier value after retrieving them from the downstream queues.
Code block: Content-based Routing

```xml
<route id="fcrepo-event-splitter">
  <description>
    Retrieve messages from the fedora topic. Event types are comma-delimited, so split them into separate messages before routing them.
  </description>
  <from uri="activemq:topic:fedora"/>
  <setBody>
    <simple>${header.org.fcrepo.jms.eventType}</simple>
  </setBody>
  <split>
    <tokenize token=","/>
    <setHeader headerName="org.fcrepo.jms.eventType">
      <simple>${body}</simple>
    </setHeader>
    <!-- ${null} clears the body; a bare "null" would set the literal string -->
    <setBody>
      <simple>${null}</simple>
    </setBody>
    <to uri="seda:fcrepo-event-router"/>
  </split>
</route>

<route id="fcrepo-event-router">
  <description>
    Route messages based on the eventType.
  </description>
  <from uri="seda:fcrepo-event-router"/>
  <choice>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#NODE_REMOVED"</simple>
      <to uri="activemq:queue:fcrepo.delete"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#NODE_ADDED"</simple>
      <to uri="activemq:queue:fcrepo.add"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_ADDED"</simple>
      <to uri="activemq:queue:fcrepo.update"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_CHANGED"</simple>
      <to uri="activemq:queue:fcrepo.update"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_REMOVED"</simple>
      <to uri="activemq:queue:fcrepo.update"/>
    </when>
    <otherwise>
      <log message="No router for ${header.org.fcrepo.jms.eventType}"/>
    </otherwise>
  </choice>
</route>
```
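The routing decision made by the two routes above can also be read as a small piece of plain Java, which may help when testing the mapping outside Camel. This is a sketch only: the class and method names are hypothetical, the queue names and event type URIs are taken from the XML above, and trimming whitespace around each token is an extra assumption that the tokenize step does not make.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the split-then-route logic.
final class EventRouterSketch {
    static final String NS = "http://fedora.info/definitions/v4/repository#";

    // Map a single event type URI to a queue name, or null if no route matches.
    static String queueFor(String eventType) {
        if (!eventType.startsWith(NS)) {
            return null;
        }
        switch (eventType.substring(NS.length())) {
            case "NODE_REMOVED":
                return "fcrepo.delete";
            case "NODE_ADDED":
                return "fcrepo.add";
            case "PROPERTY_ADDED":
            case "PROPERTY_CHANGED":
            case "PROPERTY_REMOVED":
                return "fcrepo.update";
            default:
                return null;
        }
    }

    // Split the comma-delimited header value and route each event type in order.
    static List<String> route(String eventTypeHeader) {
        List<String> queues = new ArrayList<>();
        for (String eventType : eventTypeHeader.split(",")) {
            String queue = queueFor(eventType.trim());
            if (queue != null) {
                queues.add(queue);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // prints [fcrepo.add, fcrepo.update]
        System.out.println(route(NS + "NODE_ADDED," + NS + "PROPERTY_ADDED"));
    }
}
```

A single repository event can therefore produce messages on more than one queue, which is why aggregating on the identifier downstream is worthwhile.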
Supporting Queues
The default configuration is fine for locally deployed listeners, but it can be problematic in a distributed context. For instance, if the listener is restarting when a message is sent to the topic, that message may be missed. A networking hiccup between Fedora's local broker and the remote listener can likewise result in lost messages. In these cases, a queue may be better suited.
...