From 5645abb5d898fab5b063ad668b8c935dcf2641eb Mon Sep 17 00:00:00 2001 From: "Tyler M. Kontra" Date: Wed, 20 Oct 2021 19:55:20 -0700 Subject: [PATCH 1/2] introduce storage layer --- .vscode/launch.json | 29 +++++++++++++++++ apps/muster/lib/muster/repository/impl.ex | 35 ++++++++++++++++----- apps/muster/lib/muster/repository/server.ex | 15 +++------ apps/muster/lib/muster/storage.ex | 31 ++++++++++++++++++ apps/muster/mix.exs | 4 +++ apps/muster/test/test_helper.exs | 7 +++++ config/config.exs | 2 +- config/test.exs | 3 ++ 8 files changed, 108 insertions(+), 18 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 apps/muster/lib/muster/storage.ex diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..7fd10c7 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,29 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "mix_task", + "name": "mix (Default task)", + "request": "launch", + "projectDir": "${workspaceRoot}" + }, + { + "type": "mix_task", + "name": "mix test", + "request": "launch", + "task": "test", + "taskArgs": [ + "--trace" + ], + "startApps": true, + "projectDir": "${workspaceRoot}", + "requireFiles": [ + "test/**/test_helper.exs", + "test/**/*_test.exs" + ] + } + ] +} \ No newline at end of file diff --git a/apps/muster/lib/muster/repository/impl.ex b/apps/muster/lib/muster/repository/impl.ex index 5b2d530..5c3cd88 100644 --- a/apps/muster/lib/muster/repository/impl.ex +++ b/apps/muster/lib/muster/repository/impl.ex @@ -1,19 +1,22 @@ defmodule Muster.Repository.Impl do + alias Muster.Storage require Logger @enforce_keys [:name, :uploads, :layers, :tags, :manifests] defstruct ~w[name uploads layers tags manifests]a + @namespace "all" + def new(name) do %__MODULE__{ name: name, # upload sessions uploads: %{}, # digest to layer blob - layers: %{}, + layers: MapSet.new(), # tag to manifest tags: %{}, - # digest to tag, tag to tag -- single source of truth for manifest reference -> tag + # digest -> tag (union) tag -> tag -- single source of truth for manifest reference -> tag manifests: %{} } end @@ -38,7 +41,7 @@ defmodule Muster.Repository.Impl do %__MODULE__{uploads: uploads, layers: layers} = state ) do with {:ok, {:started, _any}} <- Map.fetch(uploads, upload_id) do - layers = Map.put(layers, digest, blob) + put_layer(digest, blob, layers, state.name) uploads = Map.put(uploads, upload_id, {:completed, []}) state = %{state | layers: layers, uploads: uploads} {:ok, digest, state} @@ -103,7 +106,7 @@ defmodule Muster.Repository.Impl do end {:started, chunks} = Map.fetch!(state.uploads, upload_id) - layers = put_layer(digest, chunks, state.layers) + layers = put_layer(digest, chunks, state.layers, state.name) uploads = Map.put(state.uploads, upload_id, {:completed, []}) {:ok, digest, %{state | layers: layers, uploads: uploads}} @@ -112,13 +115,20 @@ defmodule Muster.Repository.Impl do end end - defp put_layer(digest, chunks, layers_state) do + defp put_layer(digest, chunks, layers_state, name) when is_list(chunks) do layer = chunks |> Enum.map(fn {_, blob} -> blob end) |> Enum.reduce(<<>>, fn a, b -> a <> b end) - Map.put(layers_state, digest, layer) + put_layer(digest, layer, layers_state, name) + end + + defp put_layer(digest, blob, layers_state, name) when is_binary(blob) do + case Storage.write_blob(@namespace, name, digest, blob) do + :ok -> MapSet.put(layers_state, digest) + {:error, _} -> layers_state + end end def upload_manifest( @@ -129,7 +139,7 @@ defmodule Muster.Repository.Impl do ) do case manifest_layers |> Enum.map(fn %{"digest" => digest} -> digest end) - |> Enum.all?(&Map.has_key?(state.layers, &1)) do + |> Enum.all?(&MapSet.member?(state.layers, &1)) do true -> state = state |> put_manifest(reference, manifest_digest, manifest) {:ok, {reference, state}} @@ -144,4 +154,15 @@ defmodule Muster.Repository.Impl do manifests = Map.put(state.manifests, digest, reference) |> Map.put(reference, reference) %{state | tags: tags, manifests: manifests} end + + def check_layer(digest, %__MODULE__{layers: layers} = _state) do + MapSet.member?(layers, digest) + end + + def get_layer(digest, %__MODULE__{layers: layers} = state) do + case MapSet.member?(layers, digest) do + false -> {:error, :not_found} + true -> Storage.get_blob(@namespace, state.name, digest) + end + end end diff --git a/apps/muster/lib/muster/repository/server.ex b/apps/muster/lib/muster/repository/server.ex index e2bb02b..80698e8 100644 --- a/apps/muster/lib/muster/repository/server.ex +++ b/apps/muster/lib/muster/repository/server.ex @@ -15,7 +15,7 @@ defmodule Muster.Repository.Server do @spec init(any) :: {:ok, %Muster.Repository.Impl{ - layers: %{}, + layers: MapSet.t(any), manifests: %{}, name: any, tags: %{}, @@ -178,19 +178,14 @@ defmodule Muster.Repository.Server do end @impl GenServer - def handle_call({:check_layer, digest}, _from, %{layers: layers} = state) do - exists? = Map.has_key?(layers, digest) + def handle_call({:check_layer, digest}, _from, state) do + exists? = Repository.Impl.check_layer(digest, state) {:reply, exists?, state} end @impl GenServer - def handle_call({:get_layer, digest}, _from, %{layers: layers} = state) do - resp = - case Map.get(layers, digest) do - nil -> {:error, :not_found} - blob -> {:ok, blob} - end - + def handle_call({:get_layer, digest}, _from, state) do + resp = Repository.Impl.get_layer(digest, state) {:reply, resp, state} end end diff --git a/apps/muster/lib/muster/storage.ex b/apps/muster/lib/muster/storage.ex new file mode 100644 index 0000000..40e042a --- /dev/null +++ b/apps/muster/lib/muster/storage.ex @@ -0,0 +1,31 @@ +defmodule Muster.Storage do + + # Application.compile_env!(:muster, :storage_root) + @storage_root Application.fetch_env!(:muster, :storage_root) + + def storage_root(), do: @storage_root + + # Storage Driver + def get_blob(namespace, name, digest) do + filepath(namespace, name, digest) + |> File.read() + |> case do + {:ok, blob} -> {:ok, blob} + {:error, :enomem} -> {:error, :ephemeral} + {:error, _cause} -> {:error, :not_found} + end + end + + def write_blob(namespace, name, digest, blob) do + Path.join([@storage_root, namespace, name]) |> File.mkdir_p() + filepath(namespace, name, digest) + |> File.write(blob) + |> case do + {:error, :enomemt} -> {:error, :ephemeral} + {:error, :enoent} -> raise "storage error: enoent" + other -> other + end + end + + defp filepath(namespace, name, digest), do: [@storage_root, namespace, name, digest] |> Path.join() +end diff --git a/apps/muster/mix.exs b/apps/muster/mix.exs index ce7bfe1..48748d0 100644 --- a/apps/muster/mix.exs +++ b/apps/muster/mix.exs @@ -5,6 +5,10 @@ defmodule Muster.MixProject do [ app: :muster, version: "0.1.0", + build_path: "../../_build", + config_path: "../../config/config.exs", + deps_path: "../../deps", + lockfile: "../../mix.lock", elixir: "~> 1.12", start_permanent: Mix.env() == :prod, deps: deps(), diff --git a/apps/muster/test/test_helper.exs b/apps/muster/test/test_helper.exs index 869559e..7fc6907 100644 --- a/apps/muster/test/test_helper.exs +++ b/apps/muster/test/test_helper.exs @@ -1 +1,8 @@ +alias Muster.Storage + ExUnit.start() + +ExUnit.after_suite(fn _ -> + Storage.storage_root() + |> File.rm_rf!() +end) diff --git a/config/config.exs b/config/config.exs index fc40d54..00f6972 100644 --- a/config/config.exs +++ b/config/config.exs @@ -12,7 +12,7 @@ import Config # Configures the muster core config :muster, - speed_limit: 5 + key: "value" config :muster_api, generators: [context_app: false] diff --git a/config/test.exs b/config/test.exs index 4f54e97..5e481dd 100644 --- a/config/test.exs +++ b/config/test.exs @@ -5,3 +5,6 @@ use Mix.Config config :muster_api, MusterApi.Endpoint, http: [port: 4002], server: false + + config :muster, + storage_root: "muster_store_test" From f58a1140eadfcc3a7daff0ce13422729a46c149e Mon Sep 17 00:00:00 2001 From: "Tyler M. Kontra" Date: Thu, 21 Oct 2021 00:31:11 -0700 Subject: [PATCH 2/2] implement stream upload because docker cli demands it --- .gitignore | 1 + apps/muster/lib/muster/blob.ex | 3 + apps/muster/lib/muster/repository.ex | 4 ++ apps/muster/lib/muster/repository/impl.ex | 43 ++++++++++-- apps/muster/lib/muster/repository/server.ex | 26 +++++--- .../controllers/registry_controller.ex | 65 +++++++++++-------- apps/muster_api/lib/muster_api/digest_plug.ex | 2 +- apps/muster_api/lib/muster_api/endpoint.ex | 2 +- apps/muster_api/lib/muster_api/router.ex | 2 +- config/config.exs | 1 + config/dev.exs | 5 +- 11 files changed, 109 insertions(+), 45 deletions(-) create mode 100644 apps/muster/lib/muster/blob.ex diff --git a/.gitignore b/.gitignore index cfb2340..d7def1a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +**muster_storage_** /_build /cover /deps diff --git a/apps/muster/lib/muster/blob.ex b/apps/muster/lib/muster/blob.ex new file mode 100644 index 0000000..3db116e --- /dev/null +++ b/apps/muster/lib/muster/blob.ex @@ -0,0 +1,3 @@ +defmodule Muster.Blob do + +end diff --git a/apps/muster/lib/muster/repository.ex b/apps/muster/lib/muster/repository.ex index 13283ba..ccc65c3 100644 --- a/apps/muster/lib/muster/repository.ex +++ b/apps/muster/lib/muster/repository.ex @@ -35,6 +35,10 @@ defmodule Muster.Repository do ) end + def chunk_upload(repo, location, range, blob) when is_nil(range) do + GenServer.call(repo, {:upload_stream, %ChunkedUploadRequest{upload_id: location, range: nil, blob: blob}}) + end + def chunk_upload(repo, location, range, blob) do GenServer.call( repo, diff --git a/apps/muster/lib/muster/repository/impl.ex b/apps/muster/lib/muster/repository/impl.ex index 5c3cd88..0d90e7f 100644 --- a/apps/muster/lib/muster/repository/impl.ex +++ b/apps/muster/lib/muster/repository/impl.ex @@ -5,8 +5,11 @@ defmodule Muster.Repository.Impl do @enforce_keys [:name, :uploads, :layers, :tags, :manifests] defstruct ~w[name uploads layers tags manifests]a + @type t :: %__MODULE__{name: String.t(), uploads: map(), layers: MapSet.t(), tags: map(), manifests: map()} + @namespace "all" + @spec new(any) :: Muster.Repository.Impl.t() def new(name) do %__MODULE__{ name: name, @@ -70,17 +73,45 @@ defmodule Muster.Repository.Impl do end end + def upload_layer_stream( + upload_id, blob, %__MODULE__{uploads: uploads} = state + ) do + Logger.debug("Streaming layer chunk for #{upload_id}") + with {:ok, {:started, chunks}} when is_list(chunks) <- Map.fetch(uploads, upload_id) + do + range_end = case chunks do + [prev | []] -> + {{_, prev_end}, _} = prev + prev_end + [prev | [_]] -> + {{_, prev_end}, _} = prev + prev_end + [] -> + 0 + end + range = range_end + byte_size(blob) + chunks = [{{nil, range}, blob} | chunks] + uploads = Map.put(uploads, upload_id, {:started, chunks}) + state = %{state | uploads: uploads} + {:ok, range, state} + else + {:ok, {:started, nil}} -> {:error, :monolithic_only} + {:error, cause} -> {:error, cause} + error -> {:error, error} + end + end + defp verify_chunk_order(chunks, range_start, range_end, blob) do chunks = case chunks do [] when range_start == 0 -> - [{range_end, blob}] + [{{0, range_end}, blob}] - chunks = [{prev_end, _blob} | _tail = []] when prev_end + 1 == range_start -> - [{range_end, blob} | chunks] + chunks = [{{_prev_start, prev_end}, _blob} | _tail = []] when prev_end + 1 == range_start -> + [{{range_start, range_end}, blob} | chunks] - chunks = [{prev_end, blob} | _tail] when prev_end + 1 == range_start -> - [{range_end, blob} | chunks] + chunks = [{{_prev_start, prev_end}, blob} | _tail] when prev_end + 1 == range_start -> + [{{range_start, range_end}, blob} | chunks] _ -> Logger.warn("Got invalid chunk sequence for range '#{range_start}-#{range_end}'") @@ -160,6 +191,8 @@ defmodule Muster.Repository.Impl do end def get_layer(digest, %__MODULE__{layers: layers} = state) do + l = layers |> Enum.map(fn l -> l["digest"] end) |> Enum.join(", ") + Logger.info("Checking layer #{digest} in layers for #{l}") case MapSet.member?(layers, digest) do false -> {:error, :not_found} true -> Storage.get_blob(@namespace, state.name, digest) diff --git a/apps/muster/lib/muster/repository/server.ex b/apps/muster/lib/muster/repository/server.ex index 80698e8..d5abf92 100644 --- a/apps/muster/lib/muster/repository/server.ex +++ b/apps/muster/lib/muster/repository/server.ex @@ -13,14 +13,7 @@ defmodule Muster.Repository.Server do @impl GenServer @spec init(any) :: - {:ok, - %Muster.Repository.Impl{ - layers: MapSet.t(any), - manifests: %{}, - name: any, - tags: %{}, - uploads: %{} - }} + {:ok, Muster.Repository.Impl.t()} def init(name) do { :ok, @@ -76,6 +69,23 @@ defmodule Muster.Repository.Server do end end + @impl GenServer + def handle_call( + {:upload_stream, + %ChunkedUploadRequest{upload_id: upload_id, range: nil, blob: blob}}, + _from, + state + ) do + case Repository.Impl.upload_layer_stream(upload_id, blob, state) do + {:ok, range, state} -> + Logger.debug("Successfully streamed chunk to #{upload_id}") + {:reply, %{location: upload_id, range: range}, state} + {:error, cause} -> + Logger.debug("Error streaming chunk to #{upload_id}: #{cause}") + {:reply, {:error, cause}, state} + end + end + # complete upload with final layer @impl GenServer def handle_call( diff --git a/apps/muster_api/lib/muster_api/controllers/registry_controller.ex b/apps/muster_api/lib/muster_api/controllers/registry_controller.ex index 15e0b2f..35e9d3d 100644 --- a/apps/muster_api/lib/muster_api/controllers/registry_controller.ex +++ b/apps/muster_api/lib/muster_api/controllers/registry_controller.ex @@ -1,5 +1,6 @@ defmodule MusterApi.RegistryController do use MusterApi, :controller + require Logger def render_json(conn, body), do: render(conn, "index.json", body: body) @@ -28,14 +29,15 @@ defmodule MusterApi.RegistryController do end case MusterApi.RegistryService.start_upload(namespace, name, type) do - %{location: location} = resp -> + %{location: location_id} -> location = - MusterApi.Router.Helpers.registry_path(conn, :upload_blob, namespace, name, location) - + MusterApi.Router.Helpers.registry_path(conn, :upload_blob, namespace, name, location_id) + Logger.info("Sending location for new upload: #{location}") conn - |> put_status(202) |> put_resp_header("Location", location) - |> render_json(%{"type" => type}) + |> put_resp_header("Range", "bytes=0-0") + |> put_resp_header("Docker-Upload-UUID", location_id) + |> send_resp(202, "") end end @@ -57,20 +59,25 @@ defmodule MusterApi.RegistryController do end end + defp range_from_headers(conn, blob) do + case Plug.Conn.get_req_header(conn, "content-range") do + [range] -> + Logger.info("Got range from headers: #{range}") + [range_start, range_end] = String.split(range, "-") + |> Enum.map(fn i -> Integer.parse(i) end) + |> Enum.map(fn {i, _} -> i end) + {range_start, range_end} + _ -> nil + end + end + def upload_final_blob_chunk( conn, %{"namespace" => namespace, "name" => name, "location" => location, "digest" => digest} = _params ) do {:ok, blob, conn} = Plug.Conn.read_body(conn) - [range] = Plug.Conn.get_req_header(conn, "content-range") - - [range_start | range_end] = - String.split(range, "-") - |> Enum.map(fn i -> Integer.parse(i) end) - |> Enum.map(fn {i, _} -> i end) - - range = {range_start, range_end} + range = range_from_headers(conn, blob) case MusterApi.RegistryService.upload_final_blob_chunk( namespace, @@ -89,30 +96,30 @@ defmodule MusterApi.RegistryController do %{"namespace" => namespace, "name" => name, "location" => location} = _params ) do {:ok, blob, conn} = Plug.Conn.read_body(conn) - [range] = Plug.Conn.get_req_header(conn, "content-range") - - [range_start | range_end] = - String.split(range, "-") - |> Enum.map(fn i -> Integer.parse(i) end) - |> Enum.map(fn {i, _} -> i end) - - range = {range_start, range_end} - + range = range_from_headers(conn, blob) + # when range is nil , upload blob chunk routes to stream upload case MusterApi.RegistryService.upload_blob_chunk(namespace, name, location, range, blob) do {:error, :illegal_chunk_sequence} -> conn |> send_resp(416, "") - - %{location: _blob_location} -> + %{location: location_id, range: range} -> location = MusterApi.Router.Helpers.registry_path(conn, :upload_blob, namespace, name, location) - conn |> put_resp_header("Location", location) + |> put_resp_header("Docker-Upload-UUID", location_id) + |> put_resp_header("Range", "0-#{range |> to_string}") + # |> log_response() |> send_resp(202, "") end end + defp log_response(conn) do + r = conn |> to_string() + Logger.debug("Response: #{r}") + conn + end + def upload_manifest( %{:body_params => manifest} = conn, %{"namespace" => namespace, "name" => name, "reference" => reference} = _params @@ -129,7 +136,8 @@ defmodule MusterApi.RegistryController do |> put_resp_header("Manifest-Digest", digests) |> send_resp(201, "") - _ -> + error -> + Logger.error("Unable to upload manifest: #{error}") conn |> send_resp(400, "") end end @@ -145,9 +153,10 @@ defmodule MusterApi.RegistryController do conn, %{"namespace" => namespace, "name" => name, "digest" => digest} = _params ) do + Logger.debug("Checking blob exists? #{digest}") case MusterApi.RegistryService.blob_exists?(namespace, name, digest) do - false -> conn |> not_found - true -> conn |> render_json(%{}) + false -> conn |> not_found() + true -> conn |> send_resp(200, "") end end diff --git a/apps/muster_api/lib/muster_api/digest_plug.ex b/apps/muster_api/lib/muster_api/digest_plug.ex index f8b8f1c..4ae6a07 100644 --- a/apps/muster_api/lib/muster_api/digest_plug.ex +++ b/apps/muster_api/lib/muster_api/digest_plug.ex @@ -1,6 +1,6 @@ defmodule MusterApi.Plug.DigestPlug do def read_body(conn, opts) do - {:ok, body, conn} = Plug.Conn.read_body(conn, opts) + {:ok, body, conn} = Plug.Conn.read_body(conn, [length: 500000000]) digests = digest_content(body) conn = update_in(conn.assigns[:digests], fn _ -> digests end) {:ok, body, conn} diff --git a/apps/muster_api/lib/muster_api/endpoint.ex b/apps/muster_api/lib/muster_api/endpoint.ex index b9a7005..b29d94e 100644 --- a/apps/muster_api/lib/muster_api/endpoint.ex +++ b/apps/muster_api/lib/muster_api/endpoint.ex @@ -49,7 +49,7 @@ defmodule MusterApi.Endpoint do body_reader: {DigestPlug, :read_body, []} plug Plug.MethodOverride - plug Plug.Head + # plug Plug.Head plug Plug.Session, @session_options plug MusterApi.Router end diff --git a/apps/muster_api/lib/muster_api/router.ex b/apps/muster_api/lib/muster_api/router.ex index 0787bc4..733c2fa 100644 --- a/apps/muster_api/lib/muster_api/router.ex +++ b/apps/muster_api/lib/muster_api/router.ex @@ -26,8 +26,8 @@ defmodule MusterApi.Router do patch "/blobs/uploads/:location", RegistryController, :upload_blob_chunk put "/manifests/:reference", RegistryController, :upload_manifest - get "/blobs/:digest", RegistryController, :get_blob head "/blobs/:digest", RegistryController, :blob_exists? + get "/blobs/:digest", RegistryController, :get_blob head "/manifests/:reference", RegistryController, :manifest_exists? get "/manifests/:reference", RegistryController, :get_manifest diff --git a/config/config.exs b/config/config.exs index 00f6972..9c7a419 100644 --- a/config/config.exs +++ b/config/config.exs @@ -35,6 +35,7 @@ config :muster_api, MusterApi.Endpoint, # Configures Elixir's Logger config :logger, :console, + level: :debug, format: "$time $metadata[$level] $message\n", metadata: [:request_id] diff --git a/config/dev.exs b/config/dev.exs index 3df6b4f..369ee82 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1,7 +1,10 @@ use Mix.Config +config :muster, + storage_root: "muster_storage_dev" + config :muster_api, MusterApi.Endpoint, - http: [port: 4000], + http: [port: 4000, ip: {0, 0, 0, 0, 0, 0, 0, 0}], debug_errors: true, code_reloader: true, check_origin: false,