From e2cc5216ef899a62f37515b78823985f7b47bfc1 Mon Sep 17 00:00:00 2001 From: MikaAK Date: Fri, 22 Aug 2025 14:07:38 -0700 Subject: [PATCH] chore: get price updates working again --- lib/ex_twelve_data/real_time_prices.ex | 133 +++++++++++++++-- test/ex_twelve_data/realtime_prices_test.exs | 141 +++++++++++++++++++ 2 files changed, 265 insertions(+), 9 deletions(-) diff --git a/lib/ex_twelve_data/real_time_prices.ex b/lib/ex_twelve_data/real_time_prices.ex index b9cd6e3..6740ae5 100644 --- a/lib/ex_twelve_data/real_time_prices.ex +++ b/lib/ex_twelve_data/real_time_prices.ex @@ -75,15 +75,17 @@ defmodule ExTwelveData.RealTimePrices do """ @spec subscribe(pid, symbols_list()) :: {:error, any} | {:ok} def subscribe(client, symbols) do + validated_symbols = validate_and_format_symbols(symbols) + msg = Jason.encode!(%{ action: "subscribe", params: %{ - symbols: symbols + symbols: validated_symbols } }) - Logger.debug("~> Subscribing to symbols: #{msg}") + Logger.debug("~> Subscribing to symbols: #{inspect(symbols)} -> formatted as: #{msg}") WebSockex.send_frame(client, {:text, msg}) end @@ -119,6 +121,7 @@ defmodule ExTwelveData.RealTimePrices do def handle_connect(conn, state) do Logger.info("<~ Connected to Twelve Data") + Logger.debug("Scheduling initial heartbeat...") schedule_next_heartbeat() super(conn, state) end @@ -146,8 +149,9 @@ defmodule ExTwelveData.RealTimePrices do # TODO At this stage, we should also schedule a message to close the connection, keep a reference to it, # and cancel it when we receive the heartbeat reply. This prevents situations where the WebSocket connection # is open, we can send heartbeats, but the server is unresponsive. - Logger.debug("~> Sending heartbeat") - schedule_next_heartbeat() + Logger.debug("~> Sending heartbeat: #{@heartbeat_message}") + timer_ref = schedule_next_heartbeat() + Logger.debug("Next heartbeat scheduled with timer ref: #{inspect(timer_ref)}") {:reply, {:text, @heartbeat_message}, state} end @@ -172,11 +176,16 @@ defmodule ExTwelveData.RealTimePrices do %{ event: "subscribe-status", status: status - }, + } = message, _state ) do case status do "ok" -> + handle_successful_subscription(message) + :ok + + "error" -> + handle_subscription_error(message) :ok _ -> @@ -189,11 +198,16 @@ defmodule ExTwelveData.RealTimePrices do %{ event: "unsubscribe-status", status: status - }, + } = message, _state ) do case status do "ok" -> + handle_successful_unsubscription(message) + :ok + + "error" -> + handle_unsubscription_error(message) :ok _ -> @@ -206,11 +220,16 @@ defmodule ExTwelveData.RealTimePrices do %{ event: "reset-status", status: status - }, + } = message, _state ) do case status do "ok" -> + Logger.info("Reset successful") + :ok + + "error" -> + handle_reset_error(message) :ok _ -> @@ -230,8 +249,104 @@ defmodule ExTwelveData.RealTimePrices do :ok end + defp handle_successful_subscription(%{success: success, fails: fails}) do + if success && not Enum.empty?(success) do + symbols = Enum.map(success, & &1.symbol) + Logger.info("Successfully subscribed to symbols: #{Enum.join(symbols, ", ")}") + end + + if fails && not Enum.empty?(fails) do + symbols = Enum.map(fails, & &1.symbol) + Logger.warning("Failed to subscribe to symbols: #{Enum.join(symbols, ", ")}") + end + end + + defp handle_successful_subscription(_message) do + Logger.info("Subscription successful") + end + + defp handle_subscription_error(%{messages: messages}) when is_list(messages) do + error_msg = Enum.join(messages, ", ") + Logger.error("Subscribe failed: #{error_msg}") + end + + defp handle_subscription_error(%{fails: fails}) when is_list(fails) and length(fails) > 0 do + failed_symbols = Enum.map(fails, fn + %{symbol: symbol} -> symbol + symbol when is_binary(symbol) -> symbol + other -> inspect(other) + end) + Logger.error("Subscribe failed for symbols: #{Enum.join(failed_symbols, ", ")}") + end + + defp handle_subscription_error(message) do + Logger.error("Subscribe failed: #{inspect(message)}") + end + + defp handle_successful_unsubscription(%{success: success, fails: fails}) do + if success && not Enum.empty?(success) do + symbols = Enum.map(success, & &1.symbol) + Logger.info("Successfully unsubscribed from symbols: #{Enum.join(symbols, ", ")}") + end + + if fails && not Enum.empty?(fails) do + symbols = Enum.map(fails, & &1.symbol) + Logger.warning("Failed to unsubscribe from symbols: #{Enum.join(symbols, ", ")}") + end + end + + defp handle_successful_unsubscription(_message) do + Logger.info("Unsubscription successful") + end + + defp handle_unsubscription_error(%{messages: messages}) when is_list(messages) do + error_msg = Enum.join(messages, ", ") + Logger.error("Unsubscribe failed: #{error_msg}") + end + + defp handle_unsubscription_error(message) do + Logger.error("Unsubscribe failed: #{inspect(message)}") + end + + defp handle_reset_error(%{messages: messages}) when is_list(messages) do + error_msg = Enum.join(messages, ", ") + Logger.error("Reset failed: #{error_msg}") + end + + defp handle_reset_error(message) do + Logger.error("Reset failed: #{inspect(message)}") + end + + defp validate_and_format_symbols(symbols) when is_binary(symbols) do + symbols + end + + defp validate_and_format_symbols(symbols) when is_list(symbols) do + case symbols do + [] -> + Logger.warning("Empty symbols list provided") + [] + + [%Symbol{} | _] = symbol_structs -> + symbol_structs + + [binary | _] when is_binary(binary) -> + Enum.join(symbols, ",") + + _ -> + Logger.error("Invalid symbols format: #{inspect(symbols)}") + symbols + end + end + + defp validate_and_format_symbols(symbols) do + Logger.error("Unsupported symbols format: #{inspect(symbols)}") + symbols + end + defp schedule_next_heartbeat do - Logger.debug("Scheduling next heartbeat in #{@heartbeat_seconds}s...") - Process.send_after(self(), :heartbeat, @heartbeat_seconds * 1000) + timer_ref = Process.send_after(self(), :heartbeat, @heartbeat_seconds * 1000) + Logger.debug("Scheduling next heartbeat in #{@heartbeat_seconds}s... Timer ref: #{inspect(timer_ref)}") + timer_ref end end diff --git a/test/ex_twelve_data/realtime_prices_test.exs b/test/ex_twelve_data/realtime_prices_test.exs index 1a99bd2..5d1bbfc 100644 --- a/test/ex_twelve_data/realtime_prices_test.exs +++ b/test/ex_twelve_data/realtime_prices_test.exs @@ -20,10 +20,151 @@ defmodule ExTwelveData.RealtimePricesTest do end end + defmodule TestPriceCapture do + @behaviour RealTimePrices.Handler + use Agent + + def start_link(_opts \\ []) do + Agent.start_link(fn -> [] end, name: __MODULE__) + end + + @impl true + def handle_price_update(price) do + Agent.update(__MODULE__, fn prices -> [price | prices] end) + :ok + end + + def get_captured_prices do + Agent.get(__MODULE__, & &1) + end + + def clear_prices do + Agent.update(__MODULE__, fn _ -> [] end) + end + end + test "handle price update" do RealTimePrices.handle_frame( {:text, ~s({"event": "price"})}, %{handler: SamplePriceUpdateHandler} ) end + + test "handle subscribe-status error with messages field" do + {result, _state} = RealTimePrices.handle_frame( + {:text, ~s({"event": "subscribe-status", "status": "error", "messages": ["Cant parse message"]})}, + %{handler: SamplePriceUpdateHandler} + ) + + assert result === :ok + end + + test "handle subscribe-status success with success/fails arrays" do + success_response = ~s({ + "event": "subscribe-status", + "status": "ok", + "success": [ + {"symbol": "AAPL", "exchange": "NASDAQ", "country": "United States", "type": "Common Stock"} + ], + "fails": [] + }) + + {result, _state} = RealTimePrices.handle_frame( + {:text, success_response}, + %{handler: SamplePriceUpdateHandler} + ) + + assert result === :ok + end + + test "handle unsubscribe-status error with messages field" do + {result, _state} = RealTimePrices.handle_frame( + {:text, ~s({"event": "unsubscribe-status", "status": "error", "messages": ["Symbol not found"]})}, + %{handler: SamplePriceUpdateHandler} + ) + + assert result === :ok + end + + test "handle reset-status error with messages field" do + {result, _state} = RealTimePrices.handle_frame( + {:text, ~s({"event": "reset-status", "status": "error", "messages": ["Reset failed"]})}, + %{handler: SamplePriceUpdateHandler} + ) + + assert result === :ok + end + + test "validate price update format with real data" do + TestPriceCapture.start_link() + TestPriceCapture.clear_prices() + + # Simulate a real BTC/USD price update based on Twelve Data format + price_update = ~s({ + "event": "price", + "symbol": "BTC/USD", + "currency": "USD", + "currency_base": "Bitcoin", + "currency_quote": "US Dollar", + "type": "Physical Currency", + "timestamp": 1692720000, + "price": 26500.50, + "bid": 26500.25, + "ask": 26500.75 + }) + + {result, _state} = RealTimePrices.handle_frame( + {:text, price_update}, + %{handler: TestPriceCapture} + ) + + assert result === :ok + + captured_prices = TestPriceCapture.get_captured_prices() + assert length(captured_prices) === 1 + + price = List.first(captured_prices) + assert price.event === "price" + assert price.symbol === "BTC/USD" + assert price.currency === "USD" + assert price.price === 26500.50 + assert price.timestamp === 1692720000 + assert Map.has_key?(price, :bid) + assert Map.has_key?(price, :ask) + end + + test "validate stock price update format" do + TestPriceCapture.start_link() + TestPriceCapture.clear_prices() + + # Simulate a stock price update (AAPL format) + stock_update = ~s({ + "event": "price", + "symbol": "AAPL", + "currency": "USD", + "exchange": "NASDAQ", + "type": "Common Stock", + "timestamp": 1692720000, + "price": 175.25, + "day_volume": 45123456 + }) + + {result, _state} = RealTimePrices.handle_frame( + {:text, stock_update}, + %{handler: TestPriceCapture} + ) + + assert result === :ok + + captured_prices = TestPriceCapture.get_captured_prices() + assert length(captured_prices) === 1 + + price = List.first(captured_prices) + assert price.event === "price" + assert price.symbol === "AAPL" + assert price.currency === "USD" + assert price.exchange === "NASDAQ" + assert price.price === 175.25 + assert price.day_volume === 45123456 + end end