Kafka_Mongo_Integration
A sample POC on Kafka that pulls tweets from Twitter and persists them into MongoDB.
Technologies: Java, Apache Kafka (producer and SimpleConsumer APIs), Twitter Hosebird Client (hbc), Google Gson, MongoDB Java driver, SLF4J.
For the full code, see:
https://github.com/BrahmaR/KafkaMongoIntegration
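Both the producer and the consumer read their settings from a config.properties file through a small Context helper (a sketch of that helper follows the producer code below). A minimal illustration of the properties file, covering the keys referenced in the code, might look like the following; the values are placeholders, not the repo's actual settings:
# Kafka (the consumer splits kafka.broker.list into host and port)
kafka.broker.list=localhost:9092
kafka.twitter.raw.topic=twitter_raw
# Twitter OAuth credentials
consumerKey=YOUR_CONSUMER_KEY
consumerSecret=YOUR_CONSUMER_SECRET
accessToken=YOUR_ACCESS_TOKEN
accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
# MongoDB
db.name=twitter
hashtag.collection=hashtags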
Kafka Producer:
The producer streams tweets matching the tracked terms from the Twitter streaming API (via the Hosebird Client) and publishes each raw tweet JSON string to the Kafka topic configured in config.properties.
package com.royal.twitter.kafka;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
/**
* Created by brahma on 22/12/16.
*/
public class TwitterProducer {
private static final Logger LOGGER = LoggerFactory
.getLogger(TwitterProducer.class);
private static final String BROKER_LIST = "kafka.broker.list";
private static final String CONSUMER_KEY = "consumerKey";
private static final String CONSUMER_SECRET = "consumerSecret";
private static final String ACCESS_TOKEN = "accessToken";
private static final String ACCESS_TOKEN_SECRET = "accessTokenSecret";
private static final String KAFKA_TOPIC = "kafka.twitter.raw.topic";
public static void runProducer(Context context) throws InterruptedException {
// Producer properties
Properties properties = new Properties();
properties.put("metadata.broker.list", context.getString(BROKER_LIST));
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", "1");
ProducerConfig producerConfig = new ProducerConfig(properties);
final Producer<String, String> producer = new Producer<String, String>(
producerConfig);
// Create an appropriately sized blocking queue
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
// create endpoint
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
endpoint.trackTerms(Lists.newArrayList("twitterapi", "#love"));
Authentication auth = new OAuth1(context.getString(CONSUMER_KEY),
context.getString(CONSUMER_SECRET),
context.getString(ACCESS_TOKEN),
context.getString(ACCESS_TOKEN_SECRET));
// create client
BasicClient client = new ClientBuilder()
// .name("twitterClient")
.hosts(Constants.STREAM_HOST).endpoint(endpoint)
.authentication(auth)
.processor(new StringDelimitedProcessor(queue)).build();
// Establish a connection
client.connect();
for (int msgRead = 0; msgRead < 1000; msgRead++) {
if (client.isDone()) {
LOGGER.info("Client connection closed unexpectedly: "
+ client.getExitEvent().getMessage());
break;
}
KeyedMessage<String, String> message;
try {
message = new KeyedMessage<String, String>(
context.getString(KAFKA_TOPIC), queue.take());
LOGGER.info(message.toString());
System.out.println(message);
} catch (InterruptedException ie) {
LOGGER.error(ie.getMessage());
// skip this iteration rather than sending a null message
continue;
}
// send the message to the producer
producer.send(message);
}
producer.close();
client.stop();
// Print some stats
LOGGER.info("The client read %d messages!\n", client.getStatsTracker()
.getNumMessages());
}
public static void main(String[] args) {
try {
Context context = new Context("config.properties");
TwitterProducer.runProducer(context);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
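The Context class used above is not part of this post's listings; below is only a minimal sketch of how it might look, assuming it is a thin wrapper around java.util.Properties (the real class is in the GitHub repo and may differ):
package com.royal.twitter.kafka;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Minimal configuration holder (illustrative sketch only).
 */
public class Context {
    private final Properties properties = new Properties();

    public Context(String fileName) throws IOException {
        // load key=value pairs from the given properties file
        try (FileInputStream in = new FileInputStream(fileName)) {
            properties.load(in);
        }
    }

    public String getString(String key) {
        return properties.getProperty(key);
    }
}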
MongoDB Kafka Consumer:
This consumer uses Kafka's SimpleConsumer API to read the raw tweet messages from the topic partition, maps each message to a TwitterHashTagBean with Gson and inserts it into a MongoDB collection.
package com.royal.twitter.kafka;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.bson.Document;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
/**
* Created by brahma on 22/12/16.
*/
public class MongoDBSimpleConsumer {
private static final String BROKER_LIST = "kafka.broker.list";
private static final String HASHTAG_COLLECTION = "hashtag.collection";
private static final String DB_NAME = "db.name";
static Context context = null;
public static void main(String args[]) {
MongoDBSimpleConsumer example = new MongoDBSimpleConsumer();
// long maxReads = Long.parseLong(args[0]);
long maxReads = 1000;
// String topic = args[1];
String topic = "kafka.twitter.raw.topic";
// int partition = Integer.parseInt(args[2]);
int partition = 0;
// seeds.add(args[3]);
List<String> seeds = null;
int port = 0;
try {
context = new Context("config.properties");
seeds = new ArrayList<String>();
String host = context.getString(BROKER_LIST);
seeds.add(host.substring(0, host.indexOf(":")));
// int port = Integer.parseInt(args[4]);
port =Integer.parseInt(host.substring(host.indexOf(":")+1, host.length()));
} catch (Exception e1) {
e1.printStackTrace();
}
try {
example.run(maxReads, context.getString(topic), partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
e.printStackTrace();
}
}
private List<String> m_replicaBrokers = new ArrayList<String>();
public void run(long a_maxReads, String a_topic, int a_partition,
List<String> a_seedBrokers, int a_port) throws Exception {
// find the meta data about the topic and partition we are interested in
//
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
a_partition);
if (metadata == null) {
System.out
.println("Can't find metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
System.out
.println("Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.EarliestTime(), clientName);
// Create the MongoDB client, collection handle and Gson parser once,
// outside the fetch loop, so a new connection is not opened per iteration
MongoClient mongoClient = new MongoClient();
MongoDatabase db = mongoClient.getDatabase(context.getString(DB_NAME));
MongoCollection<Document> tweetCollection = db
.getCollection(context.getString(HASHTAG_COLLECTION));
Gson gson = new Gson();
Type type = new TypeToken<TwitterHashTagBean>() {
}.getType();
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000,
64 * 1024, clientName);
}
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
if (numErrors > 5)
break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask
// for the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
a_port);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset
+ " Expecting: " + readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
// parse the raw tweet JSON into the bean before persisting it below
TwitterHashTagBean incomingTweet = gson.fromJson(new String(
bytes, "UTF-8"), type);
System.out.println(incomingTweet);
tweetCollection.insertOne(incomingTweet.getTweetAsDocument());
numRead++;
a_maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
if (consumer != null)
consumer.close();
mongoClient.close();
}
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
private String findNewLeader(String a_oldLeader, String a_topic,
int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& i == 0) {
// first time through if the leader hasn't changed give
// ZooKeeper
// a second to recover second time, assume the broker did
// recover before failover,
// or it was a non-Broker issue
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
System.out
.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception(
"Unable to find new leader after Broker failure. Exiting");
}
private PartitionMetadata findLeader(List<String> a_seedBrokers,
int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
"leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", "
+ a_partition + "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
}
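Once the consumer has run, the persisted documents can be checked with a few lines against the same database and collection configured in config.properties. A quick verification sketch (not part of the original repo):
package com.royal.twitter.kafka;

import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

public class TweetVerifier {
    public static void main(String[] args) throws Exception {
        Context context = new Context("config.properties");
        MongoClient client = new MongoClient();
        MongoDatabase db = client.getDatabase(context.getString("db.name"));
        MongoCollection<Document> tweets = db
                .getCollection(context.getString("hashtag.collection"));
        // print the first few persisted tweets as JSON
        for (Document doc : tweets.find().limit(5)) {
            System.out.println(doc.toJson());
        }
        client.close();
    }
}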
Sample Tweet bean:
The bean carries the tweet fields that are persisted (created_at, id_str, text, source) and converts itself to a BSON Document for insertion into MongoDB.
package com.royal.twitter.kafka;
import org.bson.Document;
public class TwitterHashTagBean {
private String created_at;
private String id_str;
private String text;
private String source;
/*private User user;*/
public TwitterHashTagBean(String created_at, String id_str, /*User user,*/ String text,
String source) {
super();
this.created_at = created_at;
this.id_str = id_str;
/*this.user = user;*/
this.text = text;
this.source = source;
}
@Override
public String toString() {
return "TwitterHashTagBean [created_at=" + created_at + ", id_str=" + id_str
+ ", text=" + text + ", source=" + source + "]";
}
/*public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}*/
public String getCreated_at() {
return created_at;
}
public void setCreated_at(String created_at) {
this.created_at = created_at;
}
public String getId_str() {
return id_str;
}
public void setId_str(String id_str) {
this.id_str = id_str;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public Document getTweetAsDocument() {
Document twitterHashTagDocument = new Document(
"created_at", getCreated_at())
.append("id_str", getId_str())
/* .append("user", getUser())*/
.append("text", getText())
.append("source", getSource());
return twitterHashTagDocument;
}
}
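For reference, Gson maps the top-level fields of the raw tweet JSON (created_at, id_str, text, source) onto the bean by name and ignores everything else. A small standalone sketch of that mapping, using a made-up JSON snippet purely for illustration:
package com.royal.twitter.kafka;

import org.bson.Document;

import com.google.gson.Gson;

public class TweetMappingDemo {
    public static void main(String[] args) {
        // illustrative payload; real tweets carry many more fields, which Gson ignores
        String json = "{\"created_at\":\"Thu Dec 22 10:00:00 +0000 2016\","
                + "\"id_str\":\"811111111111111111\","
                + "\"text\":\"hello #love\","
                + "\"source\":\"web\"}";
        TwitterHashTagBean tweet = new Gson().fromJson(json, TwitterHashTagBean.class);
        Document doc = tweet.getTweetAsDocument();
        System.out.println(doc.toJson());
    }
}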