How to Create a Real-Time Twitter Pipeline Using Kafka, Tweepy and Python (Jupyter Notebook, Cloudera)

1: Make sure the Kafka cluster is up and running

[Screenshot: Confluent Kafka dashboard]
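If you want to confirm from Python that something is listening on the broker port, a plain TCP check is enough for a first test. This is a minimal sketch that assumes the broker listens on localhost:9092 (the address the producer code below uses). `broker_port_open` is a hypothetical helper, and a successful connection only shows that the port is open, not that the Kafka broker is healthy.

```python
import socket


def broker_port_open(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only proves that something is listening on the port; it does not
    confirm that the listener is a working Kafka broker.
    """
    try:
        # create_connection resolves the host and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host name could not be resolved
        return False


if __name__ == "__main__":
    status = "reachable" if broker_port_open() else "not reachable"
    print(f"Kafka port localhost:9092 is {status}")
```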

2: Log in to your Twitter developer account and copy the following keys and tokens

[Screenshot: Twitter secret key and token key for the Kafka pipeline]
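Rather than pasting these four values into the notebook, you can keep them in environment variables and read them at startup. This is a minimal sketch: the variable names such as TWITTER_CONSUMER_KEY and the helper `load_twitter_credentials` are hypothetical choices, not something tweepy defines.

```python
import os

# Hypothetical environment-variable names; pick whatever names you prefer.
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_twitter_credentials(env=None):
    """Read the four Twitter credentials from a mapping (default: os.environ).

    Raises KeyError naming every variable that is missing or empty.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing Twitter credentials: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

You would then pass creds["TWITTER_CONSUMER_KEY"] and creds["TWITTER_CONSUMER_SECRET"] to tweepy.OAuthHandler, and the two access-token values to auth.set_access_token, instead of hard-coding the strings.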

3: Install Python, either on its own or together with Jupyter Notebook

[Screenshot: Jupyter Notebook for the Kafka Twitter pipeline]

4: Install the following Twitter and Python packages

pip install tweepy==3.3.0
pip install kafka-python
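To check that both packages are visible to the same interpreter the notebook runs on, you can look them up by their import names (the kafka-python distribution is imported as `kafka`). This small sketch uses the standard library's importlib.util.find_spec; `check_modules` is a hypothetical helper.

```python
import importlib.util

# Import names used by the pipeline code: "tweepy" and "kafka" (from kafka-python)
REQUIRED_MODULES = ("tweepy", "kafka")


def check_modules(names=REQUIRED_MODULES):
    """Map each module name to True if it can be found, without importing it."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


if __name__ == "__main__":
    for name, ok in check_modules().items():
        print(f"{name}: {'installed' if ok else 'MISSING'}")
```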


Now open a terminal and start a Kafka console consumer on the twitterer_data topic

[Screenshot: starting the Kafka consumer for Twitter]

/root/confluent-5.1.2/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic twitterer_data --from-beginning


Here is the full Python code. You can run it as a script file or in a Jupyter notebook.

import tweepy
import time
from kafka import KafkaProducer

# Twitter setup: replace the placeholders with the keys from your developer account
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
# Create the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Set your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Create the API object by passing in the auth information
api = tweepy.API(auth)

from datetime import datetime, timedelta

def normalize_timestamp(time_str):
    # Parameter renamed from "time" so it no longer shadows the time module
    mytime = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # tweets are timestamped in GMT, while I am in the +1 timezone
    return mytime.strftime("%Y-%m-%d %H:%M:%S")

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'twitterer_data'

def get_twitter_data():
    # Search recent tweets mentioning Apple or the iPhone (tweepy 3.x API)
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        record = ''
        record += str(i.user.id_str)
        record += ';'
        record += str(normalize_timestamp(str(i.created_at)))
        record += ';'
        record += str(i.user.followers_count)
        record += ';'
        record += str(i.user.location)
        record += ';'
        record += str(i.favorite_count)
        record += ';'
        record += str(i.retweet_count)
        record += ';'
        producer.send(topic_name, str.encode(record))
    # send() is asynchronous; flush() blocks until the buffered records are delivered
    producer.flush()


def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval is the number of seconds to wait between searches
        time.sleep(interval)

periodic_work(60 * 0.1)  # fetch new tweets every 6 seconds
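The normalize_timestamp function above adds a fixed hour, which is off by one hour whenever daylight saving time is in effect in zones such as Central European Time. A timezone-aware alternative is sketched below using the standard library's zoneinfo module (Python 3.9+). The zone "Europe/Berlin" is only an assumed example of a +1 timezone, and `normalize_timestamp_tz` is a hypothetical name; the input is assumed to be the naive UTC string that str(i.created_at) produces.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+; needs the system tz database

FMT = "%Y-%m-%d %H:%M:%S"
# Assumed local zone; replace with your own IANA zone name.
LOCAL_TZ = ZoneInfo("Europe/Berlin")


def normalize_timestamp_tz(utc_str, tz=LOCAL_TZ):
    """Convert a naive UTC timestamp string to local time, honoring DST."""
    # Mark the parsed time as UTC, then let astimezone apply the zone's offset
    utc_time = datetime.strptime(utc_str, FMT).replace(tzinfo=timezone.utc)
    return utc_time.astimezone(tz).strftime(FMT)
```

Inside get_twitter_data you would call normalize_timestamp_tz(str(i.created_at)) in place of normalize_timestamp(...). With Europe/Berlin, a winter timestamp moves forward one hour and a summer timestamp two hours.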

Run the code in the Jupyter notebook and check the consumer terminal

[Screenshot: running the Python Kafka code for Twitter in Jupyter Notebook]

The Twitter data now appears in the consumer terminal

[Screenshot: Twitter data received through Kafka and Python]
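Each message on the topic is the semicolon-separated string built in get_twitter_data: user id, timestamp, follower count, location, favorite count and retweet count, followed by a trailing semicolon. A consumer written in Python would need to split that back into fields. The sketch below assumes exactly that field order; `TweetRecord` and `parse_record` are hypothetical names. Because the free-text location can itself contain a semicolon, the numeric tail is split off from the right.

```python
from typing import NamedTuple


class TweetRecord(NamedTuple):
    user_id: str
    created_at: str
    followers_count: int
    location: str
    favorite_count: int
    retweet_count: int


def parse_record(raw: bytes) -> TweetRecord:
    """Parse one 'id;time;followers;location;favorites;retweets;' message."""
    text = raw.decode("utf-8")
    if text.endswith(";"):
        text = text[:-1]  # drop the trailing separator the producer appends
    # The first three fields never contain ';', so split them off from the left
    user_id, created_at, followers, rest = text.split(";", 3)
    # Split the two numeric fields off from the right so a ';' in location survives
    location, favorites, retweets = rest.rsplit(";", 2)
    return TweetRecord(user_id, created_at, int(followers), location,
                       int(favorites), int(retweets))
```

With kafka-python, the value of each message from a KafkaConsumer subscribed to twitterer_data arrives as bytes by default, so it could be passed straight to parse_record.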
