From afa664b527373a100e95c0d35d80a1263b3058f2 Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 11:24:29 +0530 Subject: [PATCH 1/8] Refactor NewEntityAttributeProcessor and TabularAttributeResolver to use dependency injection for PostgresRepository --- opengin/core-api/engine/attribute_resolver.go | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/opengin/core-api/engine/attribute_resolver.go b/opengin/core-api/engine/attribute_resolver.go index 988c6894..0168160b 100644 --- a/opengin/core-api/engine/attribute_resolver.go +++ b/opengin/core-api/engine/attribute_resolver.go @@ -6,7 +6,7 @@ package engine import ( "context" "fmt" - dbcommons "lk/datafoundation/core-api/commons/db" + postgresrepository "lk/datafoundation/core-api/db/repository/postgres" pb "lk/datafoundation/core-api/lk/datafoundation/core-api" schema "lk/datafoundation/core-api/pkg/schema" storageinference "lk/datafoundation/core-api/pkg/storageinference" @@ -55,7 +55,12 @@ type EntityAttributeProcessor struct { } // NewEntityAttributeProcessor creates a new processor with all resolvers initialized -func NewEntityAttributeProcessor() *EntityAttributeProcessor { +func NewEntityAttributeProcessor(postgresRepo ...*postgresrepository.PostgresRepository) *EntityAttributeProcessor { + var repo *postgresrepository.PostgresRepository + if len(postgresRepo) > 0 { + repo = postgresRepo[0] + } + processor := &EntityAttributeProcessor{ resolvers: make(map[storageinference.StorageType]AttributeResolver), graphManager: NewGraphMetadataManager(), @@ -63,7 +68,9 @@ func NewEntityAttributeProcessor() *EntityAttributeProcessor { // Initialize all resolvers processor.resolvers[storageinference.GraphData] = &GraphAttributeResolver{} - processor.resolvers[storageinference.TabularData] = &TabularAttributeResolver{} + processor.resolvers[storageinference.TabularData] = &TabularAttributeResolver{ + repo: repo, + } processor.resolvers[storageinference.MapData] = &DocumentAttributeResolver{} // Initialize each resolver @@ -445,6 +452,7 @@ func (r *GraphAttributeResolver) DeleteResolve(ctx context.Context, entityID, at // TabularAttributeResolver handles tabular data structures with columns and rows type TabularAttributeResolver struct { BaseAttributeResolver + repo *postgresrepository.PostgresRepository } func (r *TabularAttributeResolver) CreateResolve(ctx context.Context, entityID, attrName string, value *pb.TimeBasedValue) *Result { @@ -465,17 +473,16 @@ func (r *TabularAttributeResolver) CreateResolve(ctx context.Context, entityID, fmt.Printf("Creating tabular attribute %s for entity %s (validated as tabular) from %v to %v\n", attrName, entityID, startDate, endDate) - repo, err := dbcommons.GetPostgresRepository(ctx) - if err != nil { + if r.repo == nil { return &Result{ Data: nil, Success: false, - Error: fmt.Errorf("failed to get Postgres repository: %v", err), + Error: fmt.Errorf("postgres repository is not configured"), } } // Initialize database tables if they don't exist - if err := repo.InitializeTables(ctx); err != nil { + if err := r.repo.InitializeTables(ctx); err != nil { return &Result{ Data: nil, Success: false, @@ -492,7 +499,7 @@ func (r *TabularAttributeResolver) CreateResolve(ctx context.Context, entityID, } } - err = repo.HandleTabularData(ctx, entityID, attrName, value, schemaInfo) + err = r.repo.HandleTabularData(ctx, entityID, attrName, value, schemaInfo) if err != nil { return &Result{ Data: nil, @@ -515,19 +522,18 @@ func (r *TabularAttributeResolver) ReadResolve(ctx context.Context, entityID, at // - Return tabular structure fmt.Printf("[TabularAttributeResolver.ReadResolve] Reading tabular attribute %s for entity %s with filters: %+v and fields: %+v\n", attrName, entityID, filters, fields) - repo, err := dbcommons.GetPostgresRepository(ctx) - if err != nil { + if r.repo == nil { return &Result{ Data: nil, Success: false, - Error: fmt.Errorf("failed to get Postgres repository: %v", err), + Error: fmt.Errorf("postgres repository is not configured"), } } // Look up the actual table name from entity_attributes table // The table name is UUID-based and stored during create operation var tableName string - err = repo.DB().QueryRowContext(ctx, + err := r.repo.DB().QueryRowContext(ctx, `SELECT table_name FROM entity_attributes WHERE entity_id = $1 AND attribute_name = $2`, entityID, attrName).Scan(&tableName) if err != nil { @@ -540,7 +546,7 @@ func (r *TabularAttributeResolver) ReadResolve(ctx context.Context, entityID, at log.Printf("[TabularAttributeResolver.ReadResolve] Found tableName: %s", tableName) // Use the GetData method from the repository to retrieve data with filters and fields - anyData, err := repo.GetData(ctx, tableName, filters, fields...) + anyData, err := r.repo.GetData(ctx, tableName, filters, fields...) if err != nil { return &Result{ Data: nil, From 0a46a549faba6be9358d406850ee7699cf827bb3 Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 11:32:33 +0530 Subject: [PATCH 2/8] Inject postgresrepo to NewEntityAttributeProcessor and update tests --- opengin/core-api/cmd/server/service.go | 6 +-- .../engine/attribute_resolver_test.go | 44 +++++++++++++------ .../core-api/engine/graph_metadata_test.go | 2 +- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/opengin/core-api/cmd/server/service.go b/opengin/core-api/cmd/server/service.go index b0cd1cbd..970c51fa 100644 --- a/opengin/core-api/cmd/server/service.go +++ b/opengin/core-api/cmd/server/service.go @@ -66,7 +66,7 @@ func (s *Server) CreateEntity(ctx context.Context, req *pb.Entity) (*pb.Entity, } // Handle attributes - processor := engine.NewEntityAttributeProcessor() + processor := engine.NewEntityAttributeProcessor(s.postgresRepo) attributeResults := processor.ProcessEntityAttributes(ctx, req, "create", nil) // Check if any attributes failed @@ -173,7 +173,7 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb log.Printf("[server.ReadEntity] Processing attributes for entity: %s, attributes: %+v", req.Entity.Id, req.Entity.Attributes) // Use the EntityAttributeProcessor to read and process attributes - processor := engine.NewEntityAttributeProcessor() + processor := engine.NewEntityAttributeProcessor(s.postgresRepo) // Extract fields and record filters from the request attributes based on storage type fields, recordFilters := extractFieldsFromAttributes(req.Entity.Attributes) @@ -263,7 +263,7 @@ func (s *Server) UpdateEntity(ctx context.Context, req *pb.UpdateEntityRequest) } // Handle attributes - processor := engine.NewEntityAttributeProcessor() + processor := engine.NewEntityAttributeProcessor(s.postgresRepo) // Note that in the perspective of the attribute this is a creation operation // The entity is already there but here the attribute is set later. // There is no alignment of update operation with the attribute. diff --git a/opengin/core-api/engine/attribute_resolver_test.go b/opengin/core-api/engine/attribute_resolver_test.go index a8478852..19005da7 100644 --- a/opengin/core-api/engine/attribute_resolver_test.go +++ b/opengin/core-api/engine/attribute_resolver_test.go @@ -11,6 +11,7 @@ import ( "lk/datafoundation/core-api/commons" dbcommons "lk/datafoundation/core-api/commons/db" + postgresrepository "lk/datafoundation/core-api/db/repository/postgres" pb "lk/datafoundation/core-api/lk/datafoundation/core-api" "lk/datafoundation/core-api/pkg/schema" "lk/datafoundation/core-api/pkg/storageinference" @@ -73,6 +74,21 @@ func saveEntityToDatabase(ctx context.Context, entity *pb.Entity) error { return nil } +func postgresRepoOrSkip(t *testing.T) *postgresrepository.PostgresRepository { + t.Helper() + + repo, err := dbcommons.GetPostgresRepository(context.Background()) + if err != nil { + t.Skipf("skipping test: postgres repository unavailable: %v", err) + } + + t.Cleanup(func() { + _ = repo.Close() + }) + + return repo +} + // getOptionsForOperation returns appropriate options for each operation type func getOptionsForOperation(operation string) *Options { switch operation { @@ -112,7 +128,7 @@ func TestEntityWithGraphDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) // Test all CORE operations // create test merely checks if the ProcessEntityAttributes function is working @@ -154,7 +170,7 @@ func TestEntityWithTabularDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) // Test all CORE operations // TODO: "read", "update", "delete" @@ -205,7 +221,7 @@ func TestEntityWithDocumentDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) // Test all CORE operations operations := []string{"create", "read", "update", "delete"} @@ -264,7 +280,7 @@ func TestEntityWithMixedDataTypes(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) // Test all CORE operations // TODO: "read", "update", "delete" @@ -310,7 +326,7 @@ func TestComplexGraphEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) ctx := context.Background() // save parent entity to the database @@ -353,7 +369,7 @@ func TestComplexTabularEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) ctx := context.Background() // save parent entity to the database @@ -423,7 +439,7 @@ func TestComplexDocumentEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) ctx := context.Background() // save parent entity to the database @@ -487,7 +503,7 @@ func TestEntityWithMultipleAttributesOfSameType(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) ctx := context.Background() // save parent entity to the database @@ -549,7 +565,7 @@ func TestStorageTypeDetection(t *testing.T) { }, } - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { @@ -575,7 +591,7 @@ func TestEmptyEntity(t *testing.T) { Attributes: make(map[string]*pb.TimeBasedValueList), } - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) ctx := context.Background() // Test all CORE operations @@ -596,7 +612,7 @@ func TestEmptyEntity(t *testing.T) { // TestNilEntity tests handling of nil entity func TestNilEntity(t *testing.T) { - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) ctx := context.Background() // Test all CORE operations @@ -628,7 +644,7 @@ func TestInvalidOperation(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) ctx := context.Background() options := getOptionsForOperation("invalid_operation") @@ -654,7 +670,7 @@ func TestUnsupportedStorageType(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) ctx := context.Background() // save parent entity to the database @@ -683,7 +699,7 @@ func TestUnsupportedStorageType(t *testing.T) { // TestBasicFunctionality tests basic functionality of the attribute resolver func TestBasicFunctionality(t *testing.T) { // Test that we can create a processor - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(nil) assert.NotNil(t, processor) assert.NotNil(t, processor.resolvers) diff --git a/opengin/core-api/engine/graph_metadata_test.go b/opengin/core-api/engine/graph_metadata_test.go index 4fae4547..59e16039 100644 --- a/opengin/core-api/engine/graph_metadata_test.go +++ b/opengin/core-api/engine/graph_metadata_test.go @@ -134,7 +134,7 @@ func TestGraphMetadataIntegration(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor() + processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) ctx := context.Background() // save the parent entity in the database From 30b6830d6bb4019d2cf756f02fc7de5ca5c8e800 Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 11:33:40 +0530 Subject: [PATCH 3/8] Enhance GetPostgresRepository documentation to recommend using long-lived server-injected repositories for improved performance. --- opengin/core-api/commons/db/utils.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opengin/core-api/commons/db/utils.go b/opengin/core-api/commons/db/utils.go index 3cb0815f..714eed83 100644 --- a/opengin/core-api/commons/db/utils.go +++ b/opengin/core-api/commons/db/utils.go @@ -61,7 +61,8 @@ func GetPostgresConfig() postgresrepository.Config { } } -// GetPostgresRepository retrieves a Postgres repository +// GetPostgresRepository retrieves a Postgres repository. +// Prefer using the long-lived server-injected repository in request paths to avoid creating new pools per operation. func GetPostgresRepository(ctx context.Context) (*postgresrepository.PostgresRepository, error) { cfg := GetPostgresConfig() repo, err := postgresrepository.NewPostgresRepository(cfg) From dcfaacd7d22297a825ebc39d4c60055e07aff0f9 Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 12:07:59 +0530 Subject: [PATCH 4/8] Refactor GraphMetadataManager to use dependency injection for Neo4j and Mongo repositories --- opengin/core-api/engine/attribute_resolver.go | 2 +- .../core-api/engine/graph_metadata_manager.go | 54 ++++++++++++++----- .../core-api/engine/graph_metadata_test.go | 2 +- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/opengin/core-api/engine/attribute_resolver.go b/opengin/core-api/engine/attribute_resolver.go index 0168160b..7f8a66dd 100644 --- a/opengin/core-api/engine/attribute_resolver.go +++ b/opengin/core-api/engine/attribute_resolver.go @@ -63,7 +63,7 @@ func NewEntityAttributeProcessor(postgresRepo ...*postgresrepository.PostgresRep processor := &EntityAttributeProcessor{ resolvers: make(map[storageinference.StorageType]AttributeResolver), - graphManager: NewGraphMetadataManager(), + graphManager: NewGraphMetadataManager(nil, nil), } // Initialize all resolvers diff --git a/opengin/core-api/engine/graph_metadata_manager.go b/opengin/core-api/engine/graph_metadata_manager.go index 62305b7c..5b73c41f 100644 --- a/opengin/core-api/engine/graph_metadata_manager.go +++ b/opengin/core-api/engine/graph_metadata_manager.go @@ -11,7 +11,8 @@ import ( "time" "lk/datafoundation/core-api/commons" - dbcommons "lk/datafoundation/core-api/commons/db" + mongorepository "lk/datafoundation/core-api/db/repository/mongo" + neo4jrepository "lk/datafoundation/core-api/db/repository/neo4j" pb "lk/datafoundation/core-api/lk/datafoundation/core-api" "lk/datafoundation/core-api/pkg/storageinference" @@ -38,13 +39,30 @@ const IS_ATTRIBUTE_RELATIONSHIP_DIRECTION = "OUTGOING" // GraphMetadataManager handles the reference graph for tracking attributes type GraphMetadataManager struct { - // This would typically connect to Neo4j or another graph database - // For now, we'll define the interface and structure + neo4jRepository *neo4jrepository.Neo4jRepository + mongoRepository *mongorepository.MongoRepository } -// NewGraphMetadataManager creates a new graph metadata manager -func NewGraphMetadataManager() *GraphMetadataManager { - return &GraphMetadataManager{} +// NewGraphMetadataManager creates a new graph metadata manager. +func NewGraphMetadataManager(neo4jRepo *neo4jrepository.Neo4jRepository, mongoRepo *mongorepository.MongoRepository) *GraphMetadataManager { + return &GraphMetadataManager{ + neo4jRepository: neo4jRepo, + mongoRepository: mongoRepo, + } +} + +func (g *GraphMetadataManager) requireNeo4jRepository() (*neo4jrepository.Neo4jRepository, error) { + if g.neo4jRepository == nil { + return nil, fmt.Errorf("neo4j repository is not configured") + } + return g.neo4jRepository, nil +} + +func (g *GraphMetadataManager) requireMongoRepository() (*mongorepository.MongoRepository, error) { + if g.mongoRepository == nil { + return nil, fmt.Errorf("mongo repository is not configured") + } + return g.mongoRepository, nil } // AttributeMetadata represents metadata for an attribute in the graph @@ -153,7 +171,7 @@ func (g *GraphMetadataManager) createAttributeLookUpGraph(ctx context.Context, m }, } - neo4jRepository, err := dbcommons.GetNeo4jRepository(ctx) + neo4jRepository, err := g.requireNeo4jRepository() if err != nil { log.Printf("[GraphMetadataManager.CreateAttribute] Error getting Neo4j repository: %v", err) return err @@ -187,7 +205,11 @@ func (g *GraphMetadataManager) createAttributeLookUpGraph(ctx context.Context, m // create the attribute metadata in the mongo database // stored parameters: attribute_id, attribute_name, storage_type, storage_path, updated, schema - mongoRepository := dbcommons.GetMongoRepository(ctx) + mongoRepository, err := g.requireMongoRepository() + if err != nil { + log.Printf("[GraphMetadataManager.CreateAttribute] Error getting Mongo repository: %v", err) + return err + } // Check if the attribute metadata already exists existingMetadata, err := mongoRepository.ReadEntity(ctx, metadata.AttributeID) @@ -248,7 +270,7 @@ func MakeRelationshipFromAttributeMetadata(metadata *AttributeMetadata) *pb.Rela func (g *GraphMetadataManager) GetAttribute(ctx context.Context, entityID string, attributeName string, startTime time.Time) (*AttributeMetadata, error) { fmt.Printf("Getting attribute metadata: EntityID=%s, AttributeName=%s\n", entityID, attributeName) - neo4jRepository, err := dbcommons.GetNeo4jRepository(ctx) + neo4jRepository, err := g.requireNeo4jRepository() if err != nil { log.Printf("[GraphMetadataManager.GetAttribute] Error getting Neo4j repository: %v", err) return nil, err @@ -302,7 +324,11 @@ func (g *GraphMetadataManager) GetAttribute(ctx context.Context, entityID string } // Get the attribute metadata from MongoDB - mongoRepository := dbcommons.GetMongoRepository(ctx) + mongoRepository, err := g.requireMongoRepository() + if err != nil { + log.Printf("[GraphMetadataManager.GetAttribute] Error getting Mongo repository: %v", err) + return nil, err + } attributeMetadataEntity, err := mongoRepository.ReadEntity(ctx, targetAttributeID) if err != nil { log.Printf("[GraphMetadataManager.GetAttribute] Error getting attribute metadata from MongoDB for attribute %s (entity %s): %v", targetAttributeID, entityID, err) @@ -342,7 +368,7 @@ func (g *GraphMetadataManager) GetAttribute(ctx context.Context, entityID string func (g *GraphMetadataManager) ListAttributes(ctx context.Context, entityID string) ([]*AttributeMetadata, error) { fmt.Printf("Listing attributes for entity: %s\n", entityID) - neo4jRepository, err := dbcommons.GetNeo4jRepository(ctx) + neo4jRepository, err := g.requireNeo4jRepository() if err != nil { log.Printf("[GraphMetadataManager.ListAttributes] Error getting Neo4j repository: %v", err) return nil, err @@ -374,7 +400,11 @@ func (g *GraphMetadataManager) ListAttributes(ctx context.Context, entityID stri attributeNameStr := commons.ExtractStringFromAny(attributeName.Value) // Get the attribute metadata from the mongo database - mongoRepository := dbcommons.GetMongoRepository(ctx) + mongoRepository, err := g.requireMongoRepository() + if err != nil { + log.Printf("[GraphMetadataManager.ListAttributes] Error getting Mongo repository: %v", err) + return nil, err + } attributeMetadataEntity, err := mongoRepository.ReadEntity(ctx, attributeID) if err != nil { log.Printf("[GraphMetadataManager.ListAttributes] Error getting attribute metadata from MongoDB for attribute %s (entity %s): %v", attributeID, entityID, err) diff --git a/opengin/core-api/engine/graph_metadata_test.go b/opengin/core-api/engine/graph_metadata_test.go index 59e16039..26b02d4c 100644 --- a/opengin/core-api/engine/graph_metadata_test.go +++ b/opengin/core-api/engine/graph_metadata_test.go @@ -15,7 +15,7 @@ import ( // TestGraphMetadataManager tests the graph metadata manager functionality func TestGraphMetadataManager(t *testing.T) { - manager := NewGraphMetadataManager() + manager := NewGraphMetadataManager(nil, nil) assert.NotNil(t, manager) ctx := context.Background() From 7f901f8bb8701192e1f07c549a1d161ae584e35c Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 12:18:30 +0530 Subject: [PATCH 5/8] Refactor NewEntityAttributeProcessor to use structured dependency injection for Postgres, Neo4j, and Mongo repositories; update related tests accordingly. --- opengin/core-api/cmd/server/service.go | 18 ++++++++++-- opengin/core-api/engine/attribute_resolver.go | 20 +++++++------ .../engine/attribute_resolver_test.go | 28 +++++++++---------- .../core-api/engine/graph_metadata_test.go | 2 +- 4 files changed, 42 insertions(+), 26 deletions(-) diff --git a/opengin/core-api/cmd/server/service.go b/opengin/core-api/cmd/server/service.go index 970c51fa..3da7264d 100644 --- a/opengin/core-api/cmd/server/service.go +++ b/opengin/core-api/cmd/server/service.go @@ -66,7 +66,11 @@ func (s *Server) CreateEntity(ctx context.Context, req *pb.Entity) (*pb.Entity, } // Handle attributes - processor := engine.NewEntityAttributeProcessor(s.postgresRepo) + processor := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + PostgresRepo: s.postgresRepo, + Neo4jRepo: s.neo4jRepo, + MongoRepo: s.mongoRepo, + }) attributeResults := processor.ProcessEntityAttributes(ctx, req, "create", nil) // Check if any attributes failed @@ -173,7 +177,11 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb log.Printf("[server.ReadEntity] Processing attributes for entity: %s, attributes: %+v", req.Entity.Id, req.Entity.Attributes) // Use the EntityAttributeProcessor to read and process attributes - processor := engine.NewEntityAttributeProcessor(s.postgresRepo) + processor := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + PostgresRepo: s.postgresRepo, + Neo4jRepo: s.neo4jRepo, + MongoRepo: s.mongoRepo, + }) // Extract fields and record filters from the request attributes based on storage type fields, recordFilters := extractFieldsFromAttributes(req.Entity.Attributes) @@ -263,7 +271,11 @@ func (s *Server) UpdateEntity(ctx context.Context, req *pb.UpdateEntityRequest) } // Handle attributes - processor := engine.NewEntityAttributeProcessor(s.postgresRepo) + processor := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + PostgresRepo: s.postgresRepo, + Neo4jRepo: s.neo4jRepo, + MongoRepo: s.mongoRepo, + }) // Note that in the perspective of the attribute this is a creation operation // The entity is already there but here the attribute is set later. // There is no alignment of update operation with the attribute. diff --git a/opengin/core-api/engine/attribute_resolver.go b/opengin/core-api/engine/attribute_resolver.go index 7f8a66dd..cc2dbfb2 100644 --- a/opengin/core-api/engine/attribute_resolver.go +++ b/opengin/core-api/engine/attribute_resolver.go @@ -6,6 +6,8 @@ package engine import ( "context" "fmt" + mongorepository "lk/datafoundation/core-api/db/repository/mongo" + neo4jrepository "lk/datafoundation/core-api/db/repository/neo4j" postgresrepository "lk/datafoundation/core-api/db/repository/postgres" pb "lk/datafoundation/core-api/lk/datafoundation/core-api" schema "lk/datafoundation/core-api/pkg/schema" @@ -54,22 +56,24 @@ type EntityAttributeProcessor struct { graphManager *GraphMetadataManager } -// NewEntityAttributeProcessor creates a new processor with all resolvers initialized -func NewEntityAttributeProcessor(postgresRepo ...*postgresrepository.PostgresRepository) *EntityAttributeProcessor { - var repo *postgresrepository.PostgresRepository - if len(postgresRepo) > 0 { - repo = postgresRepo[0] - } +// ProcessorDependencies contains database repositories used by the attribute processor. +type ProcessorDependencies struct { + PostgresRepo *postgresrepository.PostgresRepository + Neo4jRepo *neo4jrepository.Neo4jRepository + MongoRepo *mongorepository.MongoRepository +} +// NewEntityAttributeProcessor creates a processor with explicit repository dependencies. +func NewEntityAttributeProcessor(deps ProcessorDependencies) *EntityAttributeProcessor { processor := &EntityAttributeProcessor{ resolvers: make(map[storageinference.StorageType]AttributeResolver), - graphManager: NewGraphMetadataManager(nil, nil), + graphManager: NewGraphMetadataManager(deps.Neo4jRepo, deps.MongoRepo), } // Initialize all resolvers processor.resolvers[storageinference.GraphData] = &GraphAttributeResolver{} processor.resolvers[storageinference.TabularData] = &TabularAttributeResolver{ - repo: repo, + repo: deps.PostgresRepo, } processor.resolvers[storageinference.MapData] = &DocumentAttributeResolver{} diff --git a/opengin/core-api/engine/attribute_resolver_test.go b/opengin/core-api/engine/attribute_resolver_test.go index 19005da7..71b0c8dd 100644 --- a/opengin/core-api/engine/attribute_resolver_test.go +++ b/opengin/core-api/engine/attribute_resolver_test.go @@ -128,7 +128,7 @@ func TestEntityWithGraphDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) // Test all CORE operations // create test merely checks if the ProcessEntityAttributes function is working @@ -170,7 +170,7 @@ func TestEntityWithTabularDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) + processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) // Test all CORE operations // TODO: "read", "update", "delete" @@ -221,7 +221,7 @@ func TestEntityWithDocumentDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) // Test all CORE operations operations := []string{"create", "read", "update", "delete"} @@ -280,7 +280,7 @@ func TestEntityWithMixedDataTypes(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) + processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) // Test all CORE operations // TODO: "read", "update", "delete" @@ -326,7 +326,7 @@ func TestComplexGraphEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) ctx := context.Background() // save parent entity to the database @@ -369,7 +369,7 @@ func TestComplexTabularEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) + processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) ctx := context.Background() // save parent entity to the database @@ -439,7 +439,7 @@ func TestComplexDocumentEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) ctx := context.Background() // save parent entity to the database @@ -503,7 +503,7 @@ func TestEntityWithMultipleAttributesOfSameType(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) + processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) ctx := context.Background() // save parent entity to the database @@ -565,7 +565,7 @@ func TestStorageTypeDetection(t *testing.T) { }, } - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { @@ -591,7 +591,7 @@ func TestEmptyEntity(t *testing.T) { Attributes: make(map[string]*pb.TimeBasedValueList), } - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) ctx := context.Background() // Test all CORE operations @@ -612,7 +612,7 @@ func TestEmptyEntity(t *testing.T) { // TestNilEntity tests handling of nil entity func TestNilEntity(t *testing.T) { - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) ctx := context.Background() // Test all CORE operations @@ -644,7 +644,7 @@ func TestInvalidOperation(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) ctx := context.Background() options := getOptionsForOperation("invalid_operation") @@ -670,7 +670,7 @@ func TestUnsupportedStorageType(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) ctx := context.Background() // save parent entity to the database @@ -699,7 +699,7 @@ func TestUnsupportedStorageType(t *testing.T) { // TestBasicFunctionality tests basic functionality of the attribute resolver func TestBasicFunctionality(t *testing.T) { // Test that we can create a processor - processor := NewEntityAttributeProcessor(nil) + processor := NewEntityAttributeProcessor(ProcessorDependencies{}) assert.NotNil(t, processor) assert.NotNil(t, processor.resolvers) diff --git a/opengin/core-api/engine/graph_metadata_test.go b/opengin/core-api/engine/graph_metadata_test.go index 26b02d4c..a0d21687 100644 --- a/opengin/core-api/engine/graph_metadata_test.go +++ b/opengin/core-api/engine/graph_metadata_test.go @@ -134,7 +134,7 @@ func TestGraphMetadataIntegration(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(postgresRepoOrSkip(t)) + processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) ctx := context.Background() // save the parent entity in the database From 4776265821207c01443bfd9c6c85b2e6de43c3cd Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 13:13:49 +0530 Subject: [PATCH 6/8] Enhance documentation for GetNeo4jRepository and GetMongoRepository to recommend using long-lived server-injected repositories for improved performance. --- opengin/core-api/commons/db/utils.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/opengin/core-api/commons/db/utils.go b/opengin/core-api/commons/db/utils.go index 714eed83..1c2219c1 100644 --- a/opengin/core-api/commons/db/utils.go +++ b/opengin/core-api/commons/db/utils.go @@ -32,7 +32,8 @@ func GetMongoConfig() *config.MongoConfig { } } -// GetNeo4jRepository retrieves a Neo4j repository +// GetNeo4jRepository retrieves a Neo4j repository. +// Prefer using the long-lived server-injected repository in request paths to avoid creating new clients per operation. func GetNeo4jRepository(ctx context.Context) (*neo4jrepository.Neo4jRepository, error) { cfg := GetNeo4jConfig() repo, err := neo4jrepository.NewNeo4jRepository(ctx, cfg) @@ -42,7 +43,8 @@ func GetNeo4jRepository(ctx context.Context) (*neo4jrepository.Neo4jRepository, return repo, nil } -// GetMongoRepository retrieves a Mongo repository +// GetMongoRepository retrieves a Mongo repository. +// Prefer using the long-lived server-injected repository in request paths to avoid creating new clients per operation. // TODO: Handle errors better func GetMongoRepository(ctx context.Context) *mongorepository.MongoRepository { cfg := GetMongoConfig() From 4139d58793209adcfaeb9ba25998b3ef1e492d07 Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Wed, 22 Apr 2026 14:03:33 +0530 Subject: [PATCH 7/8] Refactor NewEntityAttributeProcessor to return an error on initialization failure; update service methods and tests to handle errors appropriately. --- opengin/core-api/cmd/server/service.go | 15 ++- opengin/core-api/engine/attribute_resolver.go | 14 ++- .../engine/attribute_resolver_test.go | 103 ++++++++++++------ .../core-api/engine/graph_metadata_test.go | 7 +- 4 files changed, 99 insertions(+), 40 deletions(-) diff --git a/opengin/core-api/cmd/server/service.go b/opengin/core-api/cmd/server/service.go index 3da7264d..299a44cc 100644 --- a/opengin/core-api/cmd/server/service.go +++ b/opengin/core-api/cmd/server/service.go @@ -66,11 +66,14 @@ func (s *Server) CreateEntity(ctx context.Context, req *pb.Entity) (*pb.Entity, } // Handle attributes - processor := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + processor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ PostgresRepo: s.postgresRepo, Neo4jRepo: s.neo4jRepo, MongoRepo: s.mongoRepo, }) + if err != nil { + return nil, fmt.Errorf("failed to initialize attribute processor: %v", err) + } attributeResults := processor.ProcessEntityAttributes(ctx, req, "create", nil) // Check if any attributes failed @@ -177,11 +180,14 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb log.Printf("[server.ReadEntity] Processing attributes for entity: %s, attributes: %+v", req.Entity.Id, req.Entity.Attributes) // Use the EntityAttributeProcessor to read and process attributes - processor := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + processor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ PostgresRepo: s.postgresRepo, Neo4jRepo: s.neo4jRepo, MongoRepo: s.mongoRepo, }) + if err != nil { + return nil, fmt.Errorf("failed to initialize attribute processor: %v", err) + } // Extract fields and record filters from the request attributes based on storage type fields, recordFilters := extractFieldsFromAttributes(req.Entity.Attributes) @@ -271,11 +277,14 @@ func (s *Server) UpdateEntity(ctx context.Context, req *pb.UpdateEntityRequest) } // Handle attributes - processor := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + processor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ PostgresRepo: s.postgresRepo, Neo4jRepo: s.neo4jRepo, MongoRepo: s.mongoRepo, }) + if err != nil { + return nil, fmt.Errorf("failed to initialize attribute processor: %v", err) + } // Note that in the perspective of the attribute this is a creation operation // The entity is already there but here the attribute is set later. // There is no alignment of update operation with the attribute. diff --git a/opengin/core-api/engine/attribute_resolver.go b/opengin/core-api/engine/attribute_resolver.go index cc2dbfb2..457b8985 100644 --- a/opengin/core-api/engine/attribute_resolver.go +++ b/opengin/core-api/engine/attribute_resolver.go @@ -64,7 +64,17 @@ type ProcessorDependencies struct { } // NewEntityAttributeProcessor creates a processor with explicit repository dependencies. -func NewEntityAttributeProcessor(deps ProcessorDependencies) *EntityAttributeProcessor { +func NewEntityAttributeProcessor(deps ProcessorDependencies) (*EntityAttributeProcessor, error) { + if deps.PostgresRepo == nil { + return nil, fmt.Errorf("postgres repository is required") + } + if deps.Neo4jRepo == nil { + return nil, fmt.Errorf("neo4j repository is required") + } + if deps.MongoRepo == nil { + return nil, fmt.Errorf("mongo repository is required") + } + processor := &EntityAttributeProcessor{ resolvers: make(map[storageinference.StorageType]AttributeResolver), graphManager: NewGraphMetadataManager(deps.Neo4jRepo, deps.MongoRepo), @@ -84,7 +94,7 @@ func NewEntityAttributeProcessor(deps ProcessorDependencies) *EntityAttributePro } } - return processor + return processor, nil } // GetResolver returns the resolver for a specific storage type diff --git a/opengin/core-api/engine/attribute_resolver_test.go b/opengin/core-api/engine/attribute_resolver_test.go index 71b0c8dd..814ddc6a 100644 --- a/opengin/core-api/engine/attribute_resolver_test.go +++ b/opengin/core-api/engine/attribute_resolver_test.go @@ -6,11 +6,14 @@ package engine import ( "context" "fmt" + "os" "testing" "time" "lk/datafoundation/core-api/commons" dbcommons "lk/datafoundation/core-api/commons/db" + mongorepository "lk/datafoundation/core-api/db/repository/mongo" + neo4jrepository "lk/datafoundation/core-api/db/repository/neo4j" postgresrepository "lk/datafoundation/core-api/db/repository/postgres" pb "lk/datafoundation/core-api/lk/datafoundation/core-api" "lk/datafoundation/core-api/pkg/schema" @@ -19,6 +22,56 @@ import ( "github.com/stretchr/testify/assert" ) +var ( + testNeo4jRepo *neo4jrepository.Neo4jRepository + testMongoRepo *mongorepository.MongoRepository + testPostgresRepo *postgresrepository.PostgresRepository + testProcessor *EntityAttributeProcessor +) + +func TestMain(m *testing.M) { + ctx := context.Background() + + if os.Getenv("MONGO_URI") == "" { + fmt.Fprintln(os.Stderr, "MONGO_URI is required for engine tests") + os.Exit(1) + } + + var err error + testNeo4jRepo, err = dbcommons.GetNeo4jRepository(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to initialize Neo4j repository: %v\n", err) + os.Exit(1) + } + + testPostgresRepo, err = dbcommons.GetPostgresRepository(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to initialize Postgres repository: %v\n", err) + testNeo4jRepo.Close(ctx) + os.Exit(1) + } + + testMongoRepo = dbcommons.GetMongoRepository(ctx) + + testProcessor, err = NewEntityAttributeProcessor(ProcessorDependencies{ + PostgresRepo: testPostgresRepo, + Neo4jRepo: testNeo4jRepo, + MongoRepo: testMongoRepo, + }) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to initialize attribute processor: %v\n", err) + _ = testPostgresRepo.Close() + testNeo4jRepo.Close(ctx) + os.Exit(1) + } + + exitCode := m.Run() + + _ = testPostgresRepo.Close() + testNeo4jRepo.Close(ctx) + os.Exit(exitCode) +} + // createTimeBasedValue creates a TimeBasedValue with the given JSON data func createTimeBasedValue(jsonStr string) (*pb.TimeBasedValue, error) { anyValue, err := schema.JSONToAny(jsonStr) @@ -61,12 +114,11 @@ func createEntityWithAttributes(entityID string, entityName string, attributes m } func saveEntityToDatabase(ctx context.Context, entity *pb.Entity) error { - neo4jRepository, err := dbcommons.GetNeo4jRepository(ctx) - if err != nil { - return fmt.Errorf("failed to get Neo4j repository: %w", err) + if testNeo4jRepo == nil { + return fmt.Errorf("neo4j repository is not initialized") } - success, err := neo4jRepository.HandleGraphEntityCreation(ctx, entity) + success, err := testNeo4jRepo.HandleGraphEntityCreation(ctx, entity) if !success { return fmt.Errorf("failed to save entity: %w", err) } @@ -74,21 +126,6 @@ func saveEntityToDatabase(ctx context.Context, entity *pb.Entity) error { return nil } -func postgresRepoOrSkip(t *testing.T) *postgresrepository.PostgresRepository { - t.Helper() - - repo, err := dbcommons.GetPostgresRepository(context.Background()) - if err != nil { - t.Skipf("skipping test: postgres repository unavailable: %v", err) - } - - t.Cleanup(func() { - _ = repo.Close() - }) - - return repo -} - // getOptionsForOperation returns appropriate options for each operation type func getOptionsForOperation(operation string) *Options { switch operation { @@ -128,7 +165,7 @@ func TestEntityWithGraphDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor // Test all CORE operations // create test merely checks if the ProcessEntityAttributes function is working @@ -170,7 +207,7 @@ func TestEntityWithTabularDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) + processor := testProcessor // Test all CORE operations // TODO: "read", "update", "delete" @@ -221,7 +258,7 @@ func TestEntityWithDocumentDataOnly(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor // Test all CORE operations operations := []string{"create", "read", "update", "delete"} @@ -280,7 +317,7 @@ func TestEntityWithMixedDataTypes(t *testing.T) { err = saveEntityToDatabase(ctx, entity) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) + processor := testProcessor // Test all CORE operations // TODO: "read", "update", "delete" @@ -326,7 +363,7 @@ func TestComplexGraphEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor ctx := context.Background() // save parent entity to the database @@ -369,7 +406,7 @@ func TestComplexTabularEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) + processor := testProcessor ctx := context.Background() // save parent entity to the database @@ -439,7 +476,7 @@ func TestComplexDocumentEntity(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor ctx := context.Background() // save parent entity to the database @@ -503,7 +540,7 @@ func TestEntityWithMultipleAttributesOfSameType(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) + processor := testProcessor ctx := context.Background() // save parent entity to the database @@ -565,7 +602,7 @@ func TestStorageTypeDetection(t *testing.T) { }, } - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { @@ -591,7 +628,7 @@ func TestEmptyEntity(t *testing.T) { Attributes: make(map[string]*pb.TimeBasedValueList), } - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor ctx := context.Background() // Test all CORE operations @@ -612,7 +649,7 @@ func TestEmptyEntity(t *testing.T) { // TestNilEntity tests handling of nil entity func TestNilEntity(t *testing.T) { - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor ctx := context.Background() // Test all CORE operations @@ -644,7 +681,7 @@ func TestInvalidOperation(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor ctx := context.Background() options := getOptionsForOperation("invalid_operation") @@ -670,7 +707,7 @@ func TestUnsupportedStorageType(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor ctx := context.Background() // save parent entity to the database @@ -699,7 +736,7 @@ func TestUnsupportedStorageType(t *testing.T) { // TestBasicFunctionality tests basic functionality of the attribute resolver func TestBasicFunctionality(t *testing.T) { // Test that we can create a processor - processor := NewEntityAttributeProcessor(ProcessorDependencies{}) + processor := testProcessor assert.NotNil(t, processor) assert.NotNil(t, processor.resolvers) diff --git a/opengin/core-api/engine/graph_metadata_test.go b/opengin/core-api/engine/graph_metadata_test.go index a0d21687..9fce4bb6 100644 --- a/opengin/core-api/engine/graph_metadata_test.go +++ b/opengin/core-api/engine/graph_metadata_test.go @@ -15,7 +15,10 @@ import ( // TestGraphMetadataManager tests the graph metadata manager functionality func TestGraphMetadataManager(t *testing.T) { - manager := NewGraphMetadataManager(nil, nil) + if testNeo4jRepo == nil || testMongoRepo == nil { + t.Fatal("test repositories are not initialized") + } + manager := NewGraphMetadataManager(testNeo4jRepo, testMongoRepo) assert.NotNil(t, manager) ctx := context.Background() @@ -134,7 +137,7 @@ func TestGraphMetadataIntegration(t *testing.T) { }) assert.NoError(t, err) - processor := NewEntityAttributeProcessor(ProcessorDependencies{PostgresRepo: postgresRepoOrSkip(t)}) + processor := testProcessor ctx := context.Background() // save the parent entity in the database From 056344e6b9bea26d03b5f0b4173484457aa9a06f Mon Sep 17 00:00:00 2001 From: zaeema-n Date: Mon, 27 Apr 2026 10:09:07 +0530 Subject: [PATCH 8/8] Refactor Server to utilize a single instance of EntityAttributeProcessor --- opengin/core-api/cmd/server/service.go | 55 ++++++++++---------------- 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/opengin/core-api/cmd/server/service.go b/opengin/core-api/cmd/server/service.go index 299a44cc..91215d85 100644 --- a/opengin/core-api/cmd/server/service.go +++ b/opengin/core-api/cmd/server/service.go @@ -27,9 +27,10 @@ import ( // Server implements the COREService type Server struct { pb.UnimplementedCOREServiceServer - mongoRepo *mongorepository.MongoRepository - neo4jRepo *neo4jrepository.Neo4jRepository - postgresRepo *postgres.PostgresRepository + mongoRepo *mongorepository.MongoRepository + neo4jRepo *neo4jrepository.Neo4jRepository + postgresRepo *postgres.PostgresRepository + attributeProcessor *engine.EntityAttributeProcessor } // CreateEntity handles entity creation with relationships, metadata and attributes @@ -66,15 +67,7 @@ func (s *Server) CreateEntity(ctx context.Context, req *pb.Entity) (*pb.Entity, } // Handle attributes - processor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ - PostgresRepo: s.postgresRepo, - Neo4jRepo: s.neo4jRepo, - MongoRepo: s.mongoRepo, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize attribute processor: %v", err) - } - attributeResults := processor.ProcessEntityAttributes(ctx, req, "create", nil) + attributeResults := s.attributeProcessor.ProcessEntityAttributes(ctx, req, "create", nil) // Check if any attributes failed for attrName, result := range attributeResults { @@ -179,16 +172,6 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb log.Printf("[server.ReadEntity] Processing attributes for entity: %s, attributes: %+v", req.Entity.Id, req.Entity.Attributes) - // Use the EntityAttributeProcessor to read and process attributes - processor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ - PostgresRepo: s.postgresRepo, - Neo4jRepo: s.neo4jRepo, - MongoRepo: s.mongoRepo, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize attribute processor: %v", err) - } - // Extract fields and record filters from the request attributes based on storage type fields, recordFilters := extractFieldsFromAttributes(req.Entity.Attributes) log.Printf("Extracted fields from attributes: %v", fields) @@ -202,7 +185,7 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb readOptions := engine.NewReadOptions(filtersMap, fields...) // Process the entity with attributes to get the results map - attributeResults := processor.ProcessEntityAttributes(ctx, req.Entity, "read", readOptions) + attributeResults := s.attributeProcessor.ProcessEntityAttributes(ctx, req.Entity, "read", readOptions) log.Printf("[server.ReadEntity] Successfully processed attributes for entity: %s, results: %+v", req.Entity.Id, attributeResults) @@ -277,19 +260,11 @@ func (s *Server) UpdateEntity(ctx context.Context, req *pb.UpdateEntityRequest) } // Handle attributes - processor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ - PostgresRepo: s.postgresRepo, - Neo4jRepo: s.neo4jRepo, - MongoRepo: s.mongoRepo, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize attribute processor: %v", err) - } // Note that in the perspective of the attribute this is a creation operation // The entity is already there but here the attribute is set later. // There is no alignment of update operation with the attribute. // TODO: https://github.com/LDFLK/nexoan/issues/286 - attributeResults := processor.ProcessEntityAttributes(ctx, req.Entity, "create", nil) + attributeResults := s.attributeProcessor.ProcessEntityAttributes(ctx, req.Entity, "create", nil) // Check if any attributes failed for attrName, result := range attributeResults { @@ -641,6 +616,15 @@ func main() { } defer postgresRepo.Close() + attributeProcessor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{ + PostgresRepo: postgresRepo, + Neo4jRepo: neo4jRepo, + MongoRepo: mongoRepo, + }) + if err != nil { + log.Fatalf("[service.main] Failed to create attribute processor: %v", err) + } + listener, err := net.Listen("tcp", host+":"+port) if err != nil { log.Fatalf("[service.main] Failed to listen: %v", err) @@ -648,9 +632,10 @@ func main() { grpcServer := grpc.NewServer() server := &Server{ - mongoRepo: mongoRepo, - neo4jRepo: neo4jRepo, - postgresRepo: postgresRepo, + mongoRepo: mongoRepo, + neo4jRepo: neo4jRepo, + postgresRepo: postgresRepo, + attributeProcessor: attributeProcessor, } pb.RegisterCOREServiceServer(grpcServer, server)