Advanced usage

This chapter explains how to work with the dynamic features of Process, such as creating or deleting queues and running or stopping message processors at runtime.

List of features detailed in this page

  • Queue creation and deletion
  • Check for queue existence
  • Obtain a listing of the existing queues with the prefix, the name and the message count for each one
  • Enqueue message in a queue with a property
  • Count messages in a queue (with or without a filter)
  • List messages in a queue (with or without a filter) obtaining for each the human readable representation, the JMS id and the creation date
  • Delete a message from a queue using its JMS ID
  • Delete messages from a queue using a filter
  • Empty a queue
  • Run a message processor to process messages from a queue
  • Count message processors running on a queue
  • Count message processors of a particular type (filter) running on a queue.
  • Stop the message processors of a particular type (filter) running on a queue.
  • Stop every message processor running on a queue.

Our IJMSMessage and IJMSMessageProcessor implementations used for this example

The ExampleMessage

This is a very simple IJMSMessage implementation. It only holds a String identifier.

package com.moovapps.example2;

import com.axemble.vdoc.sdk.interfaces.IJMSMessage;

/**
 * Minimal {@link IJMSMessage} implementation used by the examples.
 * <p>
 * The only data it carries is a String identifier, fixed at construction time.
 */
public class ExampleMessage implements IJMSMessage {
	
	/** Identifier carried by this message; never reassigned after construction. */
	private final String identifier;
	
	/**
	 * Creates a message carrying the given identifier.
	 *
	 * @param identifier the identifier to store in the message
	 */
	public ExampleMessage(String identifier) {
		this.identifier = identifier;
	}
	
	/**
	 * @return the identifier given to the constructor
	 */
	public String getIdentifier() {
		return identifier;
	}
	
	/**
	 * Builds the human readable form of the message: the identifier preceded by a fixed prefix.
	 *
	 * @return the text {@code message_with_id_} followed by the identifier
	 */
	@Override
	public String getReadableRepresentation() {
		StringBuilder readable = new StringBuilder("message_with_id_");
		readable.append(identifier);
		return readable.toString();
	}
}

The ExampleMessageProcessor

This IJMSMessageProcessor implementation simply writes a log entry as a way of processing messages. Its only particularity is that it is instantiated with a property value, passed to the constructor, which it then uses to build its message filter.

package com.moovapps.example2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.axemble.vdoc.sdk.interfaces.IJMSMessageProcessor;

/**
 * {@link IJMSMessageProcessor} that "processes" an {@link ExampleMessage} by logging its readable representation.
 * <p>
 * Each instance is bound to one color value; {@link #getMessageFilter()} returns the filter expression
 * comparing the color property to that value.
 */
public class ExampleMessageProcessor implements IJMSMessageProcessor<com.moovapps.example2.ExampleMessage> {
	private static final Logger LOGGER = LoggerFactory.getLogger(ExampleMessageProcessor.class);
	
	/** Name of the message property holding the color. */
	protected static final String PROPERTY_COLOR_NAME = "colorProperty";
	/** Color property value identifying "blue" messages. */
	protected static final String PROPERTY_COLOR_VALUE_BLUE = "blue";
	/** Color property value identifying "red" messages. */
	protected static final String PROPERTY_COLOR_VALUE_RED = "red";
	
	/** Color value this processor filters on. */
	private final String propertyColorValue;
	
	/**
	 * Creates a processor bound to the given color value.
	 *
	 * @param propertyColorValue the value the color property is compared to in the message filter
	 */
	public ExampleMessageProcessor(String propertyColorValue) {
		this.propertyColorValue = propertyColorValue;
	}
	
	/**
	 * Builds the filter expression {@code colorProperty = '<value>'} for the color given at construction.
	 *
	 * @return the filter expression, with any single quote of the value doubled
	 */
	@Override
	public String getMessageFilter() {
		// This processor will only receive message for the matching color property.
		// A single quote inside a quoted string literal must be doubled, otherwise a value such as "o'clock"
		// would close the literal early and yield a malformed expression.
		// NOTE(review): assumes the filter is evaluated with JMS message selector syntax - confirm against the SDK.
		// String.valueOf keeps the previous rendering of a null value as the text "null".
		String escapedValue = String.valueOf(propertyColorValue).replace("'", "''");
		return PROPERTY_COLOR_NAME + " = '" + escapedValue + "'";
	}
	
