Actions
Feature #5090
open — add new adaptor type: <writer>
Start date:
09/17/2013
Due date:
% Done:
0%
Estimated time:
Description
Adaptor examples:
- SQLWriter
- HTTPWriter
- FileWriter
- EmailWriter
Usage example:
<!-- Example view pairing an LDAPConnector with a SQLWriter. -->
<view name="ldap-to-sql">
  <connector type="LDAPConnector"/>
  <!-- "match" is an XPath expression; presumably it selects the nodes handed to the writer (confirm).
       recovery-period is written as an ISO-8601 / XML Schema duration (PT1M). -->
  <writer type="SQLWriter" match="/o/cn[@id='fr']/ou[@id='in2p3']/dn"
          recovery-base-directory="$TMP/recovery" recovery-period="PT1M">
    <parameter name="url">jdbc:...</parameter>
    <!-- Bind placeholders must appear in a VALUES clause; "INSERT INTO t (?,?)" is not valid SQL. -->
    <parameter name="query">INSERT INTO my_table VALUES (?,?)</parameter>
    <!-- Presumably one entry per '?' placeholder, in order, each "eval" being an XPath expression
         evaluated against the matched node (TODO confirm). -->
    <parameter name="parameters">
      <entry eval="@id"/>
      <entry eval="ancestor::cn/@id"/>
    </parameter>
    <trigger type="DeltaTimeTrigger">
      <parameter name="hours">1</parameter>
    </trigger>
  </writer>
</view>
Interface:
/**
 * Contract of the writer adaptor type: it reports which exceptions denote a bad configuration
 * and exposes the open/close life cycle of the resource it writes to.
 */
public interface Writer extends Adaptor {

    /**
     * Lists the exceptions caused by bad configuration. An event that fails with one of them
     * will not be retried.
     *
     * @return the list of configuration exceptions
     */
    Exception[] getConfigurationExceptions();

    /**
     * Opens the resource. A resource can be, for example, a connection to a database, a URL to
     * an AVAILABLE web service, or a pointer to a file.
     *
     * @return {@code true} if opening the resource is successful, {@code false} otherwise
     * @throws ResourceAlreadyOpened if the resource was already opened
     */
    boolean openResource() throws ResourceAlreadyOpened;

    /**
     * Counterpart of {@link #openResource()}.
     * NOTE(review): the exact release semantics are not specified here; confirm with implementers.
     */
    void closeResource();
}
Algorithm:
public class WriterLink {
private File m_base_directory;
private Duration m_retry_period;
private EventStore m_unrecoverable;
private EventStore m_recoverable;
private Writer m_writer;
private Set<Exception> m_configuration_exceptions;
public WriterLink() {
m_configuration_exceptions = new HashSet<Exception>();
m_configuration_exceptions.addAll(Arrays.asList(m_writer.getConfigurationExceptions()));
}
public void write(Xml view) {
EventReader input = new EventReader(view);
m_unrecoverable = new EventStore(m_viewId);
m_unrecoverable.write(StartElement("root"));
while(!input.isEmpty() && !isTriggered) {
label: while(!isTriggered) {
switch (m_writer.openResource()) {
case WAS_ALREADY_OPENED:
Thread.sleep(m_retry_period);
break label;
case IS_OPENED:
break label;
case IS_CLOSED:
Thread.sleep(m_retry_period);
break;
}
}
m_recoverable = new EventStore(m_viewId);
try {
for (event : input.getEvents()) {
switch(event.getState()) {
case UNSELECTED:
m_recoverable.write(event);
break;
case SELECTED:
this.write(event.getCurrent());
break;
case DESCENDANT_OF_SELECTED:
break;
}
}
} catch (ResourceClosedException e) {
for (event : input.getEvents()) {
m_recoverable.write(event);
}
}
m_recoverable.close(); // remove the file if if contains no event
m_recoverable.moveTo(input);
}
m_writer.close();
m_unrecoverable.write(EndElement("root"));
m_unrecoverable.close(); // remove the file if it contains no event
}
public void write(Element current) throws ResourceClosedException {
try {
m_writer.write(current);
} catch (Exception e) {
if (m_configuration_exceptions.contains(e)) {
m_unrecoverable.write(current);
} else {
switch (m_writer.openResource()) {
case WAS_ALREADY_OPENED:
m_recoverable.write(current);
break;
case IS_OPENED:
try {
m_writer.write(current);
} catch (Exception e) {
m_recoverable.write(current);
}
break;
case IS_CLOSED:
throw new ResourceClosedException();
}
}
}
}
}
Actions