Unify and manage your data

Show Page Sections

Examples for Consumer Implementation

The following examples show how to implement a consumer for several messaging services.

Amazon Simple Queue Service (SQS)

Maven Dependencies

<dependencies>
    <!-- The SQS consumer uses classes from com.amazonaws.services.sqs, so depend on the SQS module directly. -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
        <version>1.11.180</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

SQS Consumer

import java.util.ArrayList; import java.util.List; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.BatchResultErrorEntry; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; public class SqsConsumer { private static final Logger logger = LogManager.getLogger(SqsConsumer.class); public static void main(String[] args) { String accessKey = "AccessKey"; String secretKey = "SecretKey"; String region = "us-east-1"; //Create SQS client: AWSCredentialsProvider provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); AmazonSQS sqs = AmazonSQSClient.builder().withCredentials(provider).withRegion(region).build(); //Resolve queue URL by name: String queueUrl = sqs.getQueueUrl("QueueName").getQueueUrl(); //Create and start poller: SqsPoller poller = new SqsPoller(queueUrl, sqs); poller.start(); //... 
//Stop eventually: poller.stopRunning(); } private static class SqsPoller extends Thread { private final String queueUrl; private final AmazonSQS sqs; private final ObjectMapper mapper = new ObjectMapper(); private boolean keepRunning = true; public SqsPoller(String queueUrl, AmazonSQS sqs) { this.queueUrl = queueUrl; this.sqs = sqs; } @Override public void run() { while (keepRunning) { performReceive(); } } public void stopRunning() { this.keepRunning = false; } private void performReceive() { //Request a batch with up to 10 messages (SQS limitation): ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10); List<Message> messages = sqs.receiveMessage(request).getMessages(); List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>(); int item = 0; for (Message message : messages) { // Invoke a method to process the message: onMessage(message); //Collect the receipts for successfully processed messages so they can be acknowledged at the end: toDelete.add(new DeleteMessageBatchRequestEntry(String.valueOf(++item), message.getReceiptHandle())); } deleteMessages(toDelete); } private void deleteMessages(List<DeleteMessageBatchRequestEntry> toDelete) { if (!toDelete.isEmpty()) { try { DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(queueUrl, toDelete); DeleteMessageBatchResult result = sqs.deleteMessageBatch(request); for (BatchResultErrorEntry error: result.getFailed()) { logger.error("Cannot acknowledge message: " + error.getMessage()); } } catch (Exception e) { logger.error("Cannot acknowledge messages batch", e); } } } private void onMessage(Message message) { try { JsonNode json = mapper.readTree(message.getBody()); String eventType = json.findValue("type").asText(); switch (eventType) { case "ENTITY_CREATED": case "ENTITY_CHANGED": //Handle in some way: break; case "RELATIONSHIP_CREATED": case "RELATIONSHIP_CHANGED": //Handle in another way: break; //... 
default: //Ignore others; } } catch (Exception e) { logger.error(e); } } } }

Google Cloud Pub/Sub

Maven Dependencies

<dependencies>
    <!-- Google Cloud Pub/Sub client -->
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-pubsub</artifactId>
        <version>0.40.0-beta</version>
    </dependency>
    <!-- JSON parsing of message payloads -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.0</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

PubSub Consumer

import java.io.FileInputStream; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.threeten.bp.Duration; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.auth.Credentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; public class PubSubConsumer { private static final Logger logger = LogManager.getLogger(PubSubConsumer.class); public static void main(String[] args) throws Exception { //Load credentials from previously exported ServiceAccount key file: Credentials credentials = ServiceAccountCredentials.fromStream(new FileInputStream("PathToServiceAccountKeyFile")); CredentialsProvider provider = FixedCredentialsProvider.create(credentials); //Create a subscription reader: ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("projectName", "subscriptionId"); Subscriber subscriber = Subscriber.newBuilder(subscriptionName, new MessageHandler()). setCredentialsProvider(provider).setMaxAckExtensionPeriod(Duration.ofMinutes(1L)).build(); subscriber.startAsync();