Feature #5090
Updated by Reynaud Sylvain about 11 years ago
Adaptors example:
* SQLWriter
* HTTPWriter
* FileWriter
* EmailWriter
Usage example:
<pre><code class="xml">
<view name="ldap-to-sql">
<connector type="LDAPConnector"/>
<writer type="SQLWriter" match="/o/cn[@id='fr']/ou[@id='in2p3']/dn"
recovery-base-directory="$TMP/recovery" recovery-period="PT1M">
<parameter name="url">jdbc:...</parameter>
<parameter name="query">INSERT INTO my_table (?,?)</parameter>
<parameter name="parameters">
<entry eval="@id"/>
<entry eval="ancestor::cn/@id"/>
</parameter>
<trigger type="DeltaTimeTrigger">
<parameter name="hours">1</parameter>
</trigger>
</writer>
</view>
</code></pre>
Interface:
<pre><code class="java">
public interface Writer extends Adaptor {
/**
* @return the list of exceptions caused by bad configuration (event will not be retried).
*/
public Exception[] getConfigurationExceptions();
/**
* A resource can be for example: a connection to database, an URL to an AVAILABLE web service, a pointer to file...
* @throws ResourceAlreadyOpened if resource was already opened.
* @return true if opening resource is successful, else false.
*/
public boolean openResource() throws ResourceAlreadyOpened;
public void closeResource();
}
</code></pre>
Algorithm:
<pre><code class="java">
public class WriterLink {
private File m_base_directory;
private Duration m_retry_period;
private EventStore m_unrecoverable;
private EventStore m_recoverable;
private Writer m_writer;
private Set<Exception> m_configuration_exceptions;
public WriterLink() {
m_configuration_exceptions = new HashSet<Exception>();
m_configuration_exceptions.addAll(Arrays.asList(m_writer.getConfigurationExceptions()));
}
public void write(Xml view) {
EventReader input = new EventReader(view);
m_unrecoverable = new EventStore(m_viewId);
m_unrecoverable.write(StartElement("root"));
while(!input.isEmpty() && !isTriggered) {
label: while(!isTriggered) {
switch (m_writer.openResource()) {
case WAS_ALREADY_OPENED:
Thread.sleep(m_retry_period);
break label;
case IS_OPENED:
break label;
case IS_CLOSED:
Thread.sleep(m_retry_period);
break;
}
}
m_recoverable = new EventStore(m_viewId);
try {
for (event : input.getEvents()) {
switch(event.getState()) {
case UNSELECTED:
m_recoverable.write(event);
break;
case SELECTED:
this.write(event.getCurrent());
break;
case DESCENDANT_OF_SELECTED:
break;
}
}
} catch (ResourceClosedException e) {
for (event : input.getEvents()) {
m_recoverable.write(event);
}
}
m_recoverable.close(); // remove the file if if contains no event
m_recoverable.moveTo(input);
}
m_writer.close();
m_unrecoverable.write(EndElement("root"));
m_unrecoverable.close(); // remove the file if it contains no event
}
public void write(Element current) throws ResourceClosedException {
try {
m_writer.write(current);
} catch (Exception e) {
if (m_configuration_exceptions.contains(e)) {
m_unrecoverable.write(current);
} else {
switch (m_writer.openResource()) {
case WAS_ALREADY_OPENED:
m_recoverable.write(current);
break;
case IS_OPENED:
try {
m_writer.write(current);
} catch (Exception e) {
m_recoverable.write(current);
}
break;
case IS_CLOSED:
throw new ResourceClosedException();
}
}
}
}
}
</code></pre>