Flashback to several years ago when I was using Twitter and learn text mining. At that time, I used python to collects twitter statuses with specific keywords / terms defined and then stored all of those statuses to csv as dataset for text mining learning. In this article, I will share how to stream twitter statuses with python using tweepy module but in this article, I only print the status without store it to a file or database.
Requirements
- Python 3, you might use Python 2. But I recommend to use Python 3 since Python 2 going to reach its EOL.
tweepy
module, install usingpip install tweepy
- Twitter Apps key and tokens, create your in here
Go to the Code
Here is my code to stream tweets in twitter:
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy import API
import os
import time
CONSUMER_KEY = os.environ['TWITTER_CONSUMER_KEY']
CONSUMER_SECRET = os.environ['TWITTER_CONSUMER_SECRET']
ACCESS_TOKEN = os.environ['TWITTER_ACCESS_TOKEN']
ACCESS_TOKEN_SECRET = os.environ['TWITTER_ACCESS_TOKEN_SECRET']
KEYWORDS = ['cianjur']
class TweetListener(StreamListener):
def __init__(self, batch_size):
super(TweetListener, self).__init__()
self.tweets = []
self.batch_size = batch_size
self.count = 0
def on_status(self, status):
if status.retweeted or ('RT' in str(status.text).split(' ')):
return
self.tweets.append(status)
if len(self.tweets) >= self.batch_size:
self.tweets = []
print("sleep 5 secs")
time.sleep(5)
self.count = self.count + 1
print(status.text)
return True
def on_error(self, status_code):
if status_code == 420:
print("Rate Limited")
return False
def auth_twitter():
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
return auth
if __name__ == '__main__':
auth = auth_twitter()
api = API(auth, wait_on_rate_limit=True)
stream_listener = TweetListener(batch_size=10)
stream = Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=KEYWORDS)
The code itself is quite simple and straightforward, let’s move to next section for more details.
Explanation
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy import API
import os
import time
First part are importing required modules to be used in the process. StreamListener
is a listener that will
receives all messages routed by Stream
established session. OAuthHandler
used for authentication purpose.
The API
is twitter API wrapper provided by tweepy
.
CONSUMER_KEY = os.environ['TWITTER_CONSUMER_KEY']
CONSUMER_SECRET = os.environ['TWITTER_CONSUMER_SECRET']
ACCESS_TOKEN = os.environ['TWITTER_ACCESS_TOKEN']
ACCESS_TOKEN_SECRET = os.environ['TWITTER_ACCESS_TOKEN_SECRET']
KEYWORDS = ['cianjur']
The next part are constants for the script, it’s including twitter apps credentials and in this case, the stream
keywords. Please note that I retrieved the credentials from environment variables, so you need to export
the variable
to your environment by export TWITTER_xxx_xxx=content
for all vars.
class TweetListener(StreamListener):
def __init__(self, batch_size):
super(TweetListener, self).__init__()
self.tweets = []
self.batch_size = batch_size
self.count = 0
def on_status(self, status):
if status.retweeted or ('RT' in str(status.text).split(' ')):
return
self.tweets.append(status)
if len(self.tweets) >= self.batch_size:
self.tweets = []
print("sleep 5 secs")
time.sleep(5)
self.count = self.count + 1
print(status.text)
return True
def on_error(self, status_code):
if status_code == 420:
print("Rate Limited")
return False
This is the class that inherited from StreamListener
. I created __init__
method to add several class variable like
tweets
to store list of tweet, batch_size
to define maximum tweets stored in a batch. Batch useful if you’re going
to pass the tweets
to another stage, in example to messaging queue, it’s actually not useful for this article, so you
might remove it if necessary. The last are count
that will store number of tweets that already receives, useful if
you’re going to limit tweets size.
Next is on_status
in here, I basically only receive tweet that not a retweet status and print the content.
When tweets size limit the batch size, it will reset the list and sleep for five secs to prevent to many request in
specific window time there might be a better solution for this, lol.
The last is on_error
, its basically just exit if the streamer got rate limited. For more response codes, go to
here
def auth_twitter():
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
return auth
if __name__ == '__main__':
auth = auth_twitter()
api = API(auth, wait_on_rate_limit=True)
stream_listener = TweetListener(batch_size=10)
stream = Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=KEYWORDS)
The last part are auth_twitter
, this function used to authenticate twitter apps using oauth. And the
most important part is the main method. The processes are authentication, create a stream listener, establish stream
session, and finally start the stream by filter with specific keywords/terms.
Running the Code
(twitter-stream) ➜ twitter-stream pipenv run python twitter_stream/stream.py
Ini bukti dukungan masyarakat thdp penindakan koruptor yg dilakukan @KPK_RI. Maju terus KPK, rakyat mendukungmu.
https://t.co/o2y1rMWDmN
@ridwankamil muhun kang sngt wajar. Ts lami masrakat cianjur trtindas ti jmn p cecep tp t walakaya, alhamdulillaah ihdinassirootol-mustaqiim
Empat kardus dan satu koper dibawa dari Disdik Cianjur #ElshintaWeekend https://t.co/dX9BRJsmkI
Proklamasi jilid 2 Kemerdekaan dari Korupsi.
Dampak Dari penangkapan #Bupaticianjur Hampir semua yang berkaitan dengan "Cianjur Jago" dirusak " bingung akutu :( https://t.co/gqRLDmsN6H
Graduation Video
Universitas Putra Indonesia
Yayasan Pendidikan Yuyun Moeslim Taher
Cianjur, tanggal 1 Desember 20… https://t.co/qTk4E5VNDr
if everything works, you’ll see the tweets stream in your console. Please do ignore the tweets, I run it not in a good
time for cianjur
keyword. Haha