Examples for Consumer Implementation
The following examples show how a consumer can be implemented.
Amazon Simple Queue Service (SQS)
Maven Dependencies
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
<version>1.11.180</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
SQS Consumer
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SqsConsumer {
private static final Logger logger = LogManager.getLogger(SqsConsumer.class);
public static void main(String[] args) {
String accessKey = "AccessKey";
String secretKey = "SecretKey";
String region = "us-east-1";
//Create SQS client:
AWSCredentialsProvider provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
AmazonSQS sqs = AmazonSQSClient.builder().withCredentials(provider).withRegion(region).build();
//Resolve queue URL by name:
String queueUrl = sqs.getQueueUrl("QueueName").getQueueUrl();
//Create and start poller:
SqsPoller poller = new SqsPoller(queueUrl, sqs);
poller.start();
//...
//Stop eventually:
poller.stopRunning();
}
private static class SqsPoller extends Thread {
private final String queueUrl;
private final AmazonSQS sqs;
private final ObjectMapper mapper = new ObjectMapper();
private boolean keepRunning = true;
public SqsPoller(String queueUrl, AmazonSQS sqs) {
this.queueUrl = queueUrl;
this.sqs = sqs;
}
@Override
public void run() {
while (keepRunning) {
performReceive();
}
}
public void stopRunning() {
this.keepRunning = false;
}
private void performReceive() {
//Request a batch with up to 10 messages (SQS limitation):
ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);
List<Message> messages = sqs.receiveMessage(request).getMessages();
List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
int item = 0;
for (Message message : messages) {
// Invoke a method to process the message:
onMessage(message);
//Collect the receipts for successfully processed messages so they can be acknowledged at the end:
toDelete.add(new DeleteMessageBatchRequestEntry(String.valueOf(++item), message.getReceiptHandle()));
}
deleteMessages(toDelete);
}
private void deleteMessages(List<DeleteMessageBatchRequestEntry> toDelete) {
if (!toDelete.isEmpty()) {
try {
DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(queueUrl, toDelete);
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
for (BatchResultErrorEntry error: result.getFailed()) {
logger.error("Cannot acknowledge message: " + error.getMessage());
}
} catch (Exception e) {
logger.error("Cannot acknowledge messages batch", e);
}
}
}
private void onMessage(Message message) {
try {
JsonNode json = mapper.readTree(message.getBody());
String eventType = json.findValue("type").asText();
switch (eventType) {
case "ENTITY_CREATED":
case "ENTITY_CHANGED":
//Handle in some way:
break;
case "RELATIONSHIP_CREATED":
case "RELATIONSHIP_CHANGED":
//Handle in another way:
break;
//...
default:
//Ignore others;
}
} catch (Exception e) {
logger.error(e);
}
}
}
}
Google PubSub:
Maven Dependencies
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>0.40.0-beta</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
PubSub Consumer
import java.io.FileInputStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.threeten.bp.Duration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
public class PubSubConsumer {
private static final Logger logger = LogManager.getLogger(PubSubConsumer.class);
public static void main(String[] args) throws Exception {
//Load credentials from previously exported ServiceAccount key file:
Credentials credentials = ServiceAccountCredentials.fromStream(new FileInputStream("PathToServiceAccountKeyFile"));
CredentialsProvider provider = FixedCredentialsProvider.create(credentials);
//Create a subscription reader:
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("projectName", "subscriptionId");
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, new MessageHandler()).
setCredentialsProvider(provider).setMaxAckExtensionPeriod(Duration.ofMinutes(1L)).build();
subscriber.startAsync();