what are JMS equivalents of DirectChannel and QueueChannel in the JAVA DSL? -


i have developed think solution in spring boot , integration, using java dsl, directchannel , queuechannel beans. based upon example code routertests#testmethodinvokingrouter2.

now want move activemq. if import activemqautoconfiguration instance of connectionfactory. how replace following beans jms equivalents?:

@bean(name = "failed-channel") public messagechannel failedchannel() {     return new directchannel(); }  @bean(name = "retry-channel") public messagechannel retrychannel() {     return new queuechannel(); }  @bean(name = "exhausted-channel") public messagechannel exhaustedchannel() {     return new queuechannel(); } 

is there easy way or barking wrong tree?

complete code below

@contextconfiguration @runwith(springjunit4classrunner.class) @dirtiescontext public class retryroutertests {      /** failed download attempts sent channel routed {@link contextconfiguration#faileddownloadrouting( ) } */     @autowired     @qualifier("failed-channel")     private messagechannel failed;      /** retry attempts failed downloads sent channel {@link contextconfiguration#faileddownloadrouting( ) }*/     @autowired     @qualifier("retry-channel")     private pollablechannel retrychannel;      /** failed download attempts not retried, sent channel {@link contextconfiguration#faileddownloadrouting( ) }*/     @autowired     @qualifier("exhausted-channel")     private pollablechannel exhaustedchannel;      /**      * unit test of {@link contextconfiguration#faileddownloadrouting( ) } , {@link retryrouter}.      */     @test     public void retryrouting() {          final int limit = 2;          message<?> message = failed( 0, limit);          ( int attempt = 0 ; attempt <= limit * 2; attempt++ ){              this.failed.send( message );              if ( attempt < limit){                  message = this.retrychannel.receive( );                 assertequals( payload( 0 ) , message.getpayload( ) );                 assertnull(this.exhaustedchannel.receive( 0 ) );              }else{                  assertequals( payload( 0 ) , this.exhaustedchannel.receive( ).getpayload( ) );                 assertnull( this.retrychannel.receive( 0 ) );             }         }     }      private message<string> failed( int attempt , int limit ) {          return messagebuilder             .withpayload(  payload( attempt ) )             .setheader("limit", limit)             .build();     }      private string payload (int attempt){         return "download attempt "+attempt;     }       @configuration     @import({/*activemqautoconfiguration.class,*/ integrationautoconfiguration.class})     public static class contextconfiguration {          @bean(name = "failed-channel")         public messagechannel failedchannel() {             return new directchannel();         }          @bean(name = "retry-channel")         public messagechannel retrychannel() {             return new queuechannel();         }          @bean(name = "exhausted-channel")         public messagechannel exhaustedchannel() {             return new queuechannel();         }          /**          * decides if failed download attempt can retried or not, based upon number of attempts made           * , limit number of attempts may made. logic in {@link retryrouter}.          * <p>          * number of download attempts made maintained header {@link #attempts},           * , limit number of attempts header {@link #retrylimit} setup upstream          * header {@link downloaddispatcher} retry configuration.          * <p>          * messages failed download attempts listened on channel {@link #failedchannel()},           * routed to {@link #retrychannel()} attempt or routed {@link #exhaustedchannel()} when there no more retries made.          * <p>          * refer http://stackoverflow.com/questions/34693248/how-to-increment-a-message-header how increment attempts header.          *            * @return {@link integrationflow} defining retry routing message flows          */         @bean         public integrationflow faileddownloadrouting() {              return integrationflows.from( "failed-channel" )                  .handle( logmessage ( "failed" ) )                  // adds download attempt counter when absent, first iteration                  .enrichheaders( h -> h.header("attempts", new atomicinteger( 0 ) ) )                  // incremented download attempt counter every retry                 .handle( new generichandler<message<string>>( ) {                      @override                     public object handle( message<string> payload , map<string,object> headers ) {                          ((atomicinteger)headers.get( "attempts" )).getandincrement();                         return payload;                     }})                  .handle( logmessage ( "incremented" ) )                  .route( new retryrouter( ) )                  .get();         }          /**          * decides if failed download attempt can retried or not, based upon number of attempts made           * , limit number of attempts may made.           * <p>          */         private static class retryrouter {              /**              * @param attempts current accumulated number of failed download attempts               * @param limit maximum number of download attempts              * @return string channel name message routed              */             @router             public string route(@header("attempts") atomicinteger attempts , @header("limit") integer limit) {                  if ( attempts.intvalue( ) <= limit.intvalue( ) ){                     return "retry-channel";                 }                 return "exhausted-channel";             }         }     } } 

it's not clear mean "replace" in context.

if mean channels equivalent those, backed jms queue (for persistence reasons only), use jms.channel(cf) subscribablechannel (directchannel subscribablechannel) , jms.pollablechannel(cf) (queuechannel pollablechannel).

for completeness, publishsubscribechannel can replaced jms.publishsubscribechannel(cf).

however, if want send contents of channels jms distribution other systems, it's better use jms.outboundadapter() subscribed directchannel or polling queuechannel.

jms-backed channels not intended message distribution; intended provide message persistence in order prevent data loss.


Comments

Popular posts from this blog

get url and add instance to a model with prefilled foreign key :django admin -

css - Make div keyboard-scrollable in jQuery Mobile? -

android - Keyboard hides my half of edit-text and button below it even in scroll view -