	/**
	 * Logs the readable representation of the received message.
	 *
	 * @param exampleMessage the message to process
	 */
	@Override
	public void processMessage(ExampleMessage exampleMessage) {
		// NOTE(review): logged at ERROR level although nothing failed - presumably so the line is visible
		// whatever the logging configuration; confirm before lowering the level.
		LOGGER.error("Processing the following message: {}", exampleMessage.getReadableRepresentation());
	}
}

Play with the Process JMS SDK at runtime

Here is an example Process agent showing how to access the JMS SDK using our ExampleMessage and ExampleMessageProcessor. The code should be self-explanatory :).

package com.moovapps.example2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

import com.axemble.vdoc.sdk.agent.base.BaseAgent;
import com.axemble.vdoc.sdk.exceptions.JMSQueueExistsException;
import com.axemble.vdoc.sdk.exceptions.JMSQueueMissingException;
import com.axemble.vdoc.sdk.exceptions.PortalModuleException;
import com.axemble.vdoc.sdk.interfaces.IJMSController;
import com.axemble.vdoc.sdk.interfaces.IJMSMessageBean;
import com.axemble.vdoc.sdk.interfaces.IJMSMessageProcessor;
import com.axemble.vdoc.sdk.interfaces.IJMSMessageProperty;
import com.axemble.vdoc.sdk.interfaces.IJMSQueueBean;
import com.axemble.vdoc.sdk.interfaces.IJMSQueueController;

public class ExampleAdvancedJMSAgent extends BaseAgent {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(ExampleAdvancedJMSAgent.class);
	
	private static final String QUEUE_COLORS_PREFIX = "example";
	private static final String QUEUE_COLORS_NAME = "colors";
	private static final String QUEUE_TEST_DELETE_QUEUE_NAME = "testDeleteQueue";
	
