Add instance IDs for forgejo auth sources #196

Merged
cmo merged 4 commits from forgejo-labels into master 2026-02-19 22:26:45 +00:00
6 changed files with 78 additions and 12 deletions

View file

@ -1,3 +1,12 @@
# 5.5.0 (2026-02-19)
## New features
- Forgejo authentication sources now generate incoming labels that are unique
to the user and the Forgejo instance. This keeps incoming areas of users on
different Forgejo instances separate, even if the user names are identical.
# 5.4.0 (2026-02-02)
## New features

View file

@ -292,6 +292,10 @@ collections:
# is `user`, the incoming label will be
# `forgejo-user-<user-login>`
label_type: team
# An optional instance id. This is used to disambiguate identical
# user IDs on different Forgejo instances. If not set, a hash of
# `url` will be used instead.
instance_id: forgejo-server-1
# An optional repository. The token will only be authorized
# if the team has access to the repository. Note: if `repository`
# is set, the token must have at least repository read

View file

@ -1 +1 @@
__version__ = '5.4.0'
__version__ = '5.5.0'

View file

@ -9,6 +9,7 @@ will emit a complete repository-record including the complete owner-record.
"""
from __future__ import annotations
import hashlib
import logging
import time
from functools import wraps
@ -74,6 +75,7 @@ class ForgejoAuthenticationSource(AuthenticationSource, MethodCache):
organization: str,
team: str,
label_type: str,
instance_id: str | None = None,
repository: str | None = None,
):
"""
@ -90,6 +92,8 @@ class ForgejoAuthenticationSource(AuthenticationSource, MethodCache):
:param team: The name of the team
:param label_type: 'team' or 'user', determines how the incoming label
is created.
:param instance_id: Optional instance ID. If present, will be used to
disambig
:param repository: Optional repository. If this is provided, access
will only be granted if the team has access to the repository.
"""
@ -98,6 +102,7 @@ class ForgejoAuthenticationSource(AuthenticationSource, MethodCache):
self.organization = organization
self.team = team
self.label_type = label_type
self.instance_id = instance_id
self.repository = repository
def _get_json_from_endpoint(
@ -210,6 +215,11 @@ class ForgejoAuthenticationSource(AuthenticationSource, MethodCache):
)
return permissions
def _instance_label(self) -> str:
return self.instance_id or hashlib.md5(
self.api_url.encode()
).hexdigest()
@MethodCache.cache_temporary(duration=60)
def authenticate(
self,
@ -272,7 +282,7 @@ class ForgejoAuthenticationSource(AuthenticationSource, MethodCache):
),
user_id=user_info['email'],
incoming_label=
f'forgejo-team-{organization["name"]}-{team["name"]}'
f'forgejo-{self._instance_label()}-team-{organization["name"]}-{team["name"]}'
if self.label_type == 'team'
else f'forgejo-user-{user_info["login"]}',
else f'forgejo-{self._instance_label()}-user-{user_info["login"]}',
)

View file

@ -121,6 +121,7 @@ class ForgejoAuthConfig(StrictModel):
organization: str
team: str
label_type: Literal['team', 'user']
instance_id: str | None = None
repository: str | None = None

View file

@ -42,12 +42,14 @@ team_1 = json.loads(team_template.format(id=1, action='none'))
team_2 = json.loads(team_template.format(id=2, action='none'))
team_3 = json.loads(team_template.format(id=3, action='write'))
def setup_http_server(http_server) -> None:
http_server.expect_request('/api/v1/user').respond_with_json(user_1)
http_server.expect_request('/api/v1/user/teams').respond_with_json([team_1, team_3])
http_server.expect_request('/api/v1/orgs/org_1').respond_with_json(org_1)
http_server.expect_request('/api/v1/orgs/org_1/teams').respond_with_json([team_1, team_2, team_3])
http_server.expect_request('/api/v1/repos/org_1/repo_1/teams').respond_with_json([team_1, team_2, team_3])
for instance in ('1', '2'):
http_server.expect_request(f'/api/v{instance}/user').respond_with_json(user_1)
http_server.expect_request(f'/api/v{instance}/user/teams').respond_with_json([team_1, team_3])
http_server.expect_request(f'/api/v{instance}/orgs/org_1').respond_with_json(org_1)
http_server.expect_request(f'/api/v{instance}/orgs/org_1/teams').respond_with_json([team_1, team_2, team_3])
http_server.expect_request(f'/api/v{instance}/repos/org_1/repo_1/teams').respond_with_json([team_1, team_2, team_3])
@pytest.mark.parametrize('repository', ['repo_1', None])
@ -61,13 +63,14 @@ def test_forgejo_auth_team(httpserver, label_type, repository):
team='team_1',
label_type=label_type,
repository=repository,
instance_id='inst_1',
)
r = forgejo_auth_source.authenticate(token='something')
if label_type == 'team':
assert r.incoming_label == 'forgejo-team-org_1-team_1'
assert r.incoming_label == 'forgejo-inst_1-team-org_1-team_1'
else:
assert r.incoming_label == 'forgejo-user-user_1'
assert r.incoming_label == 'forgejo-inst_1-user-user_1'
assert r.token_permission == TokenPermission(
curated_read=True,
incoming_read=True,
@ -86,14 +89,15 @@ def test_forgejo_auth_curator(httpserver, label_type, repository):
organization='org_1',
team='team_3',
label_type=label_type,
instance_id='inst_1',
repository=repository,
)
r = forgejo_auth_source.authenticate(token='something')
if label_type == 'team':
assert r.incoming_label == 'forgejo-team-org_1-team_3'
assert r.incoming_label == 'forgejo-inst_1-team-org_1-team_3'
else:
assert r.incoming_label == 'forgejo-user-user_1'
assert r.incoming_label == 'forgejo-inst_1-user-user_1'
assert r.token_permission == TokenPermission(
curated_read=True,
incoming_read=True,
@ -102,3 +106,41 @@ def test_forgejo_auth_curator(httpserver, label_type, repository):
zones_access=True,
)
assert r.user_id == 'user_1@example.com'
@pytest.mark.parametrize('label_type', ['user', 'team'])
def test_forgejo_disambiguate(httpserver, label_type):
setup_http_server(httpserver)
forgejo_authentication_results = [
ForgejoAuthenticationSource(
api_url=httpserver.url_for(f'/api/v{instance}'),
organization='org_1',
team='team_1',
label_type=label_type,
).authenticate(token='something')
for instance in ('1', '2')
]
assert (
forgejo_authentication_results[0].incoming_label
!= forgejo_authentication_results[1].incoming_label
)
forgejo_authentication_results = [
ForgejoAuthenticationSource(
api_url=httpserver.url_for(f'/api/v{instance}'),
organization='org_1',
team='team_1',
label_type=label_type,
instance_id=f'instance_{instance}',
).authenticate(token='something')
for instance in ('1', '2')
]
assert (
forgejo_authentication_results[0].incoming_label
!= forgejo_authentication_results[1].incoming_label
)
assert 'instance_1' in forgejo_authentication_results[0].incoming_label
assert 'instance_2' in forgejo_authentication_results[1].incoming_label