From 7faef23cf2d266e8a4db450aa344988b5b441904 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Wed, 10 Jun 2026 12:34:12 -0300 Subject: [PATCH 01/12] refactor(ServerGroup): Centralizes rules and adds whether or not it is a shared group. - Added is_shared_group property to the result to simplify the identificantion of shared group- Added is_shared_group property to the result to simplify the identification of shared group - rules centralized in get_server_groups_for_user_query() function --- web/pgadmin/utils/server_access.py | 100 +++++++++++++++++++---------- 1 file changed, 67 insertions(+), 33 deletions(-) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index efceca83c19..4626845e286 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -14,7 +14,7 @@ have been explicitly shared with them via SharedServer entries. """ -from sqlalchemy import or_ +from sqlalchemy import or_, case, exists from flask_security import current_user from pgadmin.model import db, Server, ServerGroup @@ -68,55 +68,89 @@ def get_server(sid, only_owned=False): def get_server_group(gid): """Fetch a server group by ID, verifying user access. - Returns the group if: - - Desktop mode, OR - - The user owns it, OR - - It contains shared servers (Server.shared=True). + See get_server_groups_for_user() docstring for the underlying access logic. + Returns the group if the user owns it, or if it contains shared servers. + Returns None otherwise. + """ + sg = get_server_groups_for_user(servergroup_id=gid) + + if sg: + return sg[0] + + return None + + +def get_server_groups_for_user(hide_shared=False, servergroup_id=None): + """Return a list for server groups visible to the current user. + + Args: + hide_shared: If True, only return groups owned by the current user. + servergroup_id: If provided, filter to a specific server group ID. - Returns None otherwise. The Administrator role does not grant - access to other users' private groups. + See get_server_groups_for_user_query() docstring for the underlying query logic. """ - if not config.SERVER_MODE: - return ServerGroup.query.filter_by(id=gid).first() - return ServerGroup.query.filter( - ServerGroup.id == gid, - or_( - ServerGroup.user_id == current_user.id, - ServerGroup.id.in_( - db.session.query(Server.servergroup_id).filter( - Server.shared - ) - ) - ) - ).first() + sg = get_server_groups_for_user_query(hide_shared=hide_shared, servergroup_id=servergroup_id).all() + + result_list = [] + for group, is_shared in sg: + group.is_shared_group = is_shared + result_list.append(group) -def get_server_groups_for_user(): - """Return server groups visible to the current user. + return result_list + +def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): + """Return a query for server groups visible to the current user. Includes groups owned by the user plus groups containing shared servers (Server.shared=True, visible to all authenticated users). + is_shared_group is an additional column indicating if the group is a group + not owned by the user and contains shared servers. + The Administrator role does not grant visibility into other users' private groups — admins see the same set as a regular user with the same ownership and sharing configuration. """ if not config.SERVER_MODE: - return ServerGroup.query.filter_by( - user_id=current_user.id - ).all() + return ( ServerGroup.query.add_columns( (0).label('is_shared_group') ) + .filter( ServerGroup.user_id == current_user.id) + ) - return ServerGroup.query.filter( - or_( - ServerGroup.user_id == current_user.id, - ServerGroup.id.in_( - db.session.query(Server.servergroup_id).filter( - Server.shared - ) + + query = ServerGroup.query.add_columns( + (ServerGroup.user_id != current_user.id).label('is_shared_group') + ) + + if hide_shared: + query = query.filter(ServerGroup.user_id == current_user.id) + else: + has_shared_servers = ( + db.session.query(Server.id) + .filter( + Server.servergroup_id == ServerGroup.id, + Server.shared == True ) + .exists() ) - ).all() + + query = query.filter( + or_( + ServerGroup.user_id == current_user.id, + has_shared_servers + ) + ) + + if servergroup_id is not None: + query = query.filter(ServerGroup.id == servergroup_id) + + query = query.order_by( + case((ServerGroup.user_id == current_user.id, 0),else_=1), + ServerGroup.id + ) + + return query def get_user_server_query(): From 8bc324400a6a3b36a335b01f7306021b4385c002 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Wed, 10 Jun 2026 12:40:43 -0300 Subject: [PATCH 02/12] refactor(servergroups): review the use of get_server_groups_for_user and shared group logic - change to call get_server_groups_for_user() to retrive the servergroups when possible - same for get_server_group() - Simplifying the method of detecting whether the group is shared or not using the information now returned by get_server_groups_for_user() and get_server_group(). --- web/pgadmin/browser/server_groups/__init__.py | 73 ++++++------------- 1 file changed, 21 insertions(+), 52 deletions(-) diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index 50c14c6d17b..82dbcd081f9 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -29,18 +29,15 @@ get_server_groups_for_user -def get_icon_css_class(group_id, group_user_id, - default_val='icon-server_group'): +def get_icon_css_class(is_shared_group, default_val='icon-server_group'): """ Returns css value - :param group_id: - :param group_user_id: + :param is_shared_group: :param default_val: :return: default_val """ - if (config.SERVER_MODE and - group_user_id != current_user.id and - ServerGroupModule.has_shared_server(group_id)): + if (config.SERVER_MODE + and is_shared_group): default_val = 'icon-server_group_shared' return default_val, True @@ -66,31 +63,13 @@ def csssnippets(self): return snippets - @staticmethod - def has_shared_server(gid): - """ - To check whether given server group contains shared server or not - :param gid: - :return: True if servergroup contains shared server else false - """ - servers = Server.query.filter_by(servergroup_id=gid) - for s in servers: - if s.shared: - return True - return False - def get_nodes(self, *arg, **kwargs): """Return a JSON document listing the server groups for the user""" - if config.SERVER_MODE: - groups = ServerGroupView.get_all_server_groups() - else: - groups = ServerGroup.query.filter_by( - user_id=current_user.id - ).order_by("id") + groups = ServerGroupView.get_all_server_groups() for idx, group in enumerate(groups): - icon_class, is_shared = get_icon_css_class(group.id, group.user_id) + icon_class, is_shared = get_icon_css_class(group.is_shared_group) yield self.generate_browser_node( "%d" % (group.id), None, group.name, @@ -201,7 +180,7 @@ def delete(self, gid): status=417, success=0, errormsg=gettext( - 'The specified server group cannot be deleted.' + 'The specified server group cannot be deleted. Shared servers are present in this group.' ) ) @@ -210,7 +189,7 @@ def delete(self, gid): status=417, success=0, errormsg=gettext( - 'The specified server group cannot be deleted.' + 'The specified server group cannot be deleted. The first server group is not deletable.' ) ) @@ -240,9 +219,7 @@ def update(self, gid): """Update the server-group properties""" # There can be only one record at most - servergroup = ServerGroup.query.filter_by( - user_id=current_user.id, - id=gid).first() + servergroup = get_server_group(gid) data = request.form if request.form else json.loads( request.data @@ -259,18 +236,20 @@ def update(self, gid): if 'name' in data: servergroup.name = data['name'] db.session.commit() + except exc.IntegrityError: db.session.rollback() return bad_request(gettext( "The specified server group already exists." )) + except Exception as e: db.session.rollback() return make_json_response( status=410, success=0, errormsg=str(e) ) - icon_class, is_shared = get_icon_css_class(gid, servergroup.user_id) + icon_class, is_shared = get_icon_css_class(servergroup.is_shared_group) return jsonify( node=self.blueprint.generate_browser_node( gid, @@ -317,10 +296,13 @@ def create(self): db.session.add(sg) db.session.commit() + # Refresh the sg object to get the id and other properties + sg = get_server_group(sg.id) + data['id'] = sg.id data['name'] = sg.name - icon_class, is_shared = get_icon_css_class(sg.id, sg.user_id) + icon_class, is_shared = get_icon_css_class(sg.is_shared_group) return jsonify( node=self.blueprint.generate_browser_node( "%d" % sg.id, @@ -387,18 +369,7 @@ def get_all_server_groups(): pref = Preferences.module('browser') hide_shared_server = pref.preference('hide_shared_server').get() - server_groups = get_server_groups_for_user() - - if hide_shared_server: - groups = [] - for group in server_groups: - if group.user_id != current_user.id and \ - ServerGroupModule.has_shared_server(group.id): - continue - groups.append(group) - return groups - - return server_groups + return get_server_groups_for_user(hide_shared_server) @pga_login_required def nodes(self, gid=None): @@ -406,14 +377,13 @@ def nodes(self, gid=None): nodes = [] if gid is None: if config.SERVER_MODE: - groups = self.get_all_server_groups() + else: - groups = ServerGroup.query.filter_by(user_id=current_user.id) + groups = get_server_groups_for_user(hide_shared=True) for group in groups: - icon_class, is_shared = get_icon_css_class(group.id, - group.user_id) + icon_class, is_shared = get_icon_css_class(group.is_shared_group) nodes.append( self.blueprint.generate_browser_node( "%d" % group.id, @@ -433,8 +403,7 @@ def nodes(self, gid=None): errormsg=gettext("Could not find the server group.") ) - icon_class, is_shared = get_icon_css_class(group.id, - group.user_id) + icon_class, is_shared = get_icon_css_class(group.is_shared_group) nodes = self.blueprint.generate_browser_node( "%d" % (group.id), None, group.name, From 6dd0f010f687379cbceb9ac6588739cac49ef4fe Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Wed, 10 Jun 2026 12:43:52 -0300 Subject: [PATCH 03/12] fix(servergroup): check if the ServerGroup is shared in can_delete - A shared group is not allowed to be deleted. - Only the owner can do it. --- web/pgadmin/browser/server_groups/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index 82dbcd081f9..f4f0e8a0786 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -76,7 +76,7 @@ def get_nodes(self, *arg, **kwargs): icon_class, True, self.node_type, - can_delete=True if idx > 0 else False, + can_delete=True if idx > 0 and not group.is_shared_group else False, user_id=group.user_id, is_shared=is_shared ) From ceb340120675ce38b8d76dba070b69b8b19f9bd5 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Mon, 15 Jun 2026 15:39:33 -0300 Subject: [PATCH 04/12] fix(server_access.py): Error 'int' object has no attribute 'label' - In Desktop Mode get_server_groups_for_user_query() raise the error --- web/pgadmin/utils/server_access.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index 4626845e286..cc156bec115 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -14,7 +14,7 @@ have been explicitly shared with them via SharedServer entries. """ -from sqlalchemy import or_, case, exists +from sqlalchemy import or_, case, exists, literal from flask_security import current_user from pgadmin.model import db, Server, ServerGroup @@ -114,7 +114,7 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): user with the same ownership and sharing configuration. """ if not config.SERVER_MODE: - return ( ServerGroup.query.add_columns( (0).label('is_shared_group') ) + return ( ServerGroup.query.add_columns( literal(0).label('is_shared_group') ) .filter( ServerGroup.user_id == current_user.id) ) From 7cddf556819e4d431f56e2fa234becaedff247bc Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Mon, 15 Jun 2026 15:54:02 -0300 Subject: [PATCH 05/12] fix: Access regression in update() - User cannot update a shared Group, only owned Server Groups --- web/pgadmin/browser/server_groups/__init__.py | 4 ++-- web/pgadmin/utils/server_access.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index f4f0e8a0786..e0f7fa4717f 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -218,8 +218,8 @@ def delete(self, gid): def update(self, gid): """Update the server-group properties""" - # There can be only one record at most - servergroup = get_server_group(gid) + # There can be only one record at most (only owned) + servergroup = get_server_group(gid, hide_shared=True) data = request.form if request.form else json.loads( request.data diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index cc156bec115..98ed90d1635 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -65,14 +65,14 @@ def get_server(sid, only_owned=False): ).first() -def get_server_group(gid): +def get_server_group(gid,hide_shared=False): """Fetch a server group by ID, verifying user access. See get_server_groups_for_user() docstring for the underlying access logic. Returns the group if the user owns it, or if it contains shared servers. Returns None otherwise. """ - sg = get_server_groups_for_user(servergroup_id=gid) + sg = get_server_groups_for_user(servergroup_id=gid, hide_shared=hide_shared ) if sg: return sg[0] From f72a550c65d98f403808953cbec602dd7a50ee90 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Mon, 15 Jun 2026 16:33:10 -0300 Subject: [PATCH 06/12] fix: 'NoneType' object has no attribute 'get' - Class ServerGroupView of server_groups in desktop mode pref.preferences('hide_shared_server') returns none. Included a treatment to check if it is None then hide per default (anyway there are no shared servers in desktop mode) --- web/pgadmin/browser/server_groups/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index e0f7fa4717f..542b2559171 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -367,7 +367,8 @@ def get_all_server_groups(): # Don't display shared server if user has # selected 'Hide shared server' pref = Preferences.module('browser') - hide_shared_server = pref.preference('hide_shared_server').get() + pref_item = pref.preference('hide_shared_server') + hide_shared_server = pref_item.get() if pref_item is not None else True return get_server_groups_for_user(hide_shared_server) From c7ff6eb93c6ef65e9a49c3fbfde7f0594003b39a Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Mon, 15 Jun 2026 16:37:28 -0300 Subject: [PATCH 07/12] style: fix styles problems detected - 79 char per line limit - 2 blank line between top-level functions - E712 in Server.shared == True - double space in `for group, is_shared in sg:` --- web/pgadmin/utils/server_access.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index 98ed90d1635..061abf0a74c 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -72,7 +72,8 @@ def get_server_group(gid,hide_shared=False): Returns the group if the user owns it, or if it contains shared servers. Returns None otherwise. """ - sg = get_server_groups_for_user(servergroup_id=gid, hide_shared=hide_shared ) + sg = get_server_groups_for_user(servergroup_id=gid, + hide_shared=hide_shared ) if sg: return sg[0] @@ -87,19 +88,24 @@ def get_server_groups_for_user(hide_shared=False, servergroup_id=None): hide_shared: If True, only return groups owned by the current user. servergroup_id: If provided, filter to a specific server group ID. - See get_server_groups_for_user_query() docstring for the underlying query logic. + See get_server_groups_for_user_query() docstring for the underlying query + logic. """ - sg = get_server_groups_for_user_query(hide_shared=hide_shared, servergroup_id=servergroup_id).all() + sg = ( get_server_groups_for_user_query(hide_shared=hide_shared, + servergroup_id=servergroup_id) + .all() + ) result_list = [] - for group, is_shared in sg: + for group, is_shared in sg: group.is_shared_group = is_shared result_list.append(group) return result_list + def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): """Return a query for server groups visible to the current user. @@ -114,13 +120,16 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): user with the same ownership and sharing configuration. """ if not config.SERVER_MODE: - return ( ServerGroup.query.add_columns( literal(0).label('is_shared_group') ) + return ( ServerGroup.query.add_columns( + literal(0).label('is_shared_group') + ) .filter( ServerGroup.user_id == current_user.id) ) query = ServerGroup.query.add_columns( - (ServerGroup.user_id != current_user.id).label('is_shared_group') + (ServerGroup.user_id != current_user.id) + .label('is_shared_group') ) if hide_shared: @@ -130,7 +139,7 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): db.session.query(Server.id) .filter( Server.servergroup_id == ServerGroup.id, - Server.shared == True + Server.shared ) .exists() ) From 063d4db46b313708c4756227cbc1c7bbd5a43d9d Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Tue, 16 Jun 2026 11:11:21 -0300 Subject: [PATCH 08/12] feat(server_access): Added is_first_user_group to the result - To facilitate and to have a more reliable way to determine if the servergroup is the first servergroup owned by the user changed the query to return this info --- web/pgadmin/utils/server_access.py | 35 +++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index 061abf0a74c..f87eae928e4 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -14,7 +14,7 @@ have been explicitly shared with them via SharedServer entries. """ -from sqlalchemy import or_, case, exists, literal +from sqlalchemy import or_, case, exists, func, literal from flask_security import current_user from pgadmin.model import db, Server, ServerGroup @@ -92,15 +92,15 @@ def get_server_groups_for_user(hide_shared=False, servergroup_id=None): logic. """ - sg = ( get_server_groups_for_user_query(hide_shared=hide_shared, + sg = get_server_groups_for_user_query(hide_shared=hide_shared, servergroup_id=servergroup_id) - .all() - ) result_list = [] - for group, is_shared in sg: - group.is_shared_group = is_shared + for row in sg.all(): + group = row.ServerGroup + group.is_shared_group = row.is_shared_group + group.is_first_user_group = row.is_first_user_group result_list.append(group) return result_list @@ -119,9 +119,27 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): users' private groups — admins see the same set as a regular user with the same ownership and sharing configuration. """ + + ServerGroupAlias = db.aliased(ServerGroup) + + min_id_subquery = ( + db.session.query(func.min(ServerGroupAlias.id)) + .filter( + ServerGroupAlias.user_id == current_user.id + ) + .correlate(ServerGroup) + .scalar_subquery() + ) + + is_first_user_group = ( + (ServerGroup.id == min_id_subquery) + .label('is_first_user_group') + ) + if not config.SERVER_MODE: return ( ServerGroup.query.add_columns( - literal(0).label('is_shared_group') + literal(0).label('is_shared_group'), + is_first_user_group ) .filter( ServerGroup.user_id == current_user.id) ) @@ -129,7 +147,8 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): query = ServerGroup.query.add_columns( (ServerGroup.user_id != current_user.id) - .label('is_shared_group') + .label('is_shared_group'), + is_first_user_group ) if hide_shared: From c8a0c853ad2465730326bfb08d2bad2ab3f1a96c Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Tue, 16 Jun 2026 11:20:57 -0300 Subject: [PATCH 09/12] fix: canDrop and canEdit servergroups verification in menus wasn't working properly servergroups\__init__.py: - nodes now return can_delete and cad_edit properties to the script handle it more easily and not expose the rules - uniformize information generate passed to self.blueprint.generate_browser_node() function server_groups.js - canDelete was not used so droped it - Adjusted canDrop to use the can_delete information from itemData - Added canEdit to control if the properties can be edited or not --- web/pgadmin/browser/server_groups/__init__.py | 25 +++++++++++++------ .../server_groups/static/js/server_group.js | 14 ++++------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index 542b2559171..8faf203db00 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -68,16 +68,18 @@ def get_nodes(self, *arg, **kwargs): groups = ServerGroupView.get_all_server_groups() - for idx, group in enumerate(groups): + for group in groups: icon_class, is_shared = get_icon_css_class(group.is_shared_group) yield self.generate_browser_node( - "%d" % (group.id), None, + "%d" % (group.id), + None, group.name, icon_class, True, self.node_type, - can_delete=True if idx > 0 and not group.is_shared_group else False, - user_id=group.user_id, + can_delete=(not (group.is_first_user_group + or group.is_shared_group)), + can_edit=(not group.is_shared_group), is_shared=is_shared ) @@ -258,7 +260,9 @@ def update(self, gid): icon_class, True, self.node_type, - can_delete=True, # This is user created hence can delete + can_delete=(not (servergroup.is_first_user_group + or servergroup.is_shared_group)), + can_edit=(not servergroup.is_shared_group), is_shared=is_shared ) ) @@ -311,8 +315,8 @@ def create(self): icon_class, True, self.node_type, - # This is user created hence can deleted - can_delete=True, + can_delete=(not (sg.is_first_user_group + or sg.is_shared_group)), is_shared=is_shared ) ) @@ -393,6 +397,8 @@ def nodes(self, gid=None): icon_class, True, self.node_type, + can_delete=(not (group.is_first_user_group + or group.is_shared_group)), is_shared=is_shared ) ) @@ -406,11 +412,14 @@ def nodes(self, gid=None): icon_class, is_shared = get_icon_css_class(group.is_shared_group) nodes = self.blueprint.generate_browser_node( - "%d" % (group.id), None, + "%d" % (group.id), + None, group.name, icon_class, True, self.node_type, + can_delete=(not (group.is_first_user_group + or group.is_shared_group)), is_shared=is_shared ) diff --git a/web/pgadmin/browser/server_groups/static/js/server_group.js b/web/pgadmin/browser/server_groups/static/js/server_group.js index b822781cb3e..591a1ebf565 100644 --- a/web/pgadmin/browser/server_groups/static/js/server_group.js +++ b/web/pgadmin/browser/server_groups/static/js/server_group.js @@ -7,12 +7,11 @@ // ////////////////////////////////////////////////////////////// import ServerGroupSchema from './server_group.ui'; -import _ from 'lodash'; define('pgadmin.node.server_group', [ 'sources/gettext', 'sources/url_for', 'sources/pgadmin', 'pgadmin.user_management.current_user', 'pgadmin.browser', 'pgadmin.browser.node', -], function(gettext, url_for, pgAdmin, current_user) { +], function(gettext, url_for, pgAdmin) { if (!pgAdmin.Browser.Nodes['server_group']) { pgAdmin.Browser.Nodes['server_group'] = pgAdmin.Browser.Node.extend({ @@ -39,17 +38,14 @@ define('pgadmin.node.server_group', [ }]); }, getSchema: ()=>new ServerGroupSchema(), + canEdit: function(itemData) { + return (itemData && itemData.can_edit); + }, canDrop: function(itemData) { - let serverOwner = itemData.user_id; - return !(serverOwner != current_user.id && !_.isUndefined(serverOwner)); + return (itemData && itemData.can_delete); }, dropAsRemove: true, - canDelete: function(i) { - let s = pgAdmin.Browser.tree.siblings(i, true); - /* This is the only server group - we can't remove it*/ - return !(!s || s.length == 0); - }, }); } From c37deca7e09b2835b44e3c64706ceb315a4ad049 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Tue, 16 Jun 2026 11:34:32 -0300 Subject: [PATCH 10/12] fix: Unused current_user dependency should be removed. - removed unused dependency from the dependency array --- web/pgadmin/browser/server_groups/static/js/server_group.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/pgadmin/browser/server_groups/static/js/server_group.js b/web/pgadmin/browser/server_groups/static/js/server_group.js index 591a1ebf565..7ead306945f 100644 --- a/web/pgadmin/browser/server_groups/static/js/server_group.js +++ b/web/pgadmin/browser/server_groups/static/js/server_group.js @@ -10,7 +10,7 @@ import ServerGroupSchema from './server_group.ui'; define('pgadmin.node.server_group', [ 'sources/gettext', 'sources/url_for', - 'sources/pgadmin', 'pgadmin.user_management.current_user', 'pgadmin.browser', 'pgadmin.browser.node', + 'sources/pgadmin', 'pgadmin.browser', 'pgadmin.browser.node', ], function(gettext, url_for, pgAdmin) { if (!pgAdmin.Browser.Nodes['server_group']) { From 1f1f6638a979eefdb635c3a802d51b4a1933efb5 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Fri, 19 Jun 2026 11:40:10 -0300 Subject: [PATCH 11/12] fix: canEdit regression and style - fix of make check-pep8 style errors detected - Adding can_edit to create() and nodes() - hide_shared_server default to false if the pref object is absent --- web/pgadmin/browser/server_groups/__init__.py | 56 +++++++++++++------ web/pgadmin/utils/server_access.py | 21 ++++--- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index 8faf203db00..b0c52bbcebc 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -36,8 +36,7 @@ def get_icon_css_class(is_shared_group, default_val='icon-server_group'): :param default_val: :return: default_val """ - if (config.SERVER_MODE - and is_shared_group): + if (config.SERVER_MODE and is_shared_group): default_val = 'icon-server_group_shared' return default_val, True @@ -77,8 +76,9 @@ def get_nodes(self, *arg, **kwargs): icon_class, True, self.node_type, - can_delete=(not (group.is_first_user_group - or group.is_shared_group)), + can_delete=( + not (group.is_first_user_group or group.is_shared_group) + ), can_edit=(not group.is_shared_group), is_shared=is_shared ) @@ -182,7 +182,8 @@ def delete(self, gid): status=417, success=0, errormsg=gettext( - 'The specified server group cannot be deleted. Shared servers are present in this group.' + 'The specified server group cannot be deleted.' + ' Shared servers are present in this group.' ) ) @@ -191,7 +192,8 @@ def delete(self, gid): status=417, success=0, errormsg=gettext( - 'The specified server group cannot be deleted. The first server group is not deletable.' + 'The specified server group cannot be deleted.' + ' The first server group is not deletable.' ) ) @@ -260,8 +262,12 @@ def update(self, gid): icon_class, True, self.node_type, - can_delete=(not (servergroup.is_first_user_group - or servergroup.is_shared_group)), + can_delete=( + not ( + servergroup.is_first_user_group or + servergroup.is_shared_group + ) + ), can_edit=(not servergroup.is_shared_group), is_shared=is_shared ) @@ -315,8 +321,13 @@ def create(self): icon_class, True, self.node_type, - can_delete=(not (sg.is_first_user_group - or sg.is_shared_group)), + can_delete=( + not ( + sg.is_first_user_group or + sg.is_shared_group + ) + ), + can_edit=(not sg.is_shared_group), is_shared=is_shared ) ) @@ -372,7 +383,9 @@ def get_all_server_groups(): # selected 'Hide shared server' pref = Preferences.module('browser') pref_item = pref.preference('hide_shared_server') - hide_shared_server = pref_item.get() if pref_item is not None else True + hide_shared_server = ( + pref_item.get() if pref_item is not None else False + ) return get_server_groups_for_user(hide_shared_server) @@ -388,7 +401,8 @@ def nodes(self, gid=None): groups = get_server_groups_for_user(hide_shared=True) for group in groups: - icon_class, is_shared = get_icon_css_class(group.is_shared_group) + icon_class, + is_shared = get_icon_css_class(group.is_shared_group) nodes.append( self.blueprint.generate_browser_node( "%d" % group.id, @@ -397,8 +411,13 @@ def nodes(self, gid=None): icon_class, True, self.node_type, - can_delete=(not (group.is_first_user_group - or group.is_shared_group)), + can_delete=( + not ( + group.is_first_user_group or + group.is_shared_group + ) + ), + can_edit=(not group.is_shared_group), is_shared=is_shared ) ) @@ -418,8 +437,13 @@ def nodes(self, gid=None): icon_class, True, self.node_type, - can_delete=(not (group.is_first_user_group - or group.is_shared_group)), + can_delete=( + not ( + group.is_first_user_group or + group.is_shared_group + ) + ), + can_edit=(not group.is_shared_group), is_shared=is_shared ) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index f87eae928e4..ea62eff15d3 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -73,7 +73,7 @@ def get_server_group(gid,hide_shared=False): Returns None otherwise. """ sg = get_server_groups_for_user(servergroup_id=gid, - hide_shared=hide_shared ) + hide_shared=hide_shared) if sg: return sg[0] @@ -137,19 +137,18 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): ) if not config.SERVER_MODE: - return ( ServerGroup.query.add_columns( - literal(0).label('is_shared_group'), - is_first_user_group + return (ServerGroup.query.add_columns( + literal(0).label('is_shared_group'), + is_first_user_group + ) + .filter(ServerGroup.user_id == current_user.id) ) - .filter( ServerGroup.user_id == current_user.id) - ) - query = ServerGroup.query.add_columns( - (ServerGroup.user_id != current_user.id) - .label('is_shared_group'), - is_first_user_group - ) + (ServerGroup.user_id != current_user.id) + .label('is_shared_group'), + is_first_user_group + ) if hide_shared: query = query.filter(ServerGroup.user_id == current_user.id) From c9e234679908d49654a854bbac524cefda365a65 Mon Sep 17 00:00:00 2001 From: Luiz K Matsumura Date: Fri, 19 Jun 2026 12:31:34 -0300 Subject: [PATCH 12/12] fix(server_groups): Improve server group creation and query handling - Added a flush after adding a new server group to retrieve its ID immediately. - Updated the server group retrieval to include access metadata. - Enhanced the query for server groups to filter by ID when provided, ensuring more precise results. --- web/pgadmin/browser/server_groups/__init__.py | 11 +- .../tests/test_sg_permissions.py | 353 ++++++++++++++++++ web/pgadmin/utils/server_access.py | 15 +- 3 files changed, 369 insertions(+), 10 deletions(-) create mode 100644 web/pgadmin/browser/server_groups/tests/test_sg_permissions.py diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index b0c52bbcebc..3c0c8d2563c 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -304,10 +304,12 @@ def create(self): user_id=current_user.id, name=data['name']) db.session.add(sg) + db.session.flush() + group_id = sg.id db.session.commit() - # Refresh the sg object to get the id and other properties - sg = get_server_group(sg.id) + # Reload with access metadata attached + sg = get_server_group(group_id) data['id'] = sg.id data['name'] = sg.name @@ -401,8 +403,9 @@ def nodes(self, gid=None): groups = get_server_groups_for_user(hide_shared=True) for group in groups: - icon_class, - is_shared = get_icon_css_class(group.is_shared_group) + icon_class, is_shared = get_icon_css_class( + group.is_shared_group + ) nodes.append( self.blueprint.generate_browser_node( "%d" % group.id, diff --git a/web/pgadmin/browser/server_groups/tests/test_sg_permissions.py b/web/pgadmin/browser/server_groups/tests/test_sg_permissions.py new file mode 100644 index 00000000000..2570e87a889 --- /dev/null +++ b/web/pgadmin/browser/server_groups/tests/test_sg_permissions.py @@ -0,0 +1,353 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2026, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +"""Tests for server group permissions after the server_access refactor.""" + +import json +import config +from pgadmin.utils.route import BaseTestGenerator +from regression.test_setup import config_data +from regression.python_test_utils.test_utils import \ + create_user_wise_test_client +from pgadmin.model import db, ServerGroup, Server + +test_user_details = None +if config.SERVER_MODE: + test_user_details = \ + config_data['pgAdmin4_test_non_admin_credentials'] + +SG_OBJ_URL = '/browser/server_group/obj/' +SG_NODES_URL = '/browser/server_group/nodes/' + + +def _create_server_group(tester, name): + """Create a server group and return (status_code, response_data).""" + response = tester.post( + SG_OBJ_URL, + data=json.dumps({'name': name}), + content_type='html/json' + ) + response_data = json.loads(response.data.decode('utf-8')) + if response.status_code == 200 and 'node' in response_data: + node = _find_node_by_label( + _get_nodes(tester)[1], name + ) + if node is not None: + response_data['node'] = node + return response.status_code, response_data + + +def _get_nodes(tester, sg_id=None): + """Fetch server group node(s) from the nodes endpoint.""" + url = SG_NODES_URL if sg_id is None else SG_NODES_URL + str(sg_id) + response = tester.get(url, content_type='html/json') + response_data = json.loads(response.data.decode('utf-8')) + return response.status_code, response_data.get('data') + + +def _find_node(nodes, sg_id): + """Find a node by _id in a list or return the dict if it matches.""" + sg_id = str(sg_id) + if isinstance(nodes, dict): + return nodes if str(nodes.get('_id')) == sg_id else None + for node in nodes or []: + if str(node.get('_id')) == sg_id: + return node + return None + + +def _find_node_by_label(nodes, label): + """Find a node by label in a list.""" + if isinstance(nodes, dict): + return nodes if nodes.get('label') == label else None + for node in nodes or []: + if node.get('label') == label: + return node + return None + + +def _get_first_group_id(app): + """Return the lowest-id server group owned by the current user.""" + with app.app_context(): + sg = ServerGroup.query.order_by(ServerGroup.id).first() + return sg.id if sg else None + + +def _cleanup_server_group(app, sg_id): + """Delete a server group directly from the database.""" + if sg_id is None: + return + with app.app_context(): + sg = ServerGroup.query.filter_by(id=sg_id).first() + if sg: + db.session.delete(sg) + db.session.commit() + + +def _cleanup_shared_servers_in_group(tester, app, group_id): + """Remove shared servers left in a group by earlier tests in the suite.""" + with app.app_context(): + server_ids = [ + s.id for s in Server.query.filter_by( + servergroup_id=group_id, shared=True + ).all() + ] + for server_id in server_ids: + tester.delete( + '/browser/server/obj/{0}/{1}'.format(group_id, server_id) + ) + + +def _assert_owned_group_node(self, node): + """Assert metadata for a non-shared owned server group node.""" + self.assertTrue(node.get('can_edit')) + self.assertFalse(node.get('is_shared')) + self.assertEqual(node.get('icon'), 'icon-server_group') + + +def _assert_desktop_nodes_metadata(self, nodes): + """Assert desktop-mode metadata on all listed server group nodes.""" + self.assertIsInstance(nodes, list) + for node in nodes: + self.assertFalse(node.get('is_shared')) + self.assertTrue(node.get('can_edit')) + + +class OwnedGroupCrudTestCase(BaseTestGenerator): + """An owned group is visible, editable, and deletable.""" + + scenarios = [ + ('Owned group is visible, editable, and deletable', + dict(is_positive_test=True)), + ] + + def setUp(self): + super().setUp() + self.sg_id = None + status_code, response_data = _create_server_group( + self.tester, 'owned_test_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.sg_id = response_data['node']['_id'] + self.assertTrue( + response_data['node'].get('can_delete'), + 'Owned CRUD tests require a non-first server group.' + ) + + def runTest(self): + if not self.sg_id: + raise Exception("Server group not created") + + url = SG_OBJ_URL + str(self.sg_id) + response = self.tester.get(url, content_type='html/json') + self.assertEqual(response.status_code, 200) + + status_code, node = _get_nodes(self.tester, self.sg_id) + self.assertEqual(status_code, 200) + _assert_owned_group_node(self, node) + self.assertTrue(node.get('can_delete')) + + if not config.SERVER_MODE: + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + _assert_desktop_nodes_metadata(self, nodes) + + response = self.tester.put( + url, + data=json.dumps({'name': 'renamed_group'}), + content_type='html/json' + ) + self.assertEqual(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn('node', response_data) + self.assertEqual(response_data['node']['label'], 'renamed_group') + self.assertTrue(response_data['node'].get('can_edit')) + + response = self.tester.delete(url) + self.assertEqual(response.status_code, 200) + self.sg_id = None + + def tearDown(self): + _cleanup_server_group(self.app, self.sg_id) + + +class FirstGroupNotDeletableTestCase(BaseTestGenerator): + """The user's first server group cannot be deleted.""" + + scenarios = [ + ('First owned server group is not deletable', + dict(is_positive_test=False)), + ] + + def setUp(self): + super().setUp() + if config.SERVER_MODE: + # Other tests may leave shared servers in the default group; + # delete() checks that before the first-group guard. + _cleanup_shared_servers_in_group( + self.tester, self.app, config_data['server_group'] + ) + + def runTest(self): + first_group_id = _get_first_group_id(self.app) + self.assertIsNotNone(first_group_id) + + status_code, node = _get_nodes(self.tester, first_group_id) + self.assertEqual(status_code, 200) + self.assertFalse(node.get('can_delete')) + self.assertTrue(node.get('can_edit')) + self.assertFalse(node.get('is_shared')) + + if not config.SERVER_MODE: + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + _assert_desktop_nodes_metadata(self, nodes) + + url = SG_OBJ_URL + str(first_group_id) + response = self.tester.delete(url) + self.assertEqual(response.status_code, 417) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn( + 'The first server group is not deletable.', + response_data.get('errormsg', '') + ) + + status_code, response_data = _create_server_group( + self.tester, 'second_test_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.assertTrue(response_data['node'].get('can_delete')) + if not config.SERVER_MODE: + self.assertFalse(response_data['node'].get('is_shared')) + + second_group_id = response_data['node']['_id'] + _cleanup_server_group(self.app, second_group_id) + + +class SharedGroupPermissionsTestCase(BaseTestGenerator): + """A group with another user's shared server is visible but not + editable or deletable (server mode only).""" + + scenarios = [ + ('Shared server group is visible with shared flags only', + dict(is_positive_test=True)), + ] + + def setUp(self): + super().setUp() + self.sg_id = None + self.server_id = None + if not config.SERVER_MODE: + self.skipTest( + 'Shared group permission tests only apply to server mode.' + ) + + status_code, response_data = _create_server_group( + self.tester, 'shared_host_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.sg_id = response_data['node']['_id'] + + self.server['shared'] = True + url = '/browser/server/obj/{0}/'.format(self.sg_id) + response = self.tester.post( + url, + data=json.dumps(self.server), + content_type='html/json' + ) + self.assertEqual(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn('node', response_data) + self.server_id = response_data['node']['_id'] + + @create_user_wise_test_client(test_user_details) + def runTest(self): + if not self.sg_id: + raise Exception("Server group not created") + + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + node = _find_node(nodes, self.sg_id) + self.assertIsNotNone( + node, + 'Shared server group should be visible to non-owner' + ) + self.assertTrue(node.get('is_shared')) + self.assertEqual(node.get('icon'), 'icon-server_group_shared') + self.assertFalse(node.get('can_edit')) + self.assertFalse(node.get('can_delete')) + + url = SG_OBJ_URL + str(self.sg_id) + response = self.tester.get(url, content_type='html/json') + self.assertEqual(response.status_code, 200) + + response = self.tester.put( + url, + data=json.dumps({'name': 'should_not_rename'}), + content_type='html/json' + ) + self.assertEqual(response.status_code, 417) + + response = self.tester.delete(url) + self.assertEqual(response.status_code, 417) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn( + 'Shared servers are present', + response_data.get('errormsg', '') + ) + + def tearDown(self): + if self.server_id is not None: + url = '/browser/server/obj/{0}/{1}'.format( + self.sg_id, self.server_id) + self.__class__.tester.delete(url) + _cleanup_server_group(self.app, self.sg_id) + + +class DesktopGroupsMetadataTestCase(BaseTestGenerator): + """Desktop mode returns owned groups with is_shared=False.""" + + scenarios = [ + ('Desktop groups report is_shared=False', + dict(is_positive_test=True)), + ] + + def setUp(self): + super().setUp() + self.sg_id = None + if config.SERVER_MODE: + self.skipTest( + 'Desktop metadata tests only apply to desktop mode.' + ) + + def runTest(self): + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + _assert_desktop_nodes_metadata(self, nodes) + + first_group_id = _get_first_group_id(self.app) + self.assertIsNotNone(first_group_id) + first_node = _find_node(nodes, first_group_id) + self.assertIsNotNone(first_node) + self.assertFalse(first_node.get('can_delete')) + + status_code, response_data = _create_server_group( + self.tester, 'desktop_second_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.assertFalse(response_data['node'].get('is_shared')) + self.assertTrue(response_data['node'].get('can_delete')) + self.sg_id = response_data['node']['_id'] + + def tearDown(self): + _cleanup_server_group(self.app, self.sg_id) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index ea62eff15d3..5d9dc74818c 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -137,12 +137,15 @@ def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): ) if not config.SERVER_MODE: - return (ServerGroup.query.add_columns( - literal(0).label('is_shared_group'), - is_first_user_group - ) - .filter(ServerGroup.user_id == current_user.id) - ) + query = ServerGroup.query.add_columns( + literal(0).label('is_shared_group'), + is_first_user_group + ).filter(ServerGroup.user_id == current_user.id) + + if servergroup_id is not None: + query = query.filter(ServerGroup.id == servergroup_id) + + return query.order_by(ServerGroup.id) query = ServerGroup.query.add_columns( (ServerGroup.user_id != current_user.id)