Writing a data pipeline in R for streaming tweets
Inspired by this DataCamp post and the current events happening in Chile, I decided to give making my own version of a tweet-streaming script a try, with the intention of using it when another event of that magnitude happens and maybe getting some useful insights then. Or maybe not; at least I had fun while doing this.
Setting up the structure of the script
The first thing I did was plan how to structure the script. I decided to separate the process into three functions, one for each letter of ETL (imaginative, ikr). The whole script would look like this:
# Extract
get_tweets <- function() {}
# Transform
transform_tweets <- function() {}
# Load
load_tweets <- function() {}
main <- function() {
  get_tweets()
  transform_tweets()
  load_tweets()
}
# Run everything
main()
I won’t be showing the full, updated script at every step as it would take up too much space; you can look at the finished product here in this GitHub repo.
Connecting to the Twitter API and streaming the tweets
After designing the script it was time to start filling in those empty functions. I started with the streaming of tweets, as it is the logical first step.
To properly use the rtweet package one must create a new app in the Twitter developer options, find the keys and secrets, and store them somewhere. After that tedious process was done I stored the account and access keys/secrets in the .Renviron file, which ended up like this:
TW_CONSUMER_KEY='wow'
TW_CONSUMER_SECRET='so secret'
TW_ACCESS_TOKEN='much private'
TW_ACCESS_SECRET='wow'
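Since .Renviron is only read when a new R session starts, a quick sanity check (not part of the script, just something to run from the console after restarting R) is to ask for the variables directly:
# These should print the four values stored in .Renviron;
# empty strings mean the file wasn't picked up (restart R first)
Sys.getenv(c(
  "TW_CONSUMER_KEY", "TW_CONSUMER_SECRET",
  "TW_ACCESS_TOKEN", "TW_ACCESS_SECRET"
))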
Then creating the token for the session becomes super easy and portable:
token <- rtweet::create_token(
  app = 'app',
  consumer_key = Sys.getenv('TW_CONSUMER_KEY'),
  consumer_secret = Sys.getenv('TW_CONSUMER_SECRET'),
  access_token = Sys.getenv('TW_ACCESS_TOKEN'),
  access_secret = Sys.getenv('TW_ACCESS_SECRET')
)
Writing the body of the get_tweets function was easy too; it ended up being just a wrapper around rtweet::stream_tweets that creates a unique filename for the stream data and returns it for use in the next step. I went with that approach because there is no way (at least none that I found) to call functions directly on the messages as they are received, which would be nice for loading data into the database in real time. But whatever, we don’t need real-time data in the database anyway.
Here is how get_tweets
turned out:
get_tweets <- function(keys, timeout = 600, raw_data_dir = "raw_data") {
  filename <- file.path(
    raw_data_dir,
    paste0("stream_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".json")
  )
  rtweet::stream_tweets(
    q = keys,
    timeout = timeout,
    parse = FALSE,
    file_name = filename,
    verbose = TRUE
  )
  filename
}
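As a quick illustration, a call could look like the following (the hashtags and the one-minute timeout are just the defaults I ended up using for the CLI later on):
# Stream one minute of tweets about the protests and keep the file path
raw_file <- get_tweets(
  keys = "#chile,#chiledesperto,#santiago",
  timeout = 60
)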
Parsing the streams
After the E was done, it was the turn of the T portion. The function reads the tweet data from the file we stored the stream in and then processes it.
A bunch of data comes with every tweet. I thought about saving it all, but after a first look I realized that some fields aren’t important if the intention is to analyze what people are saying about a live event; so I just deleted the retweets, kept only the tweets that were in Spanish, processed the tweet content and the quoted content, removed duplicated tweets and returned the parsed dataframe.
The variables that I considered important for that specific use case were:
- created_at: time is always important.
- screen_name: to see which users tweeted the most, and what.
- content: self-explanatory.
- source: which device people are tweeting from.
- location: where people are tweeting from.
- quoted_user: if the tweet quotes somebody, which user is being quoted?
- quoted_content: what did the other user tweet (twote?) about?
The function ended up like this:
transform_tweets <- function(filename) {
  df <- rtweet::parse_stream(filename, verbose = FALSE)
  # drop retweets and keep only tweets in spanish
  df <- dplyr::filter(df, !is_retweet, lang == "es")
  # keep and rename only the columns we care about
  df <- dplyr::transmute(
    df,
    date_created = created_at,
    user = screen_name,
    content = text,
    source = source,
    location = location,
    quoted_user = quoted_screen_name,
    quoted_content = quoted_text
  )
  df <- dplyr::mutate_at(
    df,
    dplyr::vars(content, quoted_content),
    function(text) {
      # the %>% pipe needs magrittr (or dplyr) attached
      text %>%
        # coerce all the words to lower case
        stringr::str_to_lower() %>%
        # remove links
        stringr::str_remove_all("\\s?(f|ht)(tp)(s?)(://)([^\\.]*)[\\.|/](\\S*)") %>%
        # remove mentions of other users
        stringr::str_remove_all("@\\w+") %>%
        # remove spanish stopwords
        tm::removeWords(tm::stopwords("spanish")) %>%
        # trim all whitespace
        stringr::str_squish()
    }
  )
  # drop tweets whose cleaned content is duplicated
  df <- dplyr::filter(df, !duplicated(content))
  df
}
I’m not keen on using pipes %>% inside a function’s body, but in this case it made the processing of the tweets so much more legible that I couldn’t not use them. Also, I removed the mentions inside the tweets to filter out copy-pasted responses to multiple people.
Loading to the database
E and T ready, now for the L. The loading is maybe the easiest part of this ETL: just connect to the database, write the processed tweets to it and close the connection. Easy peasy.
load_tweets <- function(tweets, database = "tweets.db") {
  conn <- DBI::dbConnect(RSQLite::SQLite(), database)
  DBI::dbWriteTable(conn, "tweet_data", tweets, append = TRUE)
  DBI::dbDisconnect(conn)
}
Everything seems ready now…
Oh wait, we don’t have a database…
After all the work we did to get tweets from the Twitter database into ours, I have just realized that I never made a database to store our precious data.
I decided to make a function that sets up the database, to avoid writing those commands again in case I decide to delete the file or in case any other person would like to use the script. It is super simple: connect to the database, execute the command to create the table and disconnect, and the SQLite file is ready.
setup_database <- function(database = "tweets.db") {
  conn <- DBI::dbConnect(RSQLite::SQLite(), database)
  DBI::dbExecute(
    conn,
    "CREATE TABLE tweet_data(
      tweet_id INTEGER PRIMARY KEY,
      date_created INTEGER,
      user TEXT,
      content TEXT,
      source TEXT,
      location TEXT,
      quoted_user TEXT,
      quoted_content TEXT
    )"
  )
  DBI::dbDisconnect(conn)
}
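To convince myself the setup worked, a quick check from the console (again, not part of the script) is to list the tables and count the rows with plain DBI calls:
conn <- DBI::dbConnect(RSQLite::SQLite(), "tweets.db")
DBI::dbListTables(conn)                                    # should show "tweet_data"
DBI::dbGetQuery(conn, "SELECT COUNT(*) FROM tweet_data")   # 0 right after the setup
DBI::dbDisconnect(conn)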
I chose SQLite because I really hadn’t used it before, the tutorial I was following used it, and it truly is easy AF (and fun) to set up.
Making it a CLI
Now, with everything else ready, what can I do to make this script a truly usable piece of software? All my friends know that I am a Linux enthusiast and that I love the terminal so much that I ditched RStudio (ahhh, the horror) in favor of nvim and the incredibly well-made Nvim-R plugin (more on this in a future post), so the obvious path was to make a full CLI for it. Luckily for me, there is a package that handles that perfectly: optparse.
Using optparse
I added the following options to the script (a sketch of how they could be declared follows the list):
- --database: defaults to “tweets.db”; the name of the database to create or connect to.
- --keys: the search query for the tweets to be streamed. I used the default “#chile,#chiledesperto,#santiago”, but I intend to make it a mandatory positional argument to increase the flexibility; that default is ok for our purposes.
- --initial-setup: if this flag is used, just run setup_database and end the script.
- -f | --force-stream: if this flag is used together with --initial-setup, stream the tweets afterwards.
- --raw-data-dir: defaults to “raw_data”; the name of the folder that will hold all the raw data streams.
- -c | --stream-chunks: the number of times the stream will be run; the default, 5, means that the ETL process will be run five times.
- -t | --stream-timeout: how long to run each streaming session; the default is 60, meaning that each stream session will be just one minute long.
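I won’t reproduce the whole parser here, but a minimal sketch of how those options could be declared with optparse (the help strings are abbreviated and the dest names are my own choice, not necessarily what the repo uses) looks like this:
library(optparse)

option_list <- list(
  make_option("--database", type = "character", default = "tweets.db",
              help = "SQLite database to create or connect to"),
  make_option("--keys", type = "character",
              default = "#chile,#chiledesperto,#santiago",
              help = "Comma-separated search query for the stream"),
  make_option("--initial-setup", action = "store_true", default = FALSE,
              dest = "initial_setup", help = "Only set up the database"),
  make_option(c("-f", "--force-stream"), action = "store_true", default = FALSE,
              dest = "force_stream", help = "Stream right after --initial-setup"),
  make_option("--raw-data-dir", type = "character", default = "raw_data",
              dest = "raw_data_dir", help = "Folder for the raw stream files"),
  make_option(c("-c", "--stream-chunks"), type = "integer", default = 5,
              dest = "stream_chunks", help = "How many times to repeat the ETL"),
  make_option(c("-t", "--stream-timeout"), type = "integer", default = 60,
              dest = "stream_timeout", help = "Seconds of each stream session")
)

opts <- parse_args(OptionParser(option_list = option_list))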
One thing that I didn’t mention is that the ETL process will be run a few times: streaming, processing and loading the tweets, and then automatically repeating that for a set number of times. I used the term “stream chunks” because naming is difficult, but it could be named anything else; that name is not final.
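Put together, the main function ends up being roughly a loop over those chunks that chains the three ETL steps; here is a simplified sketch using the functions and option names from above, not the exact code in the repo:
main <- function(keys, chunks = 5, timeout = 60,
                 database = "tweets.db", raw_data_dir = "raw_data") {
  for (chunk in seq_len(chunks)) {
    # E: stream for `timeout` seconds and get the raw file path back
    raw_file <- get_tweets(keys, timeout = timeout, raw_data_dir = raw_data_dir)
    # T: parse and clean the streamed tweets
    tweets <- transform_tweets(raw_file)
    # L: append the clean tweets to the SQLite database
    load_tweets(tweets, database = database)
  }
}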
Adding safety checks and logging
We are almost ready, but there are a few things that can go wrong with our script:
- The database file could already exist when setting it up, and we could end up overwriting the data we have.
- The raw data directory might not exist, leaving us with nowhere to save the streams.
- There could be no tweets streamed at all, causing problems in the T and L steps.
The solution to all of those problems is to check for those conditions before running the body of the corresponding function, and we should be safe. There could be more possible problems lurking around, but I did not find any more.
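A rough sketch of what those checks could look like (the variable names and messages follow the sketches above, not necessarily the code in the repo):
# Refuse to overwrite an existing database during the initial setup
if (file.exists(database)) {
  stop("Database '", database, "' already exists, not overwriting it")
}

# Create the raw data directory if it is missing
if (!dir.exists(raw_data_dir)) {
  dir.create(raw_data_dir)
}

# Skip the T and L steps when the stream came back empty
if (file.size(raw_file) == 0) {
  warning("No tweets were streamed in this chunk, skipping the T and L steps")
} else {
  tweets <- transform_tweets(raw_file)
  load_tweets(tweets, database = database)
}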
Another thing is that we don’t actually know which part of the process is running at any given time; we just know that the console is locked, and there could be a silent problem that we are not aware of because of that. But there is always a solution: futile.logger.
It is a package that adds logging capabilities similar to the Python logging module, but in R. What a time to be alive. Just add a few calls to flog.info and flog.debug and we are ready to go!
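A rough idea of how those calls end up sprinkled around the loop (the messages are just examples I made up, not the exact ones in the repo):
library(futile.logger)

# show DEBUG messages too; the default threshold is INFO
flog.threshold(DEBUG)

flog.info("Starting chunk %s of %s", chunk, chunks)
raw_file <- get_tweets(keys, timeout = timeout, raw_data_dir = raw_data_dir)
flog.debug("Raw stream saved to %s", raw_file)

tweets <- transform_tweets(raw_file)
flog.info("Parsed %s tweets, loading them into the database", nrow(tweets))
load_tweets(tweets, database = database)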
The end… ?
This is it: the data pipeline, or ETL (I feel that “pipeline” is just another buzzword used to mystify simple stuff), is ready. Now I will play with this someday, maybe add more stuff to it, change other things, or write a second part as a sort of continuation of the idea that led me to create it in the first place.
The code for this project is here; feel free to give me your feedback on GitHub or Twitter :)