Skip to content
Merged
34 changes: 20 additions & 14 deletions opengin/core-api/cmd/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
// Server implements the COREService
type Server struct {
pb.UnimplementedCOREServiceServer
mongoRepo *mongorepository.MongoRepository
neo4jRepo *neo4jrepository.Neo4jRepository
postgresRepo *postgres.PostgresRepository
mongoRepo *mongorepository.MongoRepository
neo4jRepo *neo4jrepository.Neo4jRepository
postgresRepo *postgres.PostgresRepository
attributeProcessor *engine.EntityAttributeProcessor
}

// CreateEntity handles entity creation with relationships, metadata and attributes
Expand Down Expand Up @@ -66,8 +67,7 @@ func (s *Server) CreateEntity(ctx context.Context, req *pb.Entity) (*pb.Entity,
}

// Handle attributes
processor := engine.NewEntityAttributeProcessor()
attributeResults := processor.ProcessEntityAttributes(ctx, req, "create", nil)
attributeResults := s.attributeProcessor.ProcessEntityAttributes(ctx, req, "create", nil)

// Check if any attributes failed
for attrName, result := range attributeResults {
Expand Down Expand Up @@ -172,9 +172,6 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb

log.Printf("[server.ReadEntity] Processing attributes for entity: %s, attributes: %+v", req.Entity.Id, req.Entity.Attributes)

// Use the EntityAttributeProcessor to read and process attributes
processor := engine.NewEntityAttributeProcessor()

// Extract fields and record filters from the request attributes based on storage type
fields, recordFilters := extractFieldsFromAttributes(req.Entity.Attributes)
log.Printf("Extracted fields from attributes: %v", fields)
Expand All @@ -188,7 +185,7 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb
readOptions := engine.NewReadOptions(filtersMap, fields...)

// Process the entity with attributes to get the results map
attributeResults := processor.ProcessEntityAttributes(ctx, req.Entity, "read", readOptions)
attributeResults := s.attributeProcessor.ProcessEntityAttributes(ctx, req.Entity, "read", readOptions)

log.Printf("[server.ReadEntity] Successfully processed attributes for entity: %s, results: %+v", req.Entity.Id, attributeResults)

Expand Down Expand Up @@ -263,12 +260,11 @@ func (s *Server) UpdateEntity(ctx context.Context, req *pb.UpdateEntityRequest)
}

// Handle attributes
processor := engine.NewEntityAttributeProcessor()
// Note that in the perspective of the attribute this is a creation operation
// The entity is already there but here the attribute is set later.
// There is no alignment of update operation with the attribute.
// TODO: https://github.com/LDFLK/nexoan/issues/286
attributeResults := processor.ProcessEntityAttributes(ctx, req.Entity, "create", nil)
attributeResults := s.attributeProcessor.ProcessEntityAttributes(ctx, req.Entity, "create", nil)

// Check if any attributes failed
for attrName, result := range attributeResults {
Expand Down Expand Up @@ -620,16 +616,26 @@ func main() {
}
defer postgresRepo.Close()

attributeProcessor, err := engine.NewEntityAttributeProcessor(engine.ProcessorDependencies{
PostgresRepo: postgresRepo,
Neo4jRepo: neo4jRepo,
MongoRepo: mongoRepo,
})
if err != nil {
log.Fatalf("[service.main] Failed to create attribute processor: %v", err)
}

listener, err := net.Listen("tcp", host+":"+port)
if err != nil {
log.Fatalf("[service.main] Failed to listen: %v", err)
}

grpcServer := grpc.NewServer()
server := &Server{
mongoRepo: mongoRepo,
neo4jRepo: neo4jRepo,
postgresRepo: postgresRepo,
mongoRepo: mongoRepo,
neo4jRepo: neo4jRepo,
postgresRepo: postgresRepo,
attributeProcessor: attributeProcessor,
}

pb.RegisterCOREServiceServer(grpcServer, server)
Expand Down
9 changes: 6 additions & 3 deletions opengin/core-api/commons/db/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func GetMongoConfig() *config.MongoConfig {
}
}

