Standard usage

This chapter describes the simplest way to use a JMS queue and process its messages with Process.

Features covered on this page

  • Define a structured message holding data
  • Emit a message on a queue (with or without properties, with or without a specific priority)
  • Define a message processor to process messages from a queue
  • Define a filter on a message processor so that it only processes particular messages from a queue, based on message properties
  • Define a batch size on a message processor
  • Run custom code at the beginning and end of each batch
  • Define a per-message processing timeout on a message processor
  • Define the number of parallel message processors to run
  • Declare a queue with its prefix, its name and the message processors to run on it

The JMS interfaces

Process exposes three interfaces you can implement in order to declare a JMS queue and process messages from this queue.

The IJMSMessage interface

By implementing this interface, you describe the structure of your message and the data it has to hold. Note that your implementation has to be serializable.

package com.axemble.vdoc.sdk.interfaces;

import java.io.Serializable;

public interface IJMSMessage extends Serializable {
	/**
	 * This method returns a human-readable representation of the message implementation. It is mainly used when logging and during message processing to give an idea of the subject of the message.
	 *
	 * @return the human-readable representation of the message
	 */
	String getReadableRepresentation();
}
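
Because messages have to be serializable, it can help to check early that an implementation survives a serialization round trip. The sketch below assumes that standard Java serialization is what matters (the interface extends java.io.Serializable, but this chapter does not describe the transport itself). SampleMessage and SerializationCheck are hypothetical names used only for illustration; SampleMessage stands in for a real IJMSMessage implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationCheck {
	// Serializes the object to a byte array and reads it back.
	// A non-serializable field in the object graph makes this fail with an IllegalStateException.
	static <T extends Serializable> T roundTrip(T original) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(original);
			}
			try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				@SuppressWarnings("unchecked")
				T copy = (T) in.readObject();
				return copy;
			}
		} catch (IOException | ClassNotFoundException e) {
			throw new IllegalStateException("Message is not serializable", e);
		}
	}

	public static void main(String[] args) {
		SampleMessage copy = roundTrip(new SampleMessage("id-1"));
		System.out.println(copy.getReadableRepresentation());
	}
}

// Hypothetical stand-in for an IJMSMessage implementation; it only needs to be Serializable here.
final class SampleMessage implements Serializable {
	private static final long serialVersionUID = 1L;

	private final String identifier;

	SampleMessage(String identifier) {
		this.identifier = identifier;
	}

	String getReadableRepresentation() {
		return "sample message with id " + identifier;
	}
}
```

A check like this fits well in a unit test, so that adding a non-serializable field to a message class is caught before the message reaches a queue.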

The IJMSMessageProcessor interface

Implement this interface to describe how your messages are processed: for example, the number of messages to process in each batch, the number of parallel message processors to run, and the work to do on each message. Most of the methods have a default implementation, so you only need to override them when the default behaviour does not match your needs.

package com.axemble.vdoc.sdk.interfaces;

public interface IJMSMessageProcessor<M extends IJMSMessage> {
	/**
	 * Return the filter/discriminant if needed, otherwise return null.
	 * This filter only applies to message properties.
	 * Example : "JMSType = 'car' AND color = 'blue' AND weight > 2500"
	 *
	 * @return the filter or null if none
	 * @implSpec The default implementation returns null. Therefore, no filter will be applied.
	 */
	default String getMessageFilter() {
		return null;
	}
	
	/**
	 * Return the batch size for message processing
	 *
	 * @return the batch size
	 * @implSpec The default implementation returns 50 to obtain a batch size bounded to 50 messages.
	 */
	default int getBatchSize() {
		return 50;
	}
	
	/**
	 * Return the timeout, in seconds, to process a single message. If null is returned, the default timeout is applied.
	 *
	 * @return the timeout to process a message
	 * @implSpec The default implementation returns 60 to result in a 60 seconds timeout to process each message.
	 */
	default Integer getPerMessageSecondTimeout() {
		return 60;
	}
	
	/**
	 * Return the number of parallel consumers to run
	 * @return the number of parallel consumers to run
	 * @implSpec The default implementation returns 1
	 */
	default Integer getNumberOfParallelConsumers() { return 1; }
	
	/**
	 * Invoked before processing the message batch
	 *
	 * @implSpec The default implementation does nothing.
	 */
	default void beforeProcessMessages() {
		// Do nothing
	}
	
	/**
	 * Invoked to process a message
	 *
	 * @param message the message to process
	 */
	void processMessage(M message);
	
	/**
	 * Invoked after processing the message batch
	 *
	 * @implSpec The default implementation does nothing.
	 */
	default void afterProcessMessages() {
		// Do nothing
	}
}
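
The per-message timeout returned by getPerMessageSecondTimeout() can be pictured as running each processMessage call under a time limit. The sketch below is not how Process implements it; it only illustrates the idea with the standard ExecutorService and Future.get(timeout) APIs. TimeoutSketch and runWithTimeout are hypothetical names, and the timeout is expressed in milliseconds rather than seconds to keep the demo fast.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {
	// Runs the task on a worker thread and waits at most timeoutMillis for it to finish.
	// Returns true if the task completed in time, false if it timed out or failed.
	static boolean runWithTimeout(Runnable task, long timeoutMillis) {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		Future<?> future = executor.submit(task);
		try {
			future.get(timeoutMillis, TimeUnit.MILLISECONDS);
			return true;
		} catch (TimeoutException e) {
			// Interrupt the worker so that a blocked task gets a chance to stop.
			future.cancel(true);
			return false;
		} catch (ExecutionException e) {
			// The task itself threw an exception.
			return false;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			future.cancel(true);
			return false;
		} finally {
			executor.shutdownNow();
		}
	}

	public static void main(String[] args) {
		boolean quick = runWithTimeout(() -> { }, 1000);
		boolean slow = runWithTimeout(() -> {
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}, 100);
		System.out.println("quick task completed: " + quick);
		System.out.println("slow task completed: " + slow);
	}
}
```

In this sketch a timeout works by interrupting the worker thread, so only a task that reacts to interruption actually stops. If Process behaves similarly (an assumption), it is worth keeping processMessage implementations responsive to interruption.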

The IJMSQueueDeclaration interface

Finally, implement this interface to declare your JMS queue with a queue prefix, a queue name and the message processors to use.

package com.axemble.vdoc.sdk.interfaces;

import java.util.List;

/**
 * Implementations of this interface are looked up at application startup and used to create the queue if it does not exist and to start the consumers which
 * will process the queue's messages.
 *
 * @param <M> the message implementation
 */
public interface IJMSQueueDeclaration<M extends IJMSMessage> {
	/**
	 * Return the context prefix for the queue
	 * Example : contextPrefix = XtendedSearch.Indexation / queueName = Users
	 *           for a queue which indexes users in XtendedSearch
	 * @return the context prefix
	 */
	String getContextPrefix();
	
	/**
	 * Return the queue name
	 *
	 * @return the queue name
	 */
	String getQueueName();
	
	/**
	 * Return the message processor list to start on the queue
	 *
	 * @return the message processor list
	 */
	List<IJMSMessageProcessor<M>> getMessageProcessors();
}

Process takes your IJMSQueueDeclaration implementation into account at start time. All you need to do is declare it with the following configuration key. You can use this configuration key several times to declare multiple implementations.

com.moovapps.messaging.queue.declaration=reference.to.your.QueueDeclarationImplementation
com.moovapps.messaging.queue.declaration=reference.to.AnotherQueueDeclarationImplementation

Implementation example

Let’s imagine we want to handle Save and Delete events triggered on two different types of entities. Following these events, we have to run some asynchronous tasks.

  • We have two types of entities: User and Organization

  • We have two types of events: Save and Delete

  • We want to transmit the identifier of the entity concerned

  • We want to run different tasks on Save and Delete events.

Declare the queue and process structured messages

So we can imagine an IJMSMessage implementation and two enums as follows:

An enum to hold the entity type

package com.moovapps.example;

public enum EntityType {
	USER,
	ORGANIZATION;
}

An enum to hold the event type

package com.moovapps.example;

public enum EventType {
	SAVE("save"),
	DELETE("delete");
	
	private final String eventPropertyValue;
	
	EventType(String eventPropertyValue) {
		this.eventPropertyValue = eventPropertyValue;
	}
	
	public String getEventPropertyValue() {
		return eventPropertyValue;
	}
}

Our IJMSMessage implementation

package com.moovapps.example;

import com.axemble.vdoc.sdk.interfaces.IJMSMessage;

public class ExampleEventMessage implements IJMSMessage {
	private final String identifier;
	
	private final EntityType entityType;
	
	private final EventType eventType;
	
	public ExampleEventMessage(String identifier, EntityType entityType, EventType eventType) {
		this.identifier = identifier;
		this.entityType = entityType;
		this.eventType = eventType;
	}
	
	@Override
	public String getReadableRepresentation() {
		return eventType + " on " + entityType + " with id " + identifier;
	}
	
	public String getIdentifier() {
		return identifier;
	}
	
	public EntityType getEntityType() {
		return entityType;
	}
	
	public EventType getEventType() {
		return eventType;
	}
}

A simple message processor to deal with delete event messages. Note the use of the filter to process only messages whose “eventType” property has the “delete” value. We describe how to set this property at the end of this article.

package com.moovapps.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.axemble.vdoc.sdk.interfaces.IJMSMessageProcessor;

public class ExampleDeleteEventProcessor implements IJMSMessageProcessor<ExampleEventMessage> {
	private static final Logger LOGGER = LoggerFactory.getLogger(ExampleDeleteEventProcessor.class);
	
	@Override
	public String getMessageFilter() {
		// This processor will only receive messages for the Delete event type.
		return ExampleEventQueueDeclaration.PROPERTY_NAME + " = '" + EventType.DELETE.getEventPropertyValue() + "'";
	}
	
	@Override
	public void processMessage(ExampleEventMessage exampleEventMessage) {
		// We can write our code to process each delete event here.
		StringBuilder stringBuilder = new StringBuilder();
		stringBuilder.append("I am processing a ");
		stringBuilder.append(exampleEventMessage.getEventType());
		stringBuilder.append(" on a ");
		stringBuilder.append(exampleEventMessage.getEntityType());
		stringBuilder.append(" with id ");
		stringBuilder.append(exampleEventMessage.getIdentifier());
		LOGGER.info(stringBuilder.toString());
	}
}
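
The filter string follows the JMS message selector syntax, in which string literals are wrapped in single quotes and an embedded single quote is written as two single quotes. When a property value is not a compile-time constant, a small helper can build the expression safely. SelectorSketch and its methods are hypothetical and are not part of the Process SDK; this is only a minimal sketch.

```java
final class SelectorSketch {
	// Builds "name = 'value'", doubling any single quote inside the value
	// as required by the JMS message selector syntax.
	static String equalsString(String propertyName, String value) {
		return propertyName + " = '" + value.replace("'", "''") + "'";
	}

	// Joins several conditions with AND, wrapping each one in parentheses.
	static String and(String... conditions) {
		StringBuilder selector = new StringBuilder();
		for (String condition : conditions) {
			if (selector.length() > 0) {
				selector.append(" AND ");
			}
			selector.append('(').append(condition).append(')');
		}
		return selector.toString();
	}

	public static void main(String[] args) {
		System.out.println(equalsString("eventType", "delete"));
		System.out.println(and(equalsString("eventType", "save"), "weight > 2500"));
	}
}
```

With such a helper, getMessageFilter() in the processor above could return equalsString(PROPERTY_NAME, EventType.DELETE.getEventPropertyValue()) instead of concatenating the quotes by hand.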

Another message processor to deal with save event messages, using a few more features.

package com.moovapps.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.axemble.vdoc.sdk.interfaces.IJMSMessageProcessor;

public class ExampleSaveEventProcessor implements IJMSMessageProcessor<ExampleEventMessage> {
	private static final Logger LOGGER = LoggerFactory.getLogger(ExampleSaveEventProcessor.class);
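	// Per-batch state. If Process shares a single processor instance between parallel consumers (not stated in this chapter),
	// this field would need to be made thread-safe, for example with a ThreadLocal.
	private StringBuilder stringBuilder;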
	
	@Override
	public String getMessageFilter() {
		// This processor will only receive messages for the Save event type.
		return ExampleEventQueueDeclaration.PROPERTY_NAME + " = '" + EventType.SAVE.getEventPropertyValue() + "'";
	}
	
	@Override
	public int getBatchSize() {
		// We want to process messages 10 by 10.
		return 10;
	}
	
	@Override
	public Integer getPerMessageSecondTimeout() {
		// Processing each message should be very quick, so after 5 seconds we consider that processing has failed and abort it.
		// This ensures that the queue won't be blocked by a problematic message.
		return 5;
	}
	
	@Override
	public Integer getNumberOfParallelConsumers() {
		// We want two parallel message processors (two threads) to process Save events.
		return 2;
	}
	
	@Override
	public void beforeProcessMessages() {
		// This method is called at the beginning of each batch of 10 messages, so the StringBuilder or other objects can be initialized here.
		stringBuilder = new StringBuilder();
		stringBuilder.append("Starting to process a batch of Save messages\n");
	}
	
	@Override
	public void processMessage(ExampleEventMessage exampleEventMessage) {
		// We can write our code to process each save event here.
		stringBuilder.append("I am processing a ");
		stringBuilder.append(exampleEventMessage.getEventType());
		stringBuilder.append(" on a ");
		stringBuilder.append(exampleEventMessage.getEntityType());
		stringBuilder.append(" with id ");
		stringBuilder.append(exampleEventMessage.getIdentifier());
		stringBuilder.append("\n");
	}
	
	@Override
	public void afterProcessMessages() {
		// This method is always called at the end of each batch of 10 messages. We can write the StringBuilder content to the logs here,
		// and close previously opened files, modules, etc.
		stringBuilder.append("Finished processing a batch of Save messages\n");
		LOGGER.info(stringBuilder.toString());
		stringBuilder = null;
	}
}
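
The relationship between the batch size and the before/after callbacks can be pictured with a small simulation. The driver loop below is an assumption made for illustration, not Process's actual consumer code: it cuts the pending messages into batches of at most getBatchSize() and surrounds each batch with the two callbacks. BatchLifecycleSketch and its nested Processor interface are hypothetical, reduced stand-ins that use String messages.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchLifecycleSketch {
	// Hypothetical, reduced stand-in for IJMSMessageProcessor.
	interface Processor {
		int getBatchSize();
		void beforeProcessMessages();
		void processMessage(String message);
		void afterProcessMessages();
	}

	// Assumed driver loop: one before/after pair around each batch of at most getBatchSize() messages.
	static void drain(List<String> pending, Processor processor) {
		int batchSize = processor.getBatchSize();
		if (batchSize < 1) {
			throw new IllegalArgumentException("batch size must be at least 1");
		}
		for (int start = 0; start < pending.size(); start += batchSize) {
			int end = Math.min(start + batchSize, pending.size());
			processor.beforeProcessMessages();
			try {
				for (String message : pending.subList(start, end)) {
					processor.processMessage(message);
				}
			} finally {
				// Called even if processing a message throws.
				processor.afterProcessMessages();
			}
		}
	}

	// Records the callbacks received so that the call order can be inspected.
	static List<String> trace(int messageCount, int batchSize) {
		List<String> pending = new ArrayList<>();
		for (int i = 0; i < messageCount; i++) {
			pending.add("id-" + i);
		}
		List<String> calls = new ArrayList<>();
		drain(pending, new Processor() {
			@Override public int getBatchSize() { return batchSize; }
			@Override public void beforeProcessMessages() { calls.add("before"); }
			@Override public void processMessage(String message) { calls.add(message); }
			@Override public void afterProcessMessages() { calls.add("after"); }
		});
		return calls;
	}

	public static void main(String[] args) {
		System.out.println(trace(5, 2));
	}
}
```

Under these assumptions, five messages with a batch size of 2 give three before/after pairs, the last one around a single message. Whether Process delivers partial batches in exactly this way is not stated in this chapter.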

Our IJMSQueueDeclaration implementation. Process uses it at start time to create the JMS queue and start the corresponding message processors.

package com.moovapps.example;

import java.util.ArrayList;
import java.util.List;

import com.axemble.vdoc.sdk.interfaces.IJMSMessageProcessor;
import com.axemble.vdoc.sdk.interfaces.IJMSQueueDeclaration;

public class ExampleEventQueueDeclaration implements IJMSQueueDeclaration<ExampleEventMessage> {
	protected static final String PROPERTY_NAME = "eventType";
	protected static final String QUEUE_CONTEXT_PREFIX = "myCurrentProject";
	protected static final String QUEUE_NAME = "exampleEvent";
	
	@Override
	public String getContextPrefix() {
		return QUEUE_CONTEXT_PREFIX;
	}
	
	@Override
	public String getQueueName() {
		return QUEUE_NAME;
	}
	
	@Override
	public List<IJMSMessageProcessor<ExampleEventMessage>> getMessageProcessors() {
		List<IJMSMessageProcessor<ExampleEventMessage>> messageProcessors = new ArrayList<>();
		
		messageProcessors.add(new ExampleSaveEventProcessor());
		messageProcessors.add(new ExampleDeleteEventProcessor());
		
		return messageProcessors;
	}
}

Then we register the ExampleEventQueueDeclaration implementation using the following configuration key. With this, the Process instance will find our IJMSQueueDeclaration implementation, create the JMS queue, run one message processor in one thread to deal with delete event messages, and run two message processors in two threads to deal with save event messages.

com.moovapps.messaging.queue.declaration=com.moovapps.example.ExampleEventQueueDeclaration

Emit structured messages to our queue

Finally, we need to emit messages to this queue so that the message processors can process them. Here is example code in a Process agent.

package com.moovapps.example;

import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.axemble.vdoc.sdk.agent.base.BaseAgent;
import com.axemble.vdoc.sdk.exceptions.PortalModuleException;
import com.axemble.vdoc.sdk.interfaces.IJMSController;
import com.axemble.vdoc.sdk.interfaces.IJMSMessageProperty;
import com.axemble.vdoc.sdk.interfaces.IJMSQueueController;

public class ExampleEnqueueMessageAgent extends BaseAgent {
	@Override
	protected void execute() {
		try {
			// Retrieve the jmsController from the portalModule
			IJMSController jmsController = getPortalModule().getJMSController();
			
			// Create a queueController on our queue using the queue prefix and name
			try (IJMSQueueController queueController = jmsController.createQueueController(ExampleEventQueueDeclaration.QUEUE_CONTEXT_PREFIX, ExampleEventQueueDeclaration.QUEUE_NAME)) {
				ExampleEventMessage exampleEventMessage;
				// Emit 20 Save type messages
				for (int i = 0; i < 20; i++) {
					if (i % 2 == 0) {
						// Create a new message for a user
						exampleEventMessage = new ExampleEventMessage("id-" + i, EntityType.USER, EventType.SAVE);
						
						// We consider that users' save events should be processed with higher priority, so we set the priority to 5 in this case.
						queueController.setPriority(5);
					} else {
						// Create a new message for an organization
						exampleEventMessage = new ExampleEventMessage("id-" + i, EntityType.ORGANIZATION, EventType.SAVE);
						
						// Restore the default priority for the organization's events
						queueController.setPriority(4);
					}
					
					// Create a property holding the event type. Only properties can be used to filter messages for a message processor.
					// If you use a single message processor, you can send messages without properties (see the
					// putMessage(IJMSMessage message) method).
					// The processors' filters compare against getEventPropertyValue(), so the property carries that same string value.
					List<IJMSMessageProperty<?>> properties = new ArrayList<>();
					properties.add(queueController.createProperty(ExampleEventQueueDeclaration.PROPERTY_NAME, EventType.SAVE.getEventPropertyValue()));
					
					// Send the message using the queueController
					queueController.putMessage(exampleEventMessage, properties);
				}
				
				// Restore the default priority for the next messages.
				queueController.setPriority(4);
				
				// Emit 20 Delete type messages
				for (int i = 20; i < 40; i++) {
					if (i % 2 == 0) {
						// Create a new message for a user
						exampleEventMessage = new ExampleEventMessage("id-" + i, EntityType.USER, EventType.DELETE);
					} else {
						// Create a new message for an organization
						exampleEventMessage = new ExampleEventMessage("id-" + i, EntityType.ORGANIZATION, EventType.DELETE);
					}
					
					// Create a property holding the event type. Only properties can be used to filter messages for a message processor.
					// If you use a single message processor, you can send messages without properties (see the
					// putMessage(IJMSMessage message) method).
					// The processors' filters compare against getEventPropertyValue(), so the property carries that same string value.
					List<IJMSMessageProperty<?>> properties = new ArrayList<>();
					properties.add(queueController.createProperty(ExampleEventQueueDeclaration.PROPERTY_NAME, EventType.DELETE.getEventPropertyValue()));
					
					// Send the message using the queueController
					queueController.putMessage(exampleEventMessage, properties);
				}
			}
		} catch (PortalModuleException | IOException | JMSException e) {
			throw new IllegalStateException(e);
		}
	}
}
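
The agent above alternates between priority 5 and priority 4. In JMS, priorities range from 0 (lowest) to 9 (highest) and 4 is the default. The sketch below models an idealized priority-aware queue to show the effect on delivery order; it is an illustration only, since the actual ordering depends on the JMS provider, which is expected to make a best effort rather than guarantee strict ordering. PrioritySketch, Entry and deliveryOrder are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class PrioritySketch {
	// One queued message: payload, JMS-style priority (0 lowest, 9 highest) and arrival order.
	static final class Entry {
		final String payload;
		final int priority;
		final long sequence;

		Entry(String payload, int priority, long sequence) {
			this.payload = payload;
			this.priority = priority;
			this.sequence = sequence;
		}
	}

	// Returns the payloads in the order an idealized priority-aware queue would deliver them:
	// highest priority first, arrival order within the same priority.
	static List<String> deliveryOrder(String[] payloads, int[] priorities) {
		PriorityQueue<Entry> queue = new PriorityQueue<>(
				Comparator.comparingInt((Entry entry) -> -entry.priority)
						.thenComparingLong(entry -> entry.sequence));
		for (int i = 0; i < payloads.length; i++) {
			queue.add(new Entry(payloads[i], priorities[i], i));
		}
		List<String> delivered = new ArrayList<>();
		while (!queue.isEmpty()) {
			delivered.add(queue.poll().payload);
		}
		return delivered;
	}

	public static void main(String[] args) {
		String[] payloads = {"id-0", "id-1", "id-2", "id-3"};
		int[] priorities = {5, 4, 5, 4};
		System.out.println(deliveryOrder(payloads, priorities));
	}
}
```

In this model, the user messages sent with priority 5 are delivered before the organization messages sent with priority 4, as long as they are all waiting in the queue at the same time.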