	@Override
	protected void execute() {
		try {
			// Retrieve the IJMSController
			IJMSController jmsController = getPortalModule().getJMSController();
			
			// STEP 1 : Delete previously created queues
			LOGGER.error("STEP 1 : Delete previously created queues");
			cleanQueuesIfExist(jmsController);
			
			// STEP 2 : Create two queues
			LOGGER.error("STEP 2 : Create two queues");
			jmsController.createQueue(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME);
			jmsController.createQueue(QUEUE_COLORS_PREFIX, QUEUE_TEST_DELETE_QUEUE_NAME);
			
			// STEP 3 : List existing queues
			StringBuilder existingQueuesStrBuilder = new StringBuilder();
			existingQueuesStrBuilder.append("STEP 3 : Listing all existing queues: ");
			for (IJMSQueueBean queue : jmsController.getQueues()) {
				existingQueuesStrBuilder.append(queue.getContextPrefix());
				existingQueuesStrBuilder.append("#");
				existingQueuesStrBuilder.append(queue.getName());
				existingQueuesStrBuilder.append("(");
				existingQueuesStrBuilder.append(queue.getMessagesCount());
				existingQueuesStrBuilder.append(" msgs);");
			}
			LOGGER.error(existingQueuesStrBuilder.toString());
			
			// STEP 4 : Test if a queue exists
			if (jmsController.queueExist(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME)) {
				LOGGER.error("STEP 4 : The queue " + QUEUE_COLORS_NAME + " has been created successfully");
			}
			
			// STEP 5 & 6 : Delete a queue
			if (jmsController.queueExist(QUEUE_COLORS_PREFIX, QUEUE_TEST_DELETE_QUEUE_NAME)) {
				LOGGER.error("STEP 5 : The queue " + QUEUE_TEST_DELETE_QUEUE_NAME + " has been created successfully");
				jmsController.deleteQueue(QUEUE_COLORS_PREFIX, QUEUE_TEST_DELETE_QUEUE_NAME);
				if (!jmsController.queueExist(QUEUE_COLORS_PREFIX, QUEUE_TEST_DELETE_QUEUE_NAME)) {
					LOGGER.error("STEP 6 : The queue " + QUEUE_TEST_DELETE_QUEUE_NAME + " has been deleted successfully");
				}
			}
			
			// STEP 7 : Enqueue three messages using an IJMSQueueController.
			LOGGER.error("STEP 7 : Enqueue three messages");
			enqueueABlueAndTwoRedMessage(jmsController);
			
			// STEP 8 : Count message in the queue
			long messageCount = jmsController.countMessage(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME);
			LOGGER.error("STEP 8 : The queue contains " + messageCount + " messages.");
			
			// STEP 9 : Count message in the queue filtering on the blue property
			long messageCountBlue = jmsController.countMessage(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME,
				ExampleMessageProcessor.PROPERTY_COLOR_NAME + " = '" + ExampleMessageProcessor.PROPERTY_COLOR_VALUE_BLUE + "'");
			LOGGER.error("STEP 9 : The queue contains " + messageCountBlue + " blue messages.");
			
			// STEP 10 : List messages in the queue
			StringBuilder messageLstStrBuilder = new StringBuilder();
			messageLstStrBuilder.append("STEP 10 : Listing all messages in the queue: ");
			for (IJMSMessageBean message : jmsController.getMessages(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME, 100)) {
				messageLstStrBuilder.append(message.getReadableRepresentation());
				messageLstStrBuilder.append(" with JMS id ");
				messageLstStrBuilder.append(message.getId());
				messageLstStrBuilder.append(" (");
				messageLstStrBuilder.append(new Date(message.getCreationDate()).toString());
				messageLstStrBuilder.append(")");
				messageLstStrBuilder.append(";");
			}
			LOGGER.error(messageLstStrBuilder.toString());
			
			// STEP 11 : List message in the queue filtering on the red property and delete it
			StringBuilder redMessageLstStrBuilder = new StringBuilder();
			redMessageLstStrBuilder.append("STEP 11 : Listing all red messages in the queue: ");
			for (IJMSMessageBean message : jmsController.getMessages(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME,
				ExampleMessageProcessor.PROPERTY_COLOR_NAME + " = '" + ExampleMessageProcessor.PROPERTY_COLOR_VALUE_RED + "'", 100)) {
				redMessageLstStrBuilder.append(message.getReadableRepresentation());
				redMessageLstStrBuilder.append(";");
				jmsController.deleteMessage(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME, message.getId());
			}
			LOGGER.error(redMessageLstStrBuilder.toString());
			
			// STEP 12 : Delete all blue message from the queue
			LOGGER.error("STEP 12 : Delete all blue message from the queue");
			jmsController.deleteMessages(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME,
				ExampleMessageProcessor.PROPERTY_COLOR_NAME + " = '" + ExampleMessageProcessor.PROPERTY_COLOR_VALUE_BLUE + "'");
			
			// STEP 13 : Enqueue three new messages using an IJMSQueueController.
			LOGGER.error("STEP 13 : Enqueue three new messages");
			enqueueABlueAndTwoRedMessage(jmsController);
			
			// STEP 14 : Empty the queue
			LOGGER.error("STEP 14 : Empty the queue");
			jmsController.emptyQueue(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME);
			
			// STEP 15 : Enqueue three new messages using an IJMSQueueController.
			LOGGER.error("STEP 15 : Enqueue three new messages");
			enqueueABlueAndTwoRedMessage(jmsController);
			
			// STEP 16 : Start a blue message processor
			LOGGER.error("STEP 16 : Start a blue message processor");
			IJMSMessageProcessor<ExampleMessage> blueMessageProcessor = new ExampleMessageProcessor(
				ExampleMessageProcessor.PROPERTY_COLOR_VALUE_BLUE);
			jmsController.runConsumers(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME, blueMessageProcessor);
			
			// STEP 17 : Start a red message processor
			LOGGER.error("STEP 17 : Start a red message processor");
			IJMSMessageProcessor<ExampleMessage> redMessageProcessor = new ExampleMessageProcessor(
				ExampleMessageProcessor.PROPERTY_COLOR_VALUE_RED);
			jmsController.runConsumers(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME, redMessageProcessor);
			
			// STEP 18 : Count message processor running with a blue filter
			IJMSMessageProcessor<ExampleMessage> secondBlueMessageProcessor = new ExampleMessageProcessor(
				ExampleMessageProcessor.PROPERTY_COLOR_VALUE_BLUE);
			int blueProcessingRunningCount = jmsController.checkConsumers(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME, secondBlueMessageProcessor);
			LOGGER.error("STEP 18 : There are " + blueProcessingRunningCount + " message processor running with the blue filter");
			
			// STEP 19 : Count all message processor running on the queue
			int messageProcessorRunningCount = jmsController.checkConsumers(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME);
			LOGGER.error("STEP 19 : There are " + messageProcessorRunningCount + " message processor running on the queue");
			
			waitThreeSecondsWithoutThreadSleep();
			
			// STEP 20 : Stop blue message processors
			LOGGER.error("STEP 20 : Stop blue message processors");
			IJMSMessageProcessor<ExampleMessage> anotherBlueMessageProcessor = new ExampleMessageProcessor(
				ExampleMessageProcessor.PROPERTY_COLOR_VALUE_BLUE);
			jmsController.stopConsumers(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME, anotherBlueMessageProcessor);
			
			waitThreeSecondsWithoutThreadSleep();
			
			// STEP 21 : Stop all message processor running on the queue
			LOGGER.error("STEP 21 : Stop all message processor running on the queue");
			jmsController.stopConsumers(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME);
			
			// STEP 22 : Delete created queues
			LOGGER.error("STEP 22 : Delete created queues");
			cleanQueuesIfExist(jmsController);
		} catch (PortalModuleException | IOException | JMSQueueExistsException | JMSQueueMissingException | JMSException e) {
			throw new IllegalStateException(e);
		}
	}
	