// GetNeo4jRepository retrieves a Neo4j repository
// GetNeo4jRepository retrieves a Neo4j repository.
// Prefer using the long-lived server-injected repository in request paths to avoid creating new clients per operation.
func GetNeo4jRepository(ctx context.Context) (*neo4jrepository.Neo4jRepository, error) {
cfg := GetNeo4jConfig()
repo, err := neo4jrepository.NewNeo4jRepository(ctx, cfg)
Expand All @@ -42,7 +43,8 @@ func GetNeo4jRepository(ctx context.Context) (*neo4jrepository.Neo4jRepository,
return repo, nil
}

// GetMongoRepository retrieves a Mongo repository
// GetMongoRepository retrieves a Mongo repository.
// Prefer using the long-lived server-injected repository in request paths to avoid creating new clients per operation.
// TODO: Handle errors better
func GetMongoRepository(ctx context.Context) *mongorepository.MongoRepository {
cfg := GetMongoConfig()
Expand All @@ -61,7 +63,8 @@ func GetPostgresConfig() postgresrepository.Config {
}
}

// GetPostgresRepository retrieves a Postgres repository
// GetPostgresRepository retrieves a Postgres repository.
// Prefer using the long-lived server-injected repository in request paths to avoid creating new pools per operation.
func GetPostgresRepository(ctx context.Context) (*postgresrepository.PostgresRepository, error) {
cfg := GetPostgresConfig()
repo, err := postgresrepository.NewPostgresRepository(cfg)
Expand Down
52 changes: 36 additions & 16 deletions opengin/core-api/engine/attribute_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package engine
import (
"context"
"fmt"
dbcommons "lk/datafoundation/core-api/commons/db"
mongorepository "lk/datafoundation/core-api/db/repository/mongo"
neo4jrepository "lk/datafoundation/core-api/db/repository/neo4j"
postgresrepository "lk/datafoundation/core-api/db/repository/postgres"
pb "lk/datafoundation/core-api/lk/datafoundation/core-api"
schema "lk/datafoundation/core-api/pkg/schema"
storageinference "lk/datafoundation/core-api/pkg/storageinference"
Expand Down Expand Up @@ -54,16 +56,35 @@ type EntityAttributeProcessor struct {
graphManager *GraphMetadataManager
}

// NewEntityAttributeProcessor creates a new processor with all resolvers initialized
func NewEntityAttributeProcessor() *EntityAttributeProcessor {
// ProcessorDependencies contains database repositories used by the attribute processor.
type ProcessorDependencies struct {
PostgresRepo *postgresrepository.PostgresRepository
Neo4jRepo *neo4jrepository.Neo4jRepository
MongoRepo *mongorepository.MongoRepository
}

// NewEntityAttributeProcessor creates a processor with explicit repository dependencies.
func NewEntityAttributeProcessor(deps ProcessorDependencies) (*EntityAttributeProcessor, error) {
if deps.PostgresRepo == nil {
return nil, fmt.Errorf("postgres repository is required")
}
if deps.Neo4jRepo == nil {
return nil, fmt.Errorf("neo4j repository is required")
}
if deps.MongoRepo == nil {
return nil, fmt.Errorf("mongo repository is required")
}

processor := &EntityAttributeProcessor{
resolvers: make(map[storageinference.StorageType]AttributeResolver),
graphManager: NewGraphMetadataManager(),
graphManager: NewGraphMetadataManager(deps.Neo4jRepo, deps.MongoRepo),
}

// Initialize all resolvers
processor.resolvers[storageinference.GraphData] = &GraphAttributeResolver{}
processor.resolvers[storageinference.TabularData] = &TabularAttributeResolver{}
processor.resolvers[storageinference.TabularData] = &TabularAttributeResolver{
repo: deps.PostgresRepo,
}
processor.resolvers[storageinference.MapData] = &DocumentAttributeResolver{}

// Initialize each resolver
Expand All @@ -73,7 +94,7 @@ func NewEntityAttributeProcessor() *EntityAttributeProcessor {
}
}

return processor
return processor, nil
}

// GetResolver returns the resolver for a specific storage type
Expand Down Expand Up @@ -445,6 +466,7 @@ func (r *GraphAttributeResolver) DeleteResolve(ctx context.Context, entityID, at
// TabularAttributeResolver handles tabular data structures with columns and rows
type TabularAttributeResolver struct {
BaseAttributeResolver
repo *postgresrepository.PostgresRepository
}

func (r *TabularAttributeResolver) CreateResolve(ctx context.Context, entityID, attrName string, value *pb.TimeBasedValue) *Result {
Expand All @@ -465,17 +487,16 @@ func (r *TabularAttributeResolver) CreateResolve(ctx context.Context, entityID,

fmt.Printf("Creating tabular attribute %s for entity %s (validated as tabular) from %v to %v\n", attrName, entityID, startDate, endDate)

repo, err := dbcommons.GetPostgresRepository(ctx)
if err != nil {
if r.repo == nil {
return &Result{
Data: nil,
Success: false,
Error: fmt.Errorf("failed to get Postgres repository: %v", err),
Error: fmt.Errorf("postgres repository is not configured"),
}
}

// Initialize database tables if they don't exist
if err := repo.InitializeTables(ctx); err != nil {
if err := r.repo.InitializeTables(ctx); err != nil {
Comment thread
zaeema-n marked this conversation as resolved.
return &Result{
Data: nil,
Success: false,
Expand All @@ -492,7 +513,7 @@ func (r *TabularAttributeResolver) CreateResolve(ctx context.Context, entityID,
}
}

err = repo.HandleTabularData(ctx, entityID, attrName, value, schemaInfo)
err = r.repo.HandleTabularData(ctx, entityID, attrName, value, schemaInfo)
if err != nil {
return &Result{
Data: nil,
Expand All @@ -515,19 +536,18 @@ func (r *TabularAttributeResolver) ReadResolve(ctx context.Context, entityID, at
// - Return tabular structure
fmt.Printf("[TabularAttributeResolver.ReadResolve] Reading tabular attribute %s for entity %s with filters: %+v and fields: %+v\n", attrName, entityID, filters, fields)

repo, err := dbcommons.GetPostgresRepository(ctx)
if err != nil {
if r.repo == nil {
return &Result{
Data: nil,
Success: false,
Error: fmt.Errorf("failed to get Postgres repository: %v", err),
Error: fmt.Errorf("postgres repository is not configured"),
}
}

// Look up the actual table name from entity_attributes table
// The table name is UUID-based and stored during create operation
var tableName string
err = repo.DB().QueryRowContext(ctx,
err := r.repo.DB().QueryRowContext(ctx,
`SELECT table_name FROM entity_attributes WHERE entity_id = $1 AND attribute_name = $2`,
entityID, attrName).Scan(&tableName)
if err != nil {
Expand All @@ -540,7 +560,7 @@ func (r *TabularAttributeResolver) ReadResolve(ctx context.Context, entityID, at
log.Printf("[TabularAttributeResolver.ReadResolve] Found tableName: %s", tableName)

// Use the GetData method from the repository to retrieve data with filters and fields
anyData, err := repo.GetData(ctx, tableName, filters, fields...)
anyData, err := r.repo.GetData(ctx, tableName, filters, fields...)
if err != nil {
return &Result{
Data: nil,
Expand Down
Loading
Loading