# Install the Python dependencies. tweepy is pinned to 3.3.0 because the
# producer script calls `api.search`, which this 3.x release provides.
# The Kafka client is published on PyPI as `kafka-python`; the old `kafka`
# package name resolves to an outdated release of the same client.
pip install tweepy==3.3.0
pip install kafka-python

# Watch the topic the producer script writes to, replaying it from the start.
# NOTE(review): this broker address differs from the producer's default
# (localhost:9092) - confirm both refer to the same Kafka cluster.
/root/confluent-5.1.2/bin/kafka-console-consumer --bootstrap-server 159.203.83.126:9092 --topic twitterer_data --from-beginning
"""Poll Twitter for tweets matching a search query and publish them to Kafka.

Each tweet becomes one ';'-separated record of the form
``user_id;created_at;followers_count;location;favorite_count;retweet_count;``
(note the trailing separator).
"""

import logging
import os
import time
from datetime import datetime, timedelta

import tweepy
from kafka import KafkaConsumer, KafkaProducer

logger = logging.getLogger(__name__)

# --- Twitter setup ------------------------------------------------------------
# NOTE(review): these credentials are committed in plain text. Set the
# TWITTER_* environment variables instead, then delete the literal fallbacks
# below and regenerate the keys, since anyone holding this file can use them.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "kufNUXwBYDBtL3PvzqRsdte")
consumer_secret = os.environ.get(
    "TWITTER_CONSUMER_SECRET", "CtIIQVPZe0li29e0gDbOpzdajv39YVsdNUrVUMWf8RUm5Ohj"
)
access_token = os.environ.get(
    "TWITTER_ACCESS_TOKEN", "92658086742999449dsmbxggzNLFIRTXWcSFqFM250sdsT"
)
access_token_secret = os.environ.get(
    "TWITTER_ACCESS_TOKEN_SECRET", "r3k2wrcl57dsdAIDVBHedsds7raUbFNsTDXCSzCz4JRoC"
)

# Create the OAuth handler, attach the access token, and build the API client.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

SEARCH_QUERY = "Apple OR iphone OR iPhone"
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
FIELD_SEPARATOR = ";"
# Seconds between polls. Raise this (e.g. to 120) to poll every couple of minutes.
POLL_INTERVAL_SECONDS = 6

# --- Kafka setup --------------------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
)
topic_name = 'twitterer_data'


def normalize_timestamp(time, offset_hours=1):
    """Shift a GMT timestamp into local time and return it as a string.

    Args:
        time: Either a ``datetime`` or a string in ``"%Y-%m-%d %H:%M:%S"``
            format. (The name shadows the ``time`` module inside this
            function only; it is kept so keyword callers keep working.)
        offset_hours: Hours to add. Defaults to 1 because the tweets are
            timestamped in GMT while the author is in a +1 timezone. A fixed
            offset does not follow daylight-saving changes.

    Returns:
        The shifted timestamp formatted as ``"%Y-%m-%d %H:%M:%S"``.

    Raises:
        ValueError: if ``time`` is a string that does not match the format.
    """
    if isinstance(time, datetime):
        mytime = time
    else:
        mytime = datetime.strptime(time, TIMESTAMP_FORMAT)
    mytime += timedelta(hours=offset_hours)
    return mytime.strftime(TIMESTAMP_FORMAT)


def _sanitize(value):
    """Return ``str(value)`` with the field separator replaced so it cannot split a record."""
    # Free-text fields such as the user's location may contain ';'.
    return str(value).replace(FIELD_SEPARATOR, ",")


def _format_record(tweet):
    """Serialize the fields of *tweet* into one ';'-separated record string."""
    fields = [
        tweet.user.id_str,
        # Passing the datetime directly avoids a str()/strptime round trip.
        normalize_timestamp(tweet.created_at),
        tweet.user.followers_count,
        tweet.user.location,
        tweet.favorite_count,
        tweet.retweet_count,
    ]
    # Every field, including the last, is followed by the separator.
    return "".join(_sanitize(field) + FIELD_SEPARATOR for field in fields)


def get_twitter_data(since_id=None):
    """Search recent tweets and publish one record per tweet to Kafka.

    Args:
        since_id: If given, only tweets with an id greater than this are
            requested, so repeated polls do not re-publish tweets already sent.

    Returns:
        The largest tweet id seen in this batch, or ``since_id`` when the
        search returned nothing. Pass it back in on the next call.
    """
    search_kwargs = {}
    if since_id is not None:
        search_kwargs["since_id"] = since_id
    results = api.search(SEARCH_QUERY, **search_kwargs)

    newest_id = since_id
    for tweet in results:
        producer.send(topic_name, _format_record(tweet).encode("utf-8"))
        if newest_id is None or tweet.id > newest_id:
            newest_id = tweet.id
    # Push buffered messages to the broker now; otherwise they can be lost if
    # the process is stopped while sleeping between polls.
    producer.flush()
    return newest_id


def periodic_work(interval):
    """Poll Twitter forever, publishing new tweets every *interval* seconds.

    Args:
        interval: Seconds to sleep between polls (int or float).
    """
    since_id = None
    while True:
        try:
            since_id = get_twitter_data(since_id)
        except tweepy.TweepError:
            # One failed request (rate limit, network error) should not kill
            # the poller; log it and try again on the next tick.
            logger.exception("Twitter search failed; retrying in %s seconds", interval)
        time.sleep(interval)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    periodic_work(POLL_INTERVAL_SECONDS)