Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 124 additions & 9 deletions lib/ex_twelve_data/real_time_prices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,17 @@ defmodule ExTwelveData.RealTimePrices do
"""
@spec subscribe(pid, symbols_list()) :: {:error, any} | {:ok}
def subscribe(client, symbols) do
validated_symbols = validate_and_format_symbols(symbols)

msg =
Jason.encode!(%{
action: "subscribe",
params: %{
symbols: symbols
symbols: validated_symbols
}
})

Logger.debug("~> Subscribing to symbols: #{msg}")
Logger.debug("~> Subscribing to symbols: #{inspect(symbols)} -> formatted as: #{msg}")
WebSockex.send_frame(client, {:text, msg})
end

Expand Down Expand Up @@ -119,6 +121,7 @@ defmodule ExTwelveData.RealTimePrices do

def handle_connect(conn, state) do
Logger.info("<~ Connected to Twelve Data")
Logger.debug("Scheduling initial heartbeat...")
schedule_next_heartbeat()
super(conn, state)
end
Expand Down Expand Up @@ -146,8 +149,9 @@ defmodule ExTwelveData.RealTimePrices do
# TODO At this stage, we should also schedule a message to close the connection, keep a reference to it,
# and cancel it when we receive the heartbeat reply. This prevents situations where the WebSocket connection
# is open, we can send heartbeats, but the server is unresponsive.
Logger.debug("~> Sending heartbeat")
schedule_next_heartbeat()
Logger.debug("~> Sending heartbeat: #{@heartbeat_message}")
timer_ref = schedule_next_heartbeat()
Logger.debug("Next heartbeat scheduled with timer ref: #{inspect(timer_ref)}")
{:reply, {:text, @heartbeat_message}, state}
end

Expand All @@ -172,11 +176,16 @@ defmodule ExTwelveData.RealTimePrices do
%{
event: "subscribe-status",
status: status
},
} = message,
_state
) do
case status do
"ok" ->
handle_successful_subscription(message)
:ok

"error" ->
handle_subscription_error(message)
:ok

_ ->
Expand All @@ -189,11 +198,16 @@ defmodule ExTwelveData.RealTimePrices do
%{
event: "unsubscribe-status",
status: status
},
} = message,
_state
) do
case status do
"ok" ->
handle_successful_unsubscription(message)
:ok

"error" ->
handle_unsubscription_error(message)
:ok

_ ->
Expand All @@ -206,11 +220,16 @@ defmodule ExTwelveData.RealTimePrices do
%{
event: "reset-status",
status: status
},
} = message,
_state
) do
case status do
"ok" ->
Logger.info("Reset successful")
:ok

"error" ->
handle_reset_error(message)
:ok

_ ->
Expand All @@ -230,8 +249,104 @@ defmodule ExTwelveData.RealTimePrices do
:ok
end

defp handle_successful_subscription(%{success: success, fails: fails}) do
if success && not Enum.empty?(success) do
symbols = Enum.map(success, & &1.symbol)
Logger.info("Successfully subscribed to symbols: #{Enum.join(symbols, ", ")}")
end

if fails && not Enum.empty?(fails) do
symbols = Enum.map(fails, & &1.symbol)
Logger.warning("Failed to subscribe to symbols: #{Enum.join(symbols, ", ")}")
end
end

defp handle_successful_subscription(_message) do
Logger.info("Subscription successful")
end

defp handle_subscription_error(%{messages: messages}) when is_list(messages) do
error_msg = Enum.join(messages, ", ")
Logger.error("Subscribe failed: #{error_msg}")
end

defp handle_subscription_error(%{fails: fails}) when is_list(fails) and length(fails) > 0 do
failed_symbols = Enum.map(fails, fn
%{symbol: symbol} -> symbol
symbol when is_binary(symbol) -> symbol
other -> inspect(other)
end)
Logger.error("Subscribe failed for symbols: #{Enum.join(failed_symbols, ", ")}")
end

defp handle_subscription_error(message) do
Logger.error("Subscribe failed: #{inspect(message)}")
end

defp handle_successful_unsubscription(%{success: success, fails: fails}) do
if success && not Enum.empty?(success) do
symbols = Enum.map(success, & &1.symbol)
Logger.info("Successfully unsubscribed from symbols: #{Enum.join(symbols, ", ")}")
end

if fails && not Enum.empty?(fails) do
symbols = Enum.map(fails, & &1.symbol)
Logger.warning("Failed to unsubscribe from symbols: #{Enum.join(symbols, ", ")}")
end
end

defp handle_successful_unsubscription(_message) do
Logger.info("Unsubscription successful")
end

defp handle_unsubscription_error(%{messages: messages}) when is_list(messages) do
error_msg = Enum.join(messages, ", ")
Logger.error("Unsubscribe failed: #{error_msg}")
end

defp handle_unsubscription_error(message) do
Logger.error("Unsubscribe failed: #{inspect(message)}")
end

defp handle_reset_error(%{messages: messages}) when is_list(messages) do
error_msg = Enum.join(messages, ", ")
Logger.error("Reset failed: #{error_msg}")
end

defp handle_reset_error(message) do
Logger.error("Reset failed: #{inspect(message)}")
end

defp validate_and_format_symbols(symbols) when is_binary(symbols) do
symbols
end

defp validate_and_format_symbols(symbols) when is_list(symbols) do
case symbols do
[] ->
Logger.warning("Empty symbols list provided")
[]

[%Symbol{} | _] = symbol_structs ->
symbol_structs

[binary | _] when is_binary(binary) ->
Enum.join(symbols, ",")

_ ->
Logger.error("Invalid symbols format: #{inspect(symbols)}")
symbols
end
end

defp validate_and_format_symbols(symbols) do
Logger.error("Unsupported symbols format: #{inspect(symbols)}")
symbols
end

defp schedule_next_heartbeat do
Logger.debug("Scheduling next heartbeat in #{@heartbeat_seconds}s...")
Process.send_after(self(), :heartbeat, @heartbeat_seconds * 1000)
timer_ref = Process.send_after(self(), :heartbeat, @heartbeat_seconds * 1000)
Logger.debug("Scheduling next heartbeat in #{@heartbeat_seconds}s... Timer ref: #{inspect(timer_ref)}")
timer_ref
end
end
141 changes: 141 additions & 0 deletions test/ex_twelve_data/realtime_prices_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,151 @@ defmodule ExTwelveData.RealtimePricesTest do
end
end

defmodule TestPriceCapture do
@behaviour RealTimePrices.Handler
use Agent

def start_link(_opts \\ []) do
Agent.start_link(fn -> [] end, name: __MODULE__)
end

@impl true
def handle_price_update(price) do
Agent.update(__MODULE__, fn prices -> [price | prices] end)
:ok
end

def get_captured_prices do
Agent.get(__MODULE__, & &1)
end

def clear_prices do
Agent.update(__MODULE__, fn _ -> [] end)
end
end

test "handle price update" do
RealTimePrices.handle_frame(
{:text, ~s({"event": "price"})},
%{handler: SamplePriceUpdateHandler}
)
end

test "handle subscribe-status error with messages field" do
{result, _state} = RealTimePrices.handle_frame(
{:text, ~s({"event": "subscribe-status", "status": "error", "messages": ["Cant parse message"]})},
%{handler: SamplePriceUpdateHandler}
)

assert result === :ok
end

test "handle subscribe-status success with success/fails arrays" do
success_response = ~s({
"event": "subscribe-status",
"status": "ok",
"success": [
{"symbol": "AAPL", "exchange": "NASDAQ", "country": "United States", "type": "Common Stock"}
],
"fails": []
})

{result, _state} = RealTimePrices.handle_frame(
{:text, success_response},
%{handler: SamplePriceUpdateHandler}
)

assert result === :ok
end

test "handle unsubscribe-status error with messages field" do
{result, _state} = RealTimePrices.handle_frame(
{:text, ~s({"event": "unsubscribe-status", "status": "error", "messages": ["Symbol not found"]})},
%{handler: SamplePriceUpdateHandler}
)

assert result === :ok
end

test "handle reset-status error with messages field" do
{result, _state} = RealTimePrices.handle_frame(
{:text, ~s({"event": "reset-status", "status": "error", "messages": ["Reset failed"]})},
%{handler: SamplePriceUpdateHandler}
)

assert result === :ok
end

test "validate price update format with real data" do
TestPriceCapture.start_link()
TestPriceCapture.clear_prices()

# Simulate a real BTC/USD price update based on Twelve Data format
price_update = ~s({
"event": "price",
"symbol": "BTC/USD",
"currency": "USD",
"currency_base": "Bitcoin",
"currency_quote": "US Dollar",
"type": "Physical Currency",
"timestamp": 1692720000,
"price": 26500.50,
"bid": 26500.25,
"ask": 26500.75
})

{result, _state} = RealTimePrices.handle_frame(
{:text, price_update},
%{handler: TestPriceCapture}
)

assert result === :ok

captured_prices = TestPriceCapture.get_captured_prices()
assert length(captured_prices) === 1

price = List.first(captured_prices)
assert price.event === "price"
assert price.symbol === "BTC/USD"
assert price.currency === "USD"
assert price.price === 26500.50
assert price.timestamp === 1692720000
assert Map.has_key?(price, :bid)
assert Map.has_key?(price, :ask)
end

test "validate stock price update format" do
TestPriceCapture.start_link()
TestPriceCapture.clear_prices()

# Simulate a stock price update (AAPL format)
stock_update = ~s({
"event": "price",
"symbol": "AAPL",
"currency": "USD",
"exchange": "NASDAQ",
"type": "Common Stock",
"timestamp": 1692720000,
"price": 175.25,
"day_volume": 45123456
})

{result, _state} = RealTimePrices.handle_frame(
{:text, stock_update},
%{handler: TestPriceCapture}
)

assert result === :ok

captured_prices = TestPriceCapture.get_captured_prices()
assert length(captured_prices) === 1

price = List.first(captured_prices)
assert price.event === "price"
assert price.symbol === "AAPL"
assert price.currency === "USD"
assert price.exchange === "NASDAQ"
assert price.price === 175.25
assert price.day_volume === 45123456
end
end