Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ config :concentrate,
],
file_tap: [
enabled?: false
]
],
http_producer: Concentrate.Producer.HTTPoison

import_config "#{Mix.env()}.exs"
4 changes: 2 additions & 2 deletions guides/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ Throughout the pipeline, data is represented as one of three structs:
* `VehiclePosition`: where the vehicle is, both latitude/longitude and on the trip
* `StopTimeUpdate`: a prediction about when a vehicle will arrive/depart a stop

## Producer.HTTP
## Producer.HTTPoison

At the top of the pipeline are a set of Producer.HTTP stages. Each one is
At the top of the pipeline are a set of Producer.HTTPoison stages. Each one is
responsible for a single file, as well as handling caching and parsing. We
fetch no more frequently than every 5 seconds, to avoid overloading the
remote systems. Once fetched, they're parsed and passed along down the pipeline.
Expand Down
2 changes: 1 addition & 1 deletion lib/concentrate/filter/alert/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Concentrate.Filter.Alert.Supervisor do
Supervisor.start_link(
[
{
Concentrate.Producer.HTTP,
Application.get_env(:concentrate, :http_producer),
{
config[:url],
parser: Concentrate.Parser.Alerts,
Expand Down
2 changes: 1 addition & 1 deletion lib/concentrate/filter/gtfs/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Concentrate.Filter.GTFS.Supervisor do
Supervisor.start_link(
[
{
Concentrate.Producer.HTTP,
Application.get_env(:concentrate, :http_producer),
{
config[:url],
parser: Concentrate.Filter.GTFS.Unzip,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
defmodule Concentrate.Producer.HTTP do
defmodule Concentrate.Producer.HTTPoison do
@moduledoc """
GenStage Producer which fulfills demand by fetching from an HTTP Server.
"""
use GenStage
alias Concentrate.Producer.HTTP.StateMachine, as: SM
alias Concentrate.Producer.HTTPoison.StateMachine, as: SM
require Logger
@start_link_opts [:name]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Concentrate.Producer.HTTP.StateMachine do
defmodule Concentrate.Producer.HTTPoison.StateMachine do
@moduledoc """
State machine to manage the incoming/outgoing messages for making recurring HTTP requests.

Expand Down
2 changes: 1 addition & 1 deletion lib/concentrate/supervisor/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ defmodule Concentrate.Supervisor.Pipeline do

child_spec(
{
Concentrate.Producer.HTTP,
Application.get_env(:concentrate, :http_producer),
{url, [name: source, parser: parser] ++ opts}
},
id: source
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Concentrate.Producer.HttpPropertyTest do
defmodule Concentrate.Producer.HTTPoison.PropertyTest do
@moduledoc false
use ExUnit.Case, async: true
use ExUnitProperties
Expand Down Expand Up @@ -31,7 +31,9 @@ defmodule Concentrate.Producer.HttpPropertyTest do
{bypass, url} = url_for_bodies(bodies)

{:ok, producer} =
start_supervised({Concentrate.Producer.HTTP, {url, parser: &parser/1, fetch_after: 1}})
start_supervised(
{Concentrate.Producer.HTTPoison, {url, parser: &parser/1, fetch_after: 1}}
)

expected_body_count = expected_count(bodies)

Expand All @@ -53,7 +55,7 @@ defmodule Concentrate.Producer.HttpPropertyTest do

{:ok, producer} =
start_supervised(
{Concentrate.Producer.HTTP,
{Concentrate.Producer.HTTPoison,
{url,
fallback_url: fallback_url,
content_warning_timeout: 10,
Expand Down Expand Up @@ -139,7 +141,7 @@ defmodule Concentrate.Producer.HttpPropertyTest do
false
end

:ok = stop_supervised(Concentrate.Producer.HTTP)
:ok = stop_supervised(Concentrate.Producer.HTTPoison)
passed?
end
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Concentrate.Producer.HTTP.StateMachineTest do
defmodule Concentrate.Producer.HTTPoison.StateMachineTest do
@moduledoc false
use ExUnit.Case
import Concentrate.Producer.HTTP.StateMachine
import Concentrate.Producer.HTTPoison.StateMachine
import ExUnit.CaptureLog

setup_all do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule Concentrate.Producer.HTTPTest do
defmodule Concentrate.Producer.HTTPoisonTest do
@moduledoc false
use ExUnit.Case
import Concentrate.Producer.HTTP
alias Concentrate.Producer.HTTP.StateMachine
import Concentrate.Producer.HTTPoison
alias Concentrate.Producer.HTTPoison.StateMachine
import Plug.Conn, only: [get_req_header: 2, put_resp_header: 3, send_resp: 3]

defmodule TestParser do
Expand Down Expand Up @@ -44,14 +44,14 @@ defmodule Concentrate.Producer.HTTPTest do
@tag :capture_log
test "ignores unknown messages" do
machine = StateMachine.init("url", parser: & &1)
state = %Concentrate.Producer.HTTP.State{machine: machine}
state = %Concentrate.Producer.HTTPoison.State{machine: machine}
assert {:noreply, [], ^state} = handle_info(:unknown, state)
end

@tag :capture_log
test "does not send more demand than requested" do
machine = StateMachine.init("url", parser: &[&1])
state = %Concentrate.Producer.HTTP.State{machine: machine, demand: 1}
state = %Concentrate.Producer.HTTPoison.State{machine: machine, demand: 1}
response = %HTTPoison.Response{status_code: 200, body: "body"}
assert {:noreply, [_], state, :hibernate} = handle_info({:http_response, response}, state)

Expand Down Expand Up @@ -288,7 +288,9 @@ defmodule Concentrate.Producer.HTTPTest do

@tag :capture_log
test "a fetch error is not fatal" do
{:ok, pid} = start_supervised({Concentrate.Producer.HTTP, {"nodomain.dne", parser: & &1}})
{:ok, pid} =
start_supervised({Concentrate.Producer.HTTPoison, {"nodomain.dne", parser: & &1}})

# this will never finish, so run it in a separate process
Task.async(fn -> take_events(pid, 1) end)
:timer.sleep(50)
Expand All @@ -299,7 +301,7 @@ defmodule Concentrate.Producer.HTTPTest do
url = "http://127.0.0.1:#{bypass.port}/"
opts = Keyword.put_new(opts, :parser, fn body -> [body] end)

{:ok, _} = start_supervised({Concentrate.Producer.HTTP, {url, opts}})
{:ok, _} = start_supervised({Concentrate.Producer.HTTPoison, {url, opts}})
end

defp take_events(producer, event_count) do
Expand Down