Project

General

Profile

Feature #5090

Updated by Reynaud Sylvain about 11 years ago

Adaptors example: 
 * SQLWriter 
 * HTTPWriter 
 * FileWriter 
 * EmailWriter 

 Usage example: 
 <pre><code class="xml"> 
 <view name="ldap-to-sql"> 
     <connector type="LDAPConnector"/> 
     <writer type="SQLWriter" match="/o/cn[@id='fr']/ou[@id='in2p3']/dn" 
             recovery-base-directory="$TMP/recovery" recovery-period="PT1M"> 
         <parameter name="url">jdbc:...</parameter> 
         <parameter name="query">INSERT INTO my_table (?,?)</parameter> 
         <parameter name="parameters"> 
             <entry eval="@id"/> 
             <entry eval="ancestor::cn/@id"/> 
         </parameter> 
         <trigger type="DeltaTimeTrigger"> 
             <parameter name="hours">1</parameter> 
         </trigger> 
     </writer> 
 </view> 
 </code></pre> 

 Interface: 
 <pre><code class="java"> 
 public interface Writer extends Adaptor { 
     /** 
      * @return the list of exceptions caused by bad configuration (event will not be retried). 
      */ 
     public Exception[] getConfigurationExceptions(); 

     /** 
      * A resource can be for example: a connection to database, an URL to an AVAILABLE web service, a pointer to file... 
      * @throws ResourceAlreadyOpened if resource was already opened. 
      * @return true if opening resource is successful, else false. 
      */ 
     public boolean openResource() throws ResourceAlreadyOpened; 
     public void closeResource(); 
 } 
 </code></pre> 

 Algorithm: 
 <pre><code class="java"> 
 public class WriterLink { 
     private File m_base_directory; 
     private Duration m_retry_period; 
     private EventStore m_unrecoverable; 
     private EventStore m_recoverable; 
     private Writer m_writer; 
     private Set<Exception> m_configuration_exceptions; 

     public WriterLink() { 
         m_configuration_exceptions = new HashSet<Exception>(); 
         m_configuration_exceptions.addAll(Arrays.asList(m_writer.getConfigurationExceptions())); 
     } 

     public void write(Xml view) { 
         EventReader input = new EventReader(view); 
         m_unrecoverable = new EventStore(m_viewId); 
         m_unrecoverable.write(StartElement("root")); 
         while(!input.isEmpty() && !isTriggered) { 
             label: while(!isTriggered) { 
                 switch (m_writer.openResource()) { 
                     case WAS_ALREADY_OPENED: 
                         Thread.sleep(m_retry_period); 
                         break label; 
                     case IS_OPENED: 
                         break label; 
                     case IS_CLOSED: 
                         Thread.sleep(m_retry_period); 
                         break; 
                 } 
             } 
             m_recoverable = new EventStore(m_viewId); 
             try { 
                 for (event : input.getEvents()) { 
                     switch(event.getState()) { 
                         case UNSELECTED: 
                             m_recoverable.write(event); 
                             break; 
                         case SELECTED: 
                             this.write(event.getCurrent()); 
                             break; 
                         case DESCENDANT_OF_SELECTED: 
                             break; 
                     } 
                 } 
             } catch (ResourceClosedException e) { 
                 for (event : input.getEvents()) { 
                     m_recoverable.write(event); 
                 } 
             } 
             m_recoverable.close(); // remove the file if if contains no event 
             m_recoverable.moveTo(input); 
         } 
         m_writer.close(); 
         m_unrecoverable.write(EndElement("root")); 
         m_unrecoverable.close(); // remove the file if it contains no event 
     } 
     public void write(Element current) throws ResourceClosedException { 
         try { 
             m_writer.write(current); 
         } catch (Exception e) { 
             if (m_configuration_exceptions.contains(e)) { 
                 m_unrecoverable.write(current); 
             } else { 
                 switch (m_writer.openResource()) { 
                     case WAS_ALREADY_OPENED: 
                         m_recoverable.write(current); 
                         break; 
                     case IS_OPENED: 
                         try { 
                             m_writer.write(current); 
                         } catch (Exception e) { 
                             m_recoverable.write(current); 
                         } 
                         break; 
                     case IS_CLOSED: 
                         throw new ResourceClosedException(); 
                 } 
             } 
         } 
     } 
 } 
 </code></pre>

Back