Last week I built OTP applications out of my Twitter Fetcher and Unshortener and connected them together using Streams. This week I’ll continue development of the Domain Scrapper system by writing an application to fetch comments from reddit.
Initial Setup
The first thing I had to do was familiarize myself with the reddit API. I read over the documentation at https://www.reddit.com/dev/api. The API endpoints I need are:
- /r/#{subreddit}/new - A listing of new links (posts)
- /r/#{subreddit}/hot - A listing of hot links
- /r/#{subreddit}/comments/#{id} - A listing of the comments for a specific link
I also saw that I needed to authenticate requests using OAuth.
With that preparation done, I started setting up my project. I created a new app in my umbrella project called “reddit” and got to work.
Iterations
I went through several iterations in developing this Fetcher app.
Fetch Posts, Then Comments
My first thought was to fetch new posts and then fetch comments for those posts. Here’s the first iteration that I built:
defmodule Reddit do
  def get_oauth_token do
    request_oauth_token().body
    |> Poison.decode
    |> ok
    |> Map.get("access_token")
  end

  defp request_oauth_token do
    cfg = config
    HTTPotion.post "https://www.reddit.com/api/v1/access_token", [
      body: 'grant_type=password&username=#{cfg[:user]}&password=#{cfg[:pass]}',
      headers: [
        "User-Agent": "josephkain-test",
        "Content-Type": "application/x-www-form-urlencoded"
      ],
      basic_auth: {cfg[:client_id], cfg[:secret]}
    ]
  end

  defp config do
    %{
      user: System.get_env("REDDIT_USER"),
      pass: System.get_env("REDDIT_PASSWORD"),
      client_id: System.get_env("REDDIT_CLIENT_ID"),
      secret: System.get_env("REDDIT_SECRET")
    }
  end

  def get_new(token, subreddit, opts \\ []) do
    request("/r/#{subreddit}/new", token, opts)
  end

  def get_comments(token, subreddit, id) do
    request("/r/#{subreddit}/comments/#{id}", token, [limit: 100])
  end

  defp request(endpoint, token, opts) do
    HTTPotion.get("https://oauth.reddit.com/" <> endpoint <> query(opts), [headers: [
      "User-Agent": "josephkain-test/0.1 by josephkain",
      "Authorization": "bearer #{token}"
    ]])
    |> Map.get(:body)
    |> Poison.decode
    |> ok
  end

  defp query(opts) do
    string = opts
    |> Enum.map(fn {key, value} -> "#{key}=#{value}" end)
    |> Enum.join("&")

    "?" <> string
  end

  defp ok({:ok, result}), do: result

  # Fetch one page of up to 100 new posts; return {posts, after_cursor}.
  def fetch_100_new(token, sub, opts) do
    result = get_new(token, sub, [limit: 100] ++ opts)
    {result["data"]["children"], result["data"]["after"]}
  end

  # Page through /new forever, threading the "after" cursor as the accumulator.
  def fetch_new_perpertually(token, sub) do
    Stream.resource(fn -> [] end,
                    fn next -> fetch_100_new(token, sub, [after: next]) end,
                    fn _ -> true end)
  end

  def test do
    sub = "programming"
    token = get_oauth_token

    fetch_new_perpertually(token, sub)
    |> Stream.map(fn item -> item["data"]["id"] end)
    |> Stream.map(fn id -> get_comments(token, sub, id) end)
    |> Stream.map(fn item -> IO.inspect item end)
    |> Stream.run
  end
end
There’s a lot here. Let’s start with the test/0 function at the end.
We start by getting an OAuth token by making a POST request to the reddit endpoint. Then we use this token in subsequent work. The idea is to generate a stream of new posts from /r/programming, reusing the ideas from Stream Patterns in Elixir 3. Then, with that stream we fetch the comments from those posts.
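As a refresher on that pattern, here is a minimal, self-contained sketch (the Paged module and its fake pages are made up purely for illustration) of how Stream.resource/3 turns a paged source into an infinite stream. The accumulator plays the same role as reddit’s after cursor in fetch_new_perpertually above:
defmodule Paged do
  # next_fun must return {items_to_emit, new_accumulator}; here the
  # accumulator is just the page number, standing in for reddit's "after" cursor.
  def stream do
    Stream.resource(
      fn -> 0 end,                                  # start: first page
      fn page -> {page_items(page), page + 1} end,  # emit a page, advance the cursor
      fn _ -> :ok end                               # cleanup: nothing to do
    )
  end

  defp page_items(page), do: Enum.to_list((page * 3)..(page * 3 + 2))
end

Paged.stream |> Stream.take(7) |> Enum.to_list
# => [0, 1, 2, 3, 4, 5, 6]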
This iteration worked alright but I realized that I needed to do two things differently.
First, I need to rate limit to one request per second.
Second, the way I’ve structured the fetch doesn’t really pick up all the comments. I fetch only the newest posts and comments for those posts. But I’ll miss comments made to older posts. What I really want to do is to just fetch new comments regardless of the post they are attached to. But, I don’t think this is available in the API.
Instead, I could try the following (sketched below):
- Fetch posts and extract {id, post date} into some table.
- Continuously scan over the table and fetch new comments.
- Save the last seen comment for each post so I only fetch new comments.
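Purely as a sketch of what that table might look like (the Reddit.SeenComments module and its interface are hypothetical; I haven’t built any of this), an Agent could map each post id to the post date and the last comment seen:
defmodule Reddit.SeenComments do
  # Hypothetical tracking table: post id => {post_date, last_seen_comment}.
  def start_link do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Remember a post so the scanner will start fetching its comments.
  def track(post_id, post_date) do
    Agent.update(__MODULE__, &Map.put_new(&1, post_id, {post_date, nil}))
  end

  # Record the newest comment seen for a post.
  def update_last_seen(post_id, comment_id) do
    Agent.update(__MODULE__, fn table ->
      Map.update!(table, post_id, fn {date, _} -> {date, comment_id} end)
    end)
  end

  # The scanner would walk this table and fetch comments newer than last_seen.
  def all, do: Agent.get(__MODULE__, & &1)
end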
Another, simpler idea might be to:
- Periodically fetch hot posts.
- Fetch comments from those hot posts.
This won’t fetch all comments either, but it will fetch the most important ones.
Fetch Hot Posts and Comments
I’m going to go ahead with the idea to fetch the hot posts and comments, but I have to figure out how to structure the code. My first idea was:
fetch_hot_perpertually
|> Stream.map(fn item -> item["data"]["id"] end)
|> lookup_id(from_some_Agent) # maps to {id, next}
|> Stream.map(fn {id, next} -> get_comments(token, sub, id, after: next) end)
|> Stream.map(fn item -> IO.inspect item end)
|> Stream.run
This would use an Agent to store the next token, letting me remember the last comment I had seen for a particular post.
But, thinking about it more, I think I need to merge the first two transforms so I can update the Agent at the same time. That is, the Agent needs to hold the set of last hot items. More like this:
fetch_hot
Agent.get_and_update(agent, fn old_hot -> f(old_hot, hot) end) # Return list of {id, next}
|> Stream.map(fn {id, next} -> get_comments(token, sub, id, after: next) end)
|> Stream.map(fn item -> IO.inspect item end)
|> Stream.run
But even this doesn’t seem right. Continuing to think about it, the aggregator, which I haven’t designed yet, has to be responsible for rejecting duplicates. Trying to track hot items here would cause my tracking structure to grow without bound; I could try to age items out of the list, but that isn’t guaranteed to keep it bounded. The aggregator also has a slight advantage in that it only has to worry about items that pass our filter (e.g. items that refer to joekain.com).
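To make that concrete, here is the kind of deduplication I imagine the aggregator doing; the Aggregator module and its new?/1 function are placeholders, since nothing about the aggregator is designed yet:
defmodule Aggregator do
  # Placeholder sketch: remember every item we have passed through and
  # reject anything we have already seen.
  def start_link do
    Agent.start_link(fn -> MapSet.new end, name: __MODULE__)
  end

  # Returns true the first time an item is seen, false afterwards.
  def new?(item) do
    Agent.get_and_update(__MODULE__, fn seen ->
      {not MapSet.member?(seen, item), MapSet.put(seen, item)}
    end)
  end
end

# In a pipeline, duplicates would then be dropped with something like:
# stream |> Stream.filter(&Aggregator.new?/1)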
For the fetcher, this reasoning leads me to:
def test do
  sub = "programming"
  token = get_oauth_token

  token
  |> fetch_hot_perpertually(sub)
  |> Stream.map(fn item -> item["data"]["id"] end)
  |> Stream.flat_map(fn id -> get_comments(token, sub, id) end)
  |> Stream.flat_map(fn item -> item["data"]["children"] end)
  |> Stream.map(fn item -> item["data"]["body"] || item["data"]["url"] end)
  |> Stream.map(fn item -> IO.inspect item end)
  |> Stream.run
end
This works the way I expected it to and the code is essentially what I started with when fetching new posts. I’ve switched it over to fetching hot posts instead.
I’ve also added a new transformation:
Stream.map(fn item -> item["data"]["body"] || item["data"]["url"] end)
This extracts the body of the comment rather than keeping the whole record. There are two cases here:
- item["data"]["body"] handles regular comments and extracts the text of the comment.
- item["data"]["url"] handles the links (posts) themselves, which have no body but do have a URL we can extract. reddit includes the original post along with the comments; we will make more use of this fact later on.
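To make the two cases concrete, here is roughly the shape of the decoded items (the field values are made up; only the structure matters):
# A regular comment: "body" holds the comment text.
comment = %{"data" => %{"body" => "Nice write-up, see https://joekain.com"}}

# The original link (post) that reddit returns alongside the comments:
# it has no "body", but it does have a "url".
post = %{"data" => %{"url" => "https://joekain.com/some-article"}}

comment["data"]["body"] || comment["data"]["url"]  # => the comment text
post["data"]["body"] || post["data"]["url"]        # => "https://joekain.com/some-article"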
Rate Limiting
The next thing to do is to rate limit requests to one per second. To do this, I need a server to handle making all the requests. That way, the server can maintain state about the request rate across all requests. Here are my requirements for the server:
- The server needs to take requests and put them in a queue.
- Add a timer to send an event once per second.
- The handle_info callback will be called when the timer fires.
- In handle_info, pop an item from the queue and make the request.
I coded this up as a GenServer. I won’t post the entire server because there’s a lot there; I ended up moving everything related to reddit requests into this module, including the OAuth setup and configuration. The server maintains a queue as its state. This is a standard Erlang queue from the :queue module.
The key functions in the server are handle_call and handle_info:
def handle_call({:request, params}, from, queue) do
  {:noreply, :queue.in({from, params}, queue)}
end
handle_call({:request, params}, from, queue) pushes a tuple into the queue. The tuple contains two items, from and params. from identifies the process that called the request/3 function. Our handle_call doesn’t reply, so we owe the caller a reply later; that’s why we push from into the queue. Once we process the request we’ll use from to send a reply back.
params holds the parameters for the reddit request. We push these into the queue as well and use them when we actually issue the request.
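For reference, the client side isn’t shown above; request/3 is little more than a GenServer.call wrapper. A sketch, assuming the server is registered under its module name (the :infinity timeout is my choice, since a request may sit in the queue for a while):
# Client-side sketch: the caller blocks in GenServer.call until handle_info
# eventually pops this request and answers it with GenServer.reply.
def request(endpoint, token, opts) do
  GenServer.call(__MODULE__, {:request, {endpoint, token, opts}}, :infinity)
end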
In init/1 we set up a timer
def init(_) do
  :timer.send_interval(@delay_seconds * 1000, :tick)
  {:ok, :queue.new}
end
based on the module attribute @delay_seconds. We also create the queue and set it as the server’s state.
When the timer fires, the handle_info/2 callback will be called.
def handle_info(:tick, queue) do
  process_pop(:queue.out(queue))
end
Here we pop the next {from, params} tuple, if there is one, and pass the result to process_pop/1:
defp process_pop({:empty, queue}), do: {:noreply, queue}

defp process_pop({{:value, {from, {endpoint, token, opts}}}, queue}) do
  GenServer.reply(from, reddit_request(endpoint, token, opts))
  {:noreply, queue}
end

defp reddit_request(endpoint, token, opts) do
  HTTPotion.get("https://oauth.reddit.com/" <> endpoint <> query(opts), [headers: [
    "User-Agent": "josephkain-test/0.1 by josephkain",
    "Authorization": "bearer #{token}"
  ]])
  |> Map.get(:body)
  |> Poison.decode
end
This code extracts the params, calls through to the reddit API using HTTPotion, and decodes the result. Then we use GenServer.reply/2 to send the result back to our original caller, from.
One thing I learned while building this iteration was that I can’t use ExActor for this kind of GenServer. ExActor doesn’t maintain the from argument to handle_call. This makes sense for ExActor because it is designed to be simple and let you build fairly standard GenServers. But occasionally I find I need a server that defers sending its response, and in that case I have to write the server out by hand.
Structure as Fetcher and Parser
The last step is to structure all this code following the Fetcher and Parser patterns I’ve used for Twitter.
Ideally, I’d create a Protocol here. But before doing that I should finish implementing two Fetchers and two Parsers to make sure I have the right interface.
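For what it’s worth, the shape I have in mind looks roughly like the following, written here as behaviours just to sketch the interface (a Protocol would dispatch on the data instead, and the module names are placeholders); none of this is committed yet:
# Hypothetical sketch of the shared interface, expressed as behaviours.
defmodule Fetcher do
  @callback fetch() :: Enumerable.t
end

defmodule Parser do
  @callback text(item :: map()) :: String.t
  @callback urls(item :: map()) :: [String.t]
end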
To create a Fetcher I replaced the Reddit.test/0 function with a Reddit.fetch/0 function like this:
@spec fetch :: Enumerable.t
def fetch do
  sub = "programming"
  token = get_oauth_token

  token
  |> fetch_hot_perpertually(sub)
  |> Stream.map(fn item -> item["data"]["id"] end)
  |> Stream.flat_map(fn id -> get_comments(token, sub, id) end)
  |> Stream.flat_map(fn item -> item["data"]["children"] end)
end
I’ve removed a few stages of this pipeline, namely:
|> Stream.map(fn item -> item["data"]["body"] || item["data"]["url"] end)
As I described before, this line parses out text or URLs. This code belongs in the Parser.
|> Stream.map(fn item -> IO.inspect item end)
This was just for testing and can be removed.
|> Stream.run
This made sense for testing but for the whole Domain Scrapper this belongs at a higher level.
The next step was to write up a Parser:
defmodule Reddit.Parser do
  @uri_regex ~r<https*://[^\s]+>

  @typep comment_t :: map()

  @spec text(comment_t) :: String.t
  def text(comment) do
    get_body(comment)
  end

  @spec urls(comment_t) :: [ String.t ]
  def urls(comment) do
    comment
    |> get_body
    |> parse_urls
  end

  @spec get_body(comment_t) :: String.t
  defp get_body(comment) do
    comment["data"]["body"] || comment["data"]["url"] || ""
  end

  @spec parse_urls(String.t) :: [ String.t ]
  defp parse_urls(body) do
    @uri_regex
    |> Regex.scan(body)
    |> List.flatten
  end
end
This code simply uses a regexp to extract URLs from the comment bodies. Note that the function get_body/1 handles several cases:
- The comment could be a true comment, in which case body contains a bunch of text.
- The comment could actually be the original link for the post, which the reddit API returns along with the comments. In this case the body field is nil and we use comment["data"]["url"] instead; the URL will match the regexp and give us the link.
- I’ve seen a few cases where there is no data at all, in which case get_body/1 returns "", which won’t match the regexp, so the comment is ignored.
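A quick sanity check of the three cases, with made-up comment data:
Reddit.Parser.urls(%{"data" => %{"body" => "see https://joekain.com and http://example.com/x"}})
# => ["https://joekain.com", "http://example.com/x"]

Reddit.Parser.urls(%{"data" => %{"url" => "https://joekain.com/some-post"}})
# => ["https://joekain.com/some-post"]

Reddit.Parser.urls(%{"data" => %{}})
# => []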
Finally, we have a GenServer to run the Fetcher / Parser part of the pipeline. This will allow us to create a Reddit OTP application:
defmodule Reddit.Server do
  use ExActor.GenServer, export: {:global, __MODULE__}

  defstart start_link() do
    stream = Reddit.Fetcher.fetch
    |> Stream.flat_map(fn tweet -> Reddit.Parser.urls(tweet) end)

    initial_state(stream)
  end

  defcall get, state: stream, do: reply(stream)
end
This is similar to the Twitter code we have. We start up a process that puts the Fetcher / Parser pipeline together to generate a Stream of URLs. The Stream can be retrieved with the get function.
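Because the server is exported globally, the stream can be pulled from anywhere once the application is running. For example, taking just a few URLs to try things out (Stream.take keeps this from running forever, since the underlying stream is infinite):
Reddit.Server.get
|> Stream.take(5)
|> Enum.each(&IO.puts/1)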
Next Steps
Now that we have two Fetchers (Twitter and reddit) we can stream both through the Unshortener. We can use these to prove out the architecture and interfaces we’ve designed. This will be the topic of the next post.