
Last week I built OTP applications out of my Twitter Fetcher and Unshortener and connected them together using Streams. This week I’ll continue development of the Domain Scrapper system by writing an application to fetch comments from reddit.

Initial Setup

The first thing I had to do was to familiarize myself with the reddit API. I read over the documentation at https://www.reddit.com/dev/api. The API endpoints I need are:

  • /r/#{subreddit}/new - A listing of new links (posts)
  • /r/#{subreddit}/hot - A listing of hot links
  • /r/#{subreddit}/comments/#{id} - A listing of the comments for a specific link

I also saw that I needed to authenticate requests using OAuth.

With the reddit API sorted out, I started setting up my project. I created a new app in my umbrella project called “reddit” and got to work.

Iterations

I went through several iterations in developing this Fetcher app.

Fetch Posts, Then Comments

My first thought was to fetch new posts and then fetch comments for those posts. Here’s the first iteration that I built:

defmodule Reddit do
  def get_oauth_token do
    request_oauth_token().body
    |> Poison.decode
    |> ok
    |> Map.get("access_token")
  end

  defp request_oauth_token do
    cfg = config

    HTTPotion.post "https://www.reddit.com/api/v1/access_token", [
      body: "grant_type=password&username=#{cfg[:user]}&password=#{cfg[:pass]}",
      headers: [
        "User-Agent": "josephkain-test",
        "Content-Type": "application/x-www-form-urlencoded"
      ],
      basic_auth: {cfg[:client_id], cfg[:secret]}
    ]
  end

  defp config do
    %{
      user: System.get_env("REDDIT_USER"),
      pass: System.get_env("REDDIT_PASSWORD"),
      client_id: System.get_env("REDDIT_CLIENT_ID"),
      secret: System.get_env("REDDIT_SECRET")
    }
  end

  def get_new(token, subreddit, opts \\ []) do
    request("/r/#{subreddit}/new", token, opts)
  end

  def get_comments(token, subreddit, id) do
    request("/r/#{subreddit}/comments/#{id}", token, [limit: 100])
  end

  defp request(endpoint, token, opts) do
    HTTPotion.get("https://oauth.reddit.com/" <> endpoint <> query(opts), [headers: [
      "User-Agent": "josephkain-test/0.1 by josephkain",
      "Authorization": "bearer #{token}"
    ]])
    |> Map.get(:body)
    |> Poison.decode
    |> ok
  end

  defp query(opts) do
    string = opts
    |> Enum.map(fn {key, value} -> "#{key}=#{value}" end)
    |> Enum.join("&")

    "?" <> string
  end

  defp ok({:ok, result}), do: result

  def fetch_100_new(token, sub, opts) do
    result = get_new(token, sub, [limit: 100] ++ opts)
    {result["data"]["children"], result["data"]["after"]}
  end

  def fetch_new_perpertually(token, sub) do
    # Build an infinite stream of posts with Stream.resource: each call to
    # the producer fetches another page of 100 posts, threading reddit's
    # "after" cursor through as the accumulator.
    Stream.resource(fn -> [] end,
                    fn next -> fetch_100_new(token, sub, [after: next]) end,
                    fn _ -> true end)
  end

  def test do
    sub = "programming"

    token = get_oauth_token

    fetch_new_perpertually(token, sub)
    |> Stream.map(fn item -> item["data"]["id"] end)
    |> Stream.map(fn id -> get_comments(token, sub, id) end)
    |> Stream.map(fn item -> IO.inspect item end)
    |> Stream.run
  end
end

There’s a lot here. Let’s start with the test/0 function at the end.

We start by getting an OAuth token via a POST request to reddit’s token endpoint. Then we use this token in all subsequent requests. The idea is to generate a stream of new posts from /r/programming, reusing the ideas from Stream Patterns in Elixir 3. Then, with that stream, we fetch the comments on those posts.

This iteration worked alright but I realized that I needed to do two things differently.

First, I need to rate limit to one request per second.

Second, the way I’ve structured the fetch doesn’t really pick up all the comments. I fetch only the newest posts and comments for those posts. But I’ll miss comments made to older posts. What I really want to do is to just fetch new comments regardless of the post they are attached to. But, I don’t think this is available in the API.

Instead, I could try

  • Fetch posts and extract {id, post date} pairs into some table.
  • Continuously scan over the table and fetch new comments.
  • Save the last seen comment for each post so I only fetch new comments.

Another, simpler idea might be to:

  • Periodically fetch hot posts.
  • Fetch comments from those hot posts.

This won’t fetch all comments either, but it will fetch the most important ones.

Fetch Hot Posts and Comments

I’m going to go ahead with the idea of fetching the hot posts and comments, but I have to figure out how to structure the code. My first idea was:

fetch_hot_perpertually
|> Stream.map(fn item -> item["data"]["id"] end)
|> lookup_id(from_some_Agent)   # maps to {id, next}
|> Stream.map(fn {id, next} -> get_comments(token, sub, id, after: next) end)
|> Stream.map(fn item -> IO.inspect item end)
|> Stream.run

This would use an Agent to store the next token that would let me remember the last comment I had seen for a particular post.
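
For concreteness, here’s roughly what that Agent might have looked like. This is only a sketch of the idea; the module name and functions are hypothetical and I never actually built it:

defmodule Reddit.CommentCursors do
  # Hypothetical Agent mapping post id => the "after" cursor for the last
  # comment fetched from that post.
  def start_link do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Return {id, next}, where next is nil for posts we haven't seen yet.
  def lookup_id(id) do
    Agent.get(__MODULE__, fn cursors -> {id, Map.get(cursors, id)} end)
  end

  # Record the newest comment seen for a post.
  def update(id, next) do
    Agent.update(__MODULE__, fn cursors -> Map.put(cursors, id, next) end)
  end
end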

But, thinking about it more, I think I need to merge the first two transforms so I can update the Agent at the same time. That is, the Agent needs to hold the set of last hot items. More like this:

hot = fetch_hot
Agent.get_and_update(agent, fn old_hot -> f(old_hot, hot) end) # Return list of {id, next}
|> Stream.map(fn {id, next} -> get_comments(token, sub, id, after: next) end)
|> Stream.map(fn item -> IO.inspect item end)
|> Stream.run

But even this doesn’t seem right. Continuing to think about it, the aggregator, which I haven’t designed yet, has to be responsible for rejecting duplicates. Trying to track hot items would cause my tracking structure to grow without bound. I could try to age items out of the list, but that’s not guaranteed. The aggregator also has a slight advantage in that it only has to worry about items that pass our filter (e.g. items that refer to joekain.com).

This leads me to:

def test do
  sub = "programming"
  token = get_oauth_token

  token
  |> fetch_hot_perpertually(sub)
  |> Stream.map(fn item -> item["data"]["id"] end)
  |> Stream.flat_map(fn id -> get_comments(token, sub, id) end)
  |> Stream.flat_map(fn item -> item["data"]["children"] end)
  |> Stream.map(fn item -> item["data"]["body"] || item["data"]["url"] end)
  |> Stream.map(fn item -> IO.inspect item end)
  |> Stream.run
end

This works the way I expected it to and the code is essentially what I started with when fetching new posts. I’ve switched it over to fetching hot posts instead.
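
I haven’t shown the hot-post versions of the fetch functions, but they mirror the new-post versions from the first iteration and live in the same Reddit module. A sketch of what they look like (the helper names besides fetch_hot_perpertually are my guesses):

def get_hot(token, subreddit, opts \\ []) do
  request("/r/#{subreddit}/hot", token, opts)
end

def fetch_100_hot(token, sub, opts) do
  result = get_hot(token, sub, [limit: 100] ++ opts)
  {result["data"]["children"], result["data"]["after"]}
end

def fetch_hot_perpertually(token, sub) do
  # Same Stream.resource pattern as fetch_new_perpertually, paging through
  # hot posts with reddit's "after" cursor.
  Stream.resource(fn -> [] end,
                  fn next -> fetch_100_hot(token, sub, [after: next]) end,
                  fn _ -> true end)
end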

I’ve also added a new transformation:

Stream.map(fn item -> item["data"]["body"] || item["data"]["url"] end)

This extracts out the body of the comment rather than keeping the whole record. There are two cases here:

  1. item["data"]["body"] handles regular comments and extracts the text of the comment.
  2. item["data"]["url"] handles the links (posts) themselves which have no body. But they have a URL which we extract. reddit includes the original post with the comments. We will make more use of this fact later on.
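
To make the two cases concrete, here is what the transformation does with simplified, made-up records:

# A regular comment: "body" is present, so it wins.
comment = %{"data" => %{"body" => "Nice post, see https://joekain.com"}}
comment["data"]["body"] || comment["data"]["url"]
# => "Nice post, see https://joekain.com"

# The original link for the post: no "body", so we fall back to "url".
link = %{"data" => %{"url" => "https://joekain.com/some-post"}}
link["data"]["body"] || link["data"]["url"]
# => "https://joekain.com/some-post"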

Rate Limiting

The next thing to do is to rate limit requests to one per second. To do this, I need a server to handle making all the requests. That way, the server can maintain state about the request rate across all requests. Here are my requirements for the server:

  • The server needs to take requests and put them in a queue.
  • Add a timer that sends an event once per second.
  • The handle_info callback will be called when the timer fires.
  • In handle_info, pop an item from the queue and make the request.

I coded this up as a GenServer. I won’t post the entire server because there’s a lot there. I ended up moving everything related to reddit requests to this module. This includes the OAuth setup and configuration. The server maintains a queue as its state.

This is a standard Erlang queue from the :queue module.
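
To give a sense of the module’s shape, here’s a minimal sketch of the setup and client side; the name Reddit.RequestServer and the :infinity call timeout are illustrative, and the real module also contains the OAuth and configuration code mentioned above:

defmodule Reddit.RequestServer do
  use GenServer

  # One request per second.
  @delay_seconds 1

  def start_link do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  # Client API: blocks until the rate limiter reaches this request and
  # replies with the result of the reddit call. Requests can sit in the
  # queue for a while, so the default 5 second call timeout is replaced
  # with :infinity.
  def request(endpoint, token, opts) do
    GenServer.call(__MODULE__, {:request, {endpoint, token, opts}}, :infinity)
  end

  # init/1, handle_call/3, handle_info/2, and the private helpers are
  # shown below.
end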

The key functions in the server are handle_call and handle_info:

def handle_call({:request, params}, from, queue) do
  {:noreply, :queue.in({from, params}, queue)}
end

handle_call({:request, params}, from, queue) pushes a tuple into the queue. The tuple contains two items, from and params. from identifies the process that called the request/3 function; GenServer uses it to route the reply. Our handle_call doesn’t reply immediately, so we owe the caller a reply later. That’s why we push from into the queue: once we process the request we’ll use it to send a reply back.

params holds the parameters for the reddit request. We push these into the queue as well and use them when we actually issue the request.

In init/1 we set up a timer

def init(_) do
  :timer.send_interval(@delay_seconds * 1000, :tick)
  {:ok, :queue.new}
end

based on the constant @delay_seconds. We also create the queue and set it as the server’s state.

When the timer fires the handle_info/2 callback will be called.

def handle_info(:tick, queue) do
  process_pop(:queue.out(queue))
end

Here we pop the {from, params} tuple and pass it to process_pop/1:

defp process_pop({:empty, queue}), do: {:noreply, queue}
defp process_pop({{:value, {from, {endpoint, token, opts}}}, queue}) do
  GenServer.reply(from, reddit_request(endpoint, token, opts))
  {:noreply, queue}
end

defp reddit_request(endpoint, token, opts) do
  HTTPotion.get("https://oauth.reddit.com/" <> endpoint <> query(opts), [headers: [
    "User-Agent": "josephkain-test/0.1 by josephkain",
    "Authorization": "bearer #{token}"
  ]])
  |> Map.get(:body)
  |> Poison.decode
end

This code extracts the params, calls through to the reddit API using HTTPotion and decodes the result. Then we use GenServer.reply/2 to send the result back to our original caller, from.

One thing I learned while building this iteration was that I can’t use ExActor for this kind of GenServer. ExActor doesn’t give you access to the from argument to handle_call. This makes sense for ExActor because it is designed to be simple and let you build fairly standard GenServers. But occasionally I’ve found that I need to build servers that defer sending a response, and in those cases I have to write the server out by hand.

Structure as Fetcher and Parser

The last step is to structure all this code following the Fetcher and Parser patterns I’ve used for Twitter.

Ideally, I’d create a Protocol here. But before doing that I should finish implementing two Fetchers and two Parsers so that I make sure that I have the right interface.
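
When I do formalize the interface, it might end up looking something like this (purely illustrative module names, and sketched as behaviours, which may turn out to be a better fit than a protocol since Fetchers and Parsers are modules rather than data types):

defmodule Fetcher do
  # A Fetcher produces an enumerable of raw items (tweets, comments, ...).
  @callback fetch() :: Enumerable.t
end

defmodule Parser do
  # A Parser extracts text and URLs from a single raw item.
  @callback text(item :: map) :: String.t
  @callback urls(item :: map) :: [String.t]
end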

To create a Fetcher I replaced the Reddit.test/0 function with a Reddit.fetch/0 function like this:

@spec fetch :: Enumerable.t
def fetch do
  sub = "programming"
  token = get_oauth_token

  token
  |> fetch_hot_perpertually(sub)
  |> Stream.map(fn item -> item["data"]["id"] end)
  |> Stream.flat_map(fn id -> get_comments(token, sub, id) end)
  |> Stream.flat_map(fn item -> item["data"]["children"] end)
end

I’ve removed a few stages of this pipeline, namely:

|> Stream.map(fn item -> item["data"]["body"] || item["data"]["url"] end)

As I described before, this line parses out text or URLs. This code belongs in the Parser.

|> Stream.map(fn item -> IO.inspect item end)

This was just for testing and can be removed.

|> Stream.run

This made sense for testing but for the whole Domain Scrapper this belongs at a higher level.

The next step was to write up a Parser:

defmodule Reddit.Parser do
  @uri_regex ~r<https*://[^\s]+>

  @typep comment_t :: map()

  @spec text(comment_t) :: String.t
  def text(comment) do
    get_body(comment)
  end

  @spec urls(comment_t) :: [ String.t ]
  def urls(comment) do
    comment
    |> get_body
    |> parse_urls
  end

  @spec get_body(comment_t) :: String.t
  defp get_body(comment) do
    comment["data"]["body"] || comment["data"]["url"] || ""
  end

  @spec parse_urls(String.t) :: [ String.t ]
  defp parse_urls(body) do
    @uri_regex
    |> Regex.scan(body)
    |> List.flatten
  end
end

This code simply uses a regexp to extract URLs from the comment bodies. Note that the function get_body/1 handles several cases:

  1. The comment could be a true comment, in which case body contains the text of the comment.
  2. The comment could actually be the original link for the post. The reddit API returns this along with the comments. In this case the “body” field is nil and we will use comment["data"]["url"]. This will match the regexp and give us the link.
  3. I’ve seen a few cases where there is no data, in which case get_body/1 returns "", which won’t match the regexp, so the comment is ignored.
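
A quick example of how the Parser behaves on a made-up comment record:

comment = %{"data" => %{"body" => "See https://joekain.com and http://example.com"}}

Reddit.Parser.text(comment)
# => "See https://joekain.com and http://example.com"

Reddit.Parser.urls(comment)
# => ["https://joekain.com", "http://example.com"]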

Finally, we have a GenServer to run the Fetcher / Parser part of the pipeline. This will allow us to create a Reddit OTP application:

defmodule Reddit.Server do
  use ExActor.GenServer, export: {:global, __MODULE__}

  defstart start_link() do
    stream = Reddit.Fetcher.fetch
    |> Stream.flat_map(fn comment -> Reddit.Parser.urls(comment) end)

    initial_state(stream)
  end

  defcall get, state: stream, do: reply(stream)
end

This is similar to the Twitter code we have. We start up a process that puts the Fetcher / Parser pipeline together to generate a Stream of URLs. The Stream can be retrieved with the get function.
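
Using it looks something like this (a sketch; in the full application the supervisor starts the server, and Stream.take/2 is only there to keep the example from running the infinite stream forever):

Reddit.Server.start_link

Reddit.Server.get
|> Stream.take(5)
|> Enum.to_list
# => the first five URLs found in hot-post comments on /r/programming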

Next Steps

Now that we have two Fetchers (Twitter and reddit) we can stream both through the Unshortener. We can use these to prove out the architecture and interfaces we’ve designed. This will be the topic of the next post.