	private void cleanQueuesIfExist(IJMSController jmsController) throws JMSQueueMissingException {
		if (jmsController.queueExist(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME)) {
			jmsController.deleteQueue(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME);
		}
		if (jmsController.queueExist(QUEUE_COLORS_PREFIX, QUEUE_TEST_DELETE_QUEUE_NAME)) {
			jmsController.deleteQueue(QUEUE_COLORS_PREFIX, QUEUE_TEST_DELETE_QUEUE_NAME);
		}
	}
	
	private void enqueueABlueAndTwoRedMessage(IJMSController jmsController) throws IOException, JMSException {
		// Please note that IJMSQueueController is Closable and needs to be closed.
		try (IJMSQueueController queueController = jmsController.createQueueController(QUEUE_COLORS_PREFIX, QUEUE_COLORS_NAME)) {
			// Create and send a new message with "blue" property
			ExampleMessage blueMessage = new ExampleMessage("firstMessage(blue)");
			List<IJMSMessageProperty<?>> propertiesForBlue = new ArrayList<>();
			propertiesForBlue.add(
				queueController.createProperty(ExampleMessageProcessor.PROPERTY_COLOR_NAME, ExampleMessageProcessor.PROPERTY_COLOR_VALUE_BLUE));
			queueController.putMessage(blueMessage, propertiesForBlue);
			
			// Create and send a new message with "red" property
			ExampleMessage redMessage = new ExampleMessage("secondMessage(red)");
			List<IJMSMessageProperty<?>> propertiesForRed = new ArrayList<>();
			propertiesForRed
				.add(queueController.createProperty(ExampleMessageProcessor.PROPERTY_COLOR_NAME, ExampleMessageProcessor.PROPERTY_COLOR_VALUE_RED));
			queueController.putMessage(redMessage, propertiesForRed);
			
			// Create and send a second message with "red" property
			ExampleMessage secondRedMessage = new ExampleMessage("thirdMessage(red)");
			List<IJMSMessageProperty<?>> propertiesForSecondRed = new ArrayList<>();
			propertiesForSecondRed
				.add(queueController.createProperty(ExampleMessageProcessor.PROPERTY_COLOR_NAME, ExampleMessageProcessor.PROPERTY_COLOR_VALUE_RED));
			queueController.putMessage(secondRedMessage, propertiesForSecondRed);
		}
	}
	
	private void waitThreeSecondsWithoutThreadSleep() {
		LOGGER.error("Wait for 3 seconds");
		long t = System.currentTimeMillis();
		long end = t + 3000;
		int sec = 0;
		while (System.currentTimeMillis() < end) {
			if (System.currentTimeMillis() > t + 1000 * (sec + 1)) {
				sec += 1;
				LOGGER.info(sec + "seconds");
			}
		}
	}
}