what are JMS equivalents of DirectChannel and QueueChannel in the JAVA DSL? -
i have developed think solution in spring boot , integration, using java dsl, directchannel , queuechannel beans. based upon example code routertests#testmethodinvokingrouter2.
now want move activemq. if import activemqautoconfiguration instance of connectionfactory. how replace following beans jms equivalents?:
@bean(name = "failed-channel") public messagechannel failedchannel() { return new directchannel(); } @bean(name = "retry-channel") public messagechannel retrychannel() { return new queuechannel(); } @bean(name = "exhausted-channel") public messagechannel exhaustedchannel() { return new queuechannel(); }
is there easy way or barking wrong tree?
complete code below
@contextconfiguration @runwith(springjunit4classrunner.class) @dirtiescontext public class retryroutertests { /** failed download attempts sent channel routed {@link contextconfiguration#faileddownloadrouting( ) } */ @autowired @qualifier("failed-channel") private messagechannel failed; /** retry attempts failed downloads sent channel {@link contextconfiguration#faileddownloadrouting( ) }*/ @autowired @qualifier("retry-channel") private pollablechannel retrychannel; /** failed download attempts not retried, sent channel {@link contextconfiguration#faileddownloadrouting( ) }*/ @autowired @qualifier("exhausted-channel") private pollablechannel exhaustedchannel; /** * unit test of {@link contextconfiguration#faileddownloadrouting( ) } , {@link retryrouter}. */ @test public void retryrouting() { final int limit = 2; message<?> message = failed( 0, limit); ( int attempt = 0 ; attempt <= limit * 2; attempt++ ){ this.failed.send( message ); if ( attempt < limit){ message = this.retrychannel.receive( ); assertequals( payload( 0 ) , message.getpayload( ) ); assertnull(this.exhaustedchannel.receive( 0 ) ); }else{ assertequals( payload( 0 ) , this.exhaustedchannel.receive( ).getpayload( ) ); assertnull( this.retrychannel.receive( 0 ) ); } } } private message<string> failed( int attempt , int limit ) { return messagebuilder .withpayload( payload( attempt ) ) .setheader("limit", limit) .build(); } private string payload (int attempt){ return "download attempt "+attempt; } @configuration @import({/*activemqautoconfiguration.class,*/ integrationautoconfiguration.class}) public static class contextconfiguration { @bean(name = "failed-channel") public messagechannel failedchannel() { return new directchannel(); } @bean(name = "retry-channel") public messagechannel retrychannel() { return new queuechannel(); } @bean(name = "exhausted-channel") public messagechannel exhaustedchannel() { return new queuechannel(); } /** * decides if failed download attempt can retried or not, based upon number of attempts made * , limit number of attempts may made. logic in {@link retryrouter}. * <p> * number of download attempts made maintained header {@link #attempts}, * , limit number of attempts header {@link #retrylimit} setup upstream * header {@link downloaddispatcher} retry configuration. * <p> * messages failed download attempts listened on channel {@link #failedchannel()}, * routed to {@link #retrychannel()} attempt or routed {@link #exhaustedchannel()} when there no more retries made. * <p> * refer http://stackoverflow.com/questions/34693248/how-to-increment-a-message-header how increment attempts header. * * @return {@link integrationflow} defining retry routing message flows */ @bean public integrationflow faileddownloadrouting() { return integrationflows.from( "failed-channel" ) .handle( logmessage ( "failed" ) ) // adds download attempt counter when absent, first iteration .enrichheaders( h -> h.header("attempts", new atomicinteger( 0 ) ) ) // incremented download attempt counter every retry .handle( new generichandler<message<string>>( ) { @override public object handle( message<string> payload , map<string,object> headers ) { ((atomicinteger)headers.get( "attempts" )).getandincrement(); return payload; }}) .handle( logmessage ( "incremented" ) ) .route( new retryrouter( ) ) .get(); } /** * decides if failed download attempt can retried or not, based upon number of attempts made * , limit number of attempts may made. * <p> */ private static class retryrouter { /** * @param attempts current accumulated number of failed download attempts * @param limit maximum number of download attempts * @return string channel name message routed */ @router public string route(@header("attempts") atomicinteger attempts , @header("limit") integer limit) { if ( attempts.intvalue( ) <= limit.intvalue( ) ){ return "retry-channel"; } return "exhausted-channel"; } } } }
it's not clear mean "replace" in context.
if mean channels equivalent those, backed jms queue (for persistence reasons only), use jms.channel(cf)
subscribablechannel
(directchannel
subscribablechannel
) , jms.pollablechannel(cf)
(queuechannel
pollablechannel
).
for completeness, publishsubscribechannel
can replaced jms.publishsubscribechannel(cf)
.
however, if want send contents of channels jms distribution other systems, it's better use jms.outboundadapter()
subscribed directchannel
or polling queuechannel
.
jms-backed channels not intended message distribution; intended provide message persistence in order prevent data loss.
Comments
Post a Comment