Project

General

Profile

Actions

Feature #5090

open

Add new adaptor type: <writer>

Added by Reynaud Sylvain over 11 years ago. Updated over 10 years ago.

Status:
New
Priority:
Low
Assigned To:
-
Category:
Engine
Target version:
Start date:
09/17/2013
Due date:
% Done:

0%

Estimated time:

Description

Adaptor examples:
  • SQLWriter
  • HTTPWriter
  • FileWriter
  • EmailWriter

Usage example:

<!-- Example view that declares an LDAPConnector and an SQLWriter. -->
<view name="ldap-to-sql">
    <connector type="LDAPConnector"/>
    <!-- NOTE(review): "match" presumably selects the nodes handed to the writer, and the
         recovery-* attributes presumably configure where failed events are stored and how
         often they are retried; confirm against the engine implementation. -->
    <writer type="SQLWriter" match="/o/cn[@id='fr']/ou[@id='in2p3']/dn"
            recovery-base-directory="$TMP/recovery" recovery-period="PT1M">
        <parameter name="url">jdbc:...</parameter>
        <!-- Parameterized INSERT: the VALUES keyword is required before the placeholder list. -->
        <parameter name="query">INSERT INTO my_table VALUES (?,?)</parameter>
        <!-- NOTE(review): presumably each entry is bound, in order, to one '?' placeholder
             of the query above; confirm. -->
        <parameter name="parameters">
            <entry eval="@id"/>
            <entry eval="ancestor::cn/@id"/>
        </parameter>
        <trigger type="DeltaTimeTrigger">
            <parameter name="hours">1</parameter>
        </trigger>
    </writer>
</view>

Interface:

/**
 * Adaptor type that owns a resource which can be opened and closed.
 *
 * NOTE(review): the WriterLink sketch in this proposal also calls {@code write(Element)} and
 * {@code close()} on a Writer, and switches on three states (WAS_ALREADY_OPENED, IS_OPENED,
 * IS_CLOSED) returned by {@link #openResource()}. None of these are declared here; the two
 * sketches need to be reconciled before implementation.
 */
public interface Writer extends Adaptor {
    /**
     * @return the list of exceptions caused by bad configuration (event will not be retried).
     */
    public Exception[] getConfigurationExceptions();

    /**
     * Opens the resource used by this writer.
     * A resource can be, for example: a connection to a database, a URL to an AVAILABLE web service, a pointer to a file...
     * @throws ResourceAlreadyOpened if resource was already opened.
     * @return true if opening resource is successful, else false.
     */
    public boolean openResource() throws ResourceAlreadyOpened;

    /**
     * Closes the resource.
     * NOTE(review): presumably the counterpart of {@link #openResource()}; the behaviour when
     * the resource is not open is not specified in this proposal.
     */
    public void closeResource();
}

Algorithm:

/**
 * Design sketch (pseudo-code, not compilable as written) of the component that drives a
 * {@code Writer}: it reads the events of a view, sends the selected ones to the writer, and
 * stores events in one of two event stores:
 * <ul>
 *   <li>{@code m_unrecoverable}: elements whose write failed with an exception found in
 *       {@code m_configuration_exceptions};</li>
 *   <li>{@code m_recoverable}: unselected events and elements whose write failed for another
 *       reason; this store is moved back to the input at the end of each pass.</li>
 * </ul>
 *
 * NOTE(review): {@code isTriggered}, {@code m_viewId}, {@code StartElement}/{@code EndElement}
 * and the constants WAS_ALREADY_OPENED / IS_OPENED / IS_CLOSED are not declared in this sketch.
 */
public class WriterLink {
    // NOTE(review): never read in this sketch; presumably the recovery-base-directory setting - confirm.
    private File m_base_directory;
    // Delay passed to Thread.sleep() between attempts to open the resource.
    private Duration m_retry_period;
    // Receives elements that failed with a configuration exception.
    private EventStore m_unrecoverable;
    // Receives events to be fed back into the input; re-created on every pass of write(Xml).
    private EventStore m_recoverable;
    private Writer m_writer;
    // Filled once, in the constructor, from m_writer.getConfigurationExceptions().
    private Set<Exception> m_configuration_exceptions;

    /**
     * Collects the writer's configuration exceptions into a set.
     *
     * NOTE(review): m_writer is never assigned anywhere in this sketch, so the call below
     * would dereference null as written; the writer presumably has to be injected first.
     */
    public WriterLink() {
        m_configuration_exceptions = new HashSet<Exception>();
        m_configuration_exceptions.addAll(Arrays.asList(m_writer.getConfigurationExceptions()));
    }

    /**
     * Processes the events of {@code view} in passes until the input is empty or
     * {@code isTriggered} is set.
     *
     * @param view the view whose events are read through an EventReader
     */
    public void write(Xml view) {
        EventReader input = new EventReader(view);
        m_unrecoverable = new EventStore(m_viewId);
        m_unrecoverable.write(StartElement("root"));
        while(!input.isEmpty() && !isTriggered) {
            // Open the resource before each pass. Only IS_CLOSED stays in the loop (sleep, then
            // retry); the two other cases leave it through "break label".
            // NOTE(review): Writer.openResource() is declared as returning boolean and throwing
            // ResourceAlreadyOpened, which does not match this three-state switch.
            // NOTE(review): Thread.sleep() is given a Duration and InterruptedException is not handled.
            label: while(!isTriggered) {
                switch (m_writer.openResource()) {
                    case WAS_ALREADY_OPENED:
                        Thread.sleep(m_retry_period);
                        break label;
                    case IS_OPENED:
                        break label;
                    case IS_CLOSED:
                        Thread.sleep(m_retry_period);
                        break;
                }
            }
            m_recoverable = new EventStore(m_viewId);
            try {
                for (event : input.getEvents()) {
                    switch(event.getState()) {
                        case UNSELECTED:
                            // Unselected events are copied as-is to the recoverable store.
                            m_recoverable.write(event);
                            break;
                        case SELECTED:
                            // Selected events are sent to the writer; may throw ResourceClosedException.
                            this.write(event.getCurrent());
                            break;
                        case DESCENDANT_OF_SELECTED:
                            // Nothing is written for descendants of a selected event.
                            break;
                    }
                }
            } catch (ResourceClosedException e) {
                // The resource could not be re-opened: store the events of the input for a later pass.
                // NOTE(review): this iterates input.getEvents() again; whether events already handled
                // above are returned a second time depends on EventReader, which is not shown - confirm.
                for (event : input.getEvents()) {
                    m_recoverable.write(event);
                }
            }
            m_recoverable.close(); // remove the file if it contains no event
            // NOTE(review): presumably makes the recoverable events the input of the next pass - confirm.
            m_recoverable.moveTo(input);
        }
        // NOTE(review): the Writer interface in this proposal declares closeResource(), not close().
        m_writer.close();
        m_unrecoverable.write(EndElement("root"));
        m_unrecoverable.close(); // remove the file if it contains no event
    }

    /**
     * Writes one element through the writer and stores it when the write fails.
     * A failure found in {@code m_configuration_exceptions} sends the element to the
     * unrecoverable store. Any other failure triggers an attempt to re-open the resource:
     * WAS_ALREADY_OPENED stores the element as recoverable, IS_OPENED retries the write once
     * (storing the element as recoverable if the retry fails), IS_CLOSED throws.
     *
     * @param current the element to write
     * @throws ResourceClosedException if the resource is closed after the re-open attempt
     */
    public void write(Element current) throws ResourceClosedException {
        try {
            m_writer.write(current);
        } catch (Exception e) {
            // NOTE(review): Set.contains() relies on equals(), which Throwable does not override,
            // so this matches only if the thrown object is the very instance returned by
            // getConfigurationExceptions(); comparing exception classes may be what is intended.
            if (m_configuration_exceptions.contains(e)) {
                m_unrecoverable.write(current);
            } else {
                switch (m_writer.openResource()) {
                    case WAS_ALREADY_OPENED:
                        m_recoverable.write(current);
                        break;
                    case IS_OPENED:
                        try {
                            m_writer.write(current);
                        // NOTE(review): "e" redeclares the enclosing catch parameter, which Java rejects.
                        } catch (Exception e) {
                            m_recoverable.write(current);
                        }
                        break;
                    case IS_CLOSED:
                        throw new ResourceClosedException();
                }
            }
        }
    }
}

Actions #1

Updated by Reynaud Sylvain over 11 years ago

  • Description updated (diff)
Actions #2

Updated by Reynaud Sylvain over 11 years ago

  • Category set to Engine
Actions #3

Updated by Reynaud Sylvain over 10 years ago

  • Target version set to later
Actions

Also available in: Atom PDF