Accelerate the Value of Data

Examples for Consumer Implementation

Various examples of consumer implementations are added as following.

Amazon Simple Queue Service(SQS)

Maven Dependencies


    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-sns</artifactId>
            <version>1.11.180</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
</dependencies>

SQS Consumer

import java.util.ArrayList;
import java.util.List;
 
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
 
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
 
public class SqsConsumer {
 
    private static final Logger logger = LogManager.getLogger(SqsConsumer.class);
 
    public static void main(String[] args) {
        String accessKey = "AccessKey";
        String secretKey = "SecretKey";
        String region = "us-east-1";
 
        //Create SQS client:
        AWSCredentialsProvider provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
        AmazonSQS sqs = AmazonSQSClient.builder().withCredentials(provider).withRegion(region).build();
 
        //Resolve queue URL by name:
        String queueUrl = sqs.getQueueUrl("QueueName").getQueueUrl();
 
        //Create and start poller:
        SqsPoller poller = new SqsPoller(queueUrl, sqs);
        poller.start();
 
        //...
        //Stop eventually:
        poller.stopRunning();
    }
     private static class SqsPoller extends Thread {
        private final String queueUrl;
        private final AmazonSQS sqs;
        private final ObjectMapper mapper = new ObjectMapper();
        private boolean keepRunning = true;
 
        public SqsPoller(String queueUrl, AmazonSQS sqs) {
            this.queueUrl = queueUrl;
            this.sqs = sqs;
        }
 
        @Override
        public void run() {
            while (keepRunning) {
                performReceive();
            }
        }
 
        public void stopRunning() {
            this.keepRunning = false;
        }
 
        private void performReceive() {
            //Request a batch with up to 10 messages (SQS limitation):
            ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);
            List<Message> messages = sqs.receiveMessage(request).getMessages();
 
            List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
            int item = 0;
            for (Message message : messages) {
                // Invoke a method to process the message:
                onMessage(message);
 
                //Collect the receipts for successfully processed messages so they can be acknowledged at the end:
                toDelete.add(new DeleteMessageBatchRequestEntry(String.valueOf(++item), message.getReceiptHandle()));
            }
            deleteMessages(toDelete);
        }
 
        private void deleteMessages(List<DeleteMessageBatchRequestEntry> toDelete) {
            if (!toDelete.isEmpty()) {
                try {
                    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(queueUrl, toDelete);
                    DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
                    for (BatchResultErrorEntry error: result.getFailed()) {
                        logger.error("Cannot acknowledge message: " + error.getMessage());
                    }
                } catch (Exception e) {
                    logger.error("Cannot acknowledge messages batch", e);
                }
            }
        }
         private void onMessage(Message message) {
            try {
                JsonNode json = mapper.readTree(message.getBody());
                String eventType = json.findValue("type").asText();
                switch (eventType) {
                    case "ENTITY_CREATED":
                    case "ENTITY_CHANGED":
                        //Handle in some way:
                        break;
                    case "RELATIONSHIP_CREATED":
                    case "RELATIONSHIP_CHANGED":
                        //Handle in another way:
                        break;
                    //...
                    default:
                        //Ignore others;
                }
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }
} 

Google PubSub:

Maven Dependencies

<dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-pubsub</artifactId>
            <version>0.40.0-beta</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
</dependencies>    

PubSub Consumer

import java.io.FileInputStream;
 
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.threeten.bp.Duration;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
 
public class PubSubConsumer {
 
    private static final Logger logger = LogManager.getLogger(PubSubConsumer.class);
 
    public static void main(String[] args) throws Exception {
        //Load credentials from previously exported ServiceAccount key file:
        Credentials credentials = ServiceAccountCredentials.fromStream(new FileInputStream("PathToServiceAccountKeyFile"));
        CredentialsProvider provider = FixedCredentialsProvider.create(credentials);
 
        //Create a subscription reader:
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("projectName", "subscriptionId");
        Subscriber subscriber = Subscriber.newBuilder(subscriptionName, new MessageHandler()).
            setCredentialsProvider(provider).setMaxAckExtensionPeriod(Duration.ofMinutes(1L)).build();
        subscriber.startAsync();