Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google/auth/_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Shared constants."""

_SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"
_WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations"
_WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"
71 changes: 66 additions & 5 deletions google/auth/external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import json
import re

from google.auth import _constants
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
Expand Down Expand Up @@ -81,6 +82,7 @@ class Credentials(
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.CredentialsWithTokenUri,
credentials.CredentialsWithTrustBoundary,
metaclass=abc.ABCMeta,
):
"""Base class for all external account credentials.
Expand Down Expand Up @@ -166,10 +168,7 @@ def __init__(
self._scopes = scopes
self._default_scopes = default_scopes
self._workforce_pool_user_project = workforce_pool_user_project
self._trust_boundary = {
"locations": [],
"encoded_locations": "0x0",
} # expose a placeholder trust boundary value.
self._trust_boundary = trust_boundary

if self._client_id:
self._client_auth = utils.ClientAuthentication(
Expand Down Expand Up @@ -235,6 +234,7 @@ def _constructor_args(self):
"scopes": self._scopes,
"default_scopes": self._default_scopes,
"universe_domain": self._universe_domain,
"trust_boundary": self._trust_boundary,
}
if not self.is_workforce_pool:
args.pop("workforce_pool_user_project")
Expand Down Expand Up @@ -405,8 +405,24 @@ def get_project_id(self, request):

return None

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
"""Refreshes the access token.

For impersonated credentials, this method will refresh the underlying
source credentials and the impersonated credentials. For non-impersonated
credentials, it will refresh the access token and the trust boundary.
"""
self._refresh_token(request)
# If we are impersonating, the trust boundary is handled by the
# impersonated credentials object. We need to get it from there.
if self._service_account_impersonation_url:
if self._impersonated_credentials:
self._trust_boundary = self._impersonated_credentials._trust_boundary
else:
# Otherwise, refresh the trust boundary for the external account.
self._refresh_trust_boundary(request)

def _refresh_token(self, request):
scopes = self._scopes if self._scopes is not None else self._default_scopes

# Inject client certificate into request.
Expand Down Expand Up @@ -456,6 +472,43 @@ def refresh(self, request):

self.expiry = now + lifetime

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API."""
# If it's a workforce pool, use the workforce-specific endpoint.
if self.is_workforce_pool:
# Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID
match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience)

if not match:
raise exceptions.InvalidValue("Invalid workforce pool audience format.")

pool_id = match.groups()[0]

return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
universe_domain=self._universe_domain, pool_id=pool_id
)

# For workload identity pools, parse the project number and pool ID from
# the audience.
# Audience format: //iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID
match = re.search(
r"projects/([^/]+)/locations/global/workloadIdentityPools/([^/]+)",
self._audience,
)

if not match:
raise exceptions.InvalidValue(
"Invalid workload identity pool audience format."
)

project_number, pool_id = match.groups()

return _constants._WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
universe_domain=self._universe_domain,
project_number=project_number,
pool_id=pool_id,
)

def _make_copy(self):
kwargs = self._constructor_args()
new_cred = self.__class__(**kwargs)
Expand All @@ -482,6 +535,12 @@ def with_universe_domain(self, universe_domain):
cred._universe_domain = universe_domain
return cred

@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
cred = self._make_copy()
cred._trust_boundary = trust_boundary
return cred

def _should_initialize_impersonated_credentials(self):
return (
self._service_account_impersonation_url is not None
Expand Down Expand Up @@ -530,6 +589,7 @@ def _initialize_impersonated_credentials(self):
lifetime=self._service_account_impersonation_options.get(
"token_lifetime_seconds"
),
trust_boundary=self._trust_boundary,
)

def _create_default_metrics_options(self):
Expand Down Expand Up @@ -608,6 +668,7 @@ def from_info(cls, info, **kwargs):
universe_domain=info.get(
"universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
),
trust_boundary=info.get("trust_boundary"),
**kwargs
)

Expand Down
54 changes: 42 additions & 12 deletions google/auth/external_account_authorized_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import datetime
import io
import json
import re

from google.auth import _constants
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
Expand All @@ -50,6 +52,7 @@ class Credentials(
credentials.CredentialsWithQuotaProject,
credentials.ReadOnlyScoped,
credentials.CredentialsWithTokenUri,
credentials.CredentialsWithTrustBoundary,
):
"""Credentials for External Account Authorized Users.

Expand All @@ -76,6 +79,7 @@ def __init__(
scopes=None,
quota_project_id=None,
universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
trust_boundary=None,
):
"""Instantiates a external account authorized user credentials object.

Expand All @@ -101,6 +105,7 @@ def __init__(
create the credentials.
universe_domain (Optional[str]): The universe domain. The default value
is googleapis.com.
trust_boundary (Mapping[str,str]): A credential trust boundary.

Returns:
google.auth.external_account_authorized_user.Credentials: The
Expand All @@ -111,7 +116,7 @@ def __init__(
self.token = token
self.expiry = expiry
self._audience = audience
self._refresh_token = refresh_token
self._refresh_token_val = refresh_token
self._token_url = token_url
self._token_info_url = token_info_url
self._client_id = client_id
Expand All @@ -121,6 +126,7 @@ def __init__(
self._scopes = scopes
self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
self._cred_file_path = None
self._trust_boundary = trust_boundary

if not self.valid and not self.can_refresh:
raise exceptions.InvalidOperation(
Expand Down Expand Up @@ -157,7 +163,7 @@ def info(self):
def constructor_args(self):
return {
"audience": self._audience,
"refresh_token": self._refresh_token,
"refresh_token": self._refresh_token_val,
"token_url": self._token_url,
"token_info_url": self._token_info_url,
"client_id": self._client_id,
Expand All @@ -168,6 +174,7 @@ def constructor_args(self):
"scopes": self._scopes,
"quota_project_id": self._quota_project_id,
"universe_domain": self._universe_domain,
"trust_boundary": self._trust_boundary,
}

@property
Expand All @@ -177,7 +184,7 @@ def scopes(self):

@property
def requires_scopes(self):
""" False: OAuth 2.0 credentials have their scopes set when
"""False: OAuth 2.0 credentials have their scopes set when
the initial token is requested and can not be changed."""
return False

Expand All @@ -194,13 +201,13 @@ def client_secret(self):
@property
def audience(self):
"""Optional[str]: The STS audience which contains the resource name for the
workforce pool and the provider identifier in that pool."""
workforce pool and the provider identifier in that pool."""
return self._audience

@property
def refresh_token(self):
"""Optional[str]: The OAuth 2.0 refresh token."""
return self._refresh_token
return self._refresh_token_val

@property
def token_url(self):
Expand All @@ -219,13 +226,18 @@ def revoke_url(self):

@property
def is_user(self):
""" True: This credential always represents a user."""
"""True: This credential always represents a user."""
return True

@property
def can_refresh(self):
return all(
(self._refresh_token, self._token_url, self._client_id, self._client_secret)
(
self._refresh_token_val,
self._token_url,
self._client_id,
self._client_secret,
)
)

def get_project_id(self, request=None):
Expand Down Expand Up @@ -259,7 +271,7 @@ def to_json(self, strip=None):
strip = strip if strip else []
return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})

def refresh(self, request):
def _refresh_token(self, request):
"""Refreshes the access token.

Args:
Expand All @@ -278,18 +290,29 @@ def refresh(self, request):
)

now = _helpers.utcnow()
response_data = self._make_sts_request(request)
response_data = self._sts_client.refresh_token(request, self._refresh_token_val)

self.token = response_data.get("access_token")

lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
self.expiry = now + lifetime

if "refresh_token" in response_data:
self._refresh_token = response_data["refresh_token"]
self._refresh_token_val = response_data["refresh_token"]

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API."""
# Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID
match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience)

if not match:
raise exceptions.InvalidValue("Invalid workforce pool audience format.")

pool_id = match.groups()[0]

def _make_sts_request(self, request):
return self._sts_client.refresh_token(request, self._refresh_token)
return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
universe_domain=self._universe_domain, pool_id=pool_id
)

@_helpers.copy_docstring(credentials.Credentials)
def get_cred_info(self):
Expand Down Expand Up @@ -324,6 +347,12 @@ def with_universe_domain(self, universe_domain):
cred._universe_domain = universe_domain
return cred

@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
cred = self._make_copy()
cred._trust_boundary = trust_boundary
return cred

@classmethod
def from_info(cls, info, **kwargs):
"""Creates a Credentials instance from parsed external account info.
Expand Down Expand Up @@ -360,6 +389,7 @@ def from_info(cls, info, **kwargs):
universe_domain=info.get(
"universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
),
trust_boundary=info.get("trust_boundary"),
**kwargs
)

Expand Down
4 changes: 3 additions & 1 deletion google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def _refresh_token(self, request):
self._source_credentials.token_state == credentials.TokenState.STALE
or self._source_credentials.token_state == credentials.TokenState.INVALID
):
self._source_credentials.refresh(request)
self._source_credentials._refresh_token(request)
Comment thread
sai-sunder-s marked this conversation as resolved.

body = {
"delegates": self._delegates,
Expand Down Expand Up @@ -510,13 +510,15 @@ def from_impersonated_service_account_info(cls, info, scopes=None):
target_principal = impersonation_url[start_index + 1 : end_index]
delegates = info.get("delegates")
quota_project_id = info.get("quota_project_id")
trust_boundary = info.get("trust_boundary")

return cls(
source_credentials,
target_principal,
scopes,
delegates,
quota_project_id=quota_project_id,
trust_boundary=trust_boundary,
)


Expand Down
9 changes: 4 additions & 5 deletions google/oauth2/service_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import copy
import datetime

from google.auth import _constants
from google.auth import _helpers
from google.auth import _service_account_info
from google.auth import credentials
Expand All @@ -84,9 +85,6 @@

_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)


class Credentials(
Expand Down Expand Up @@ -520,8 +518,9 @@ def _build_trust_boundary_lookup_url(self):
raise ValueError(
"Service account email is required to build the trust boundary lookup URL."
)
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self._universe_domain, self._service_account_email
return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
universe_domain=self._universe_domain,
service_account_email=self._service_account_email,
)

@_helpers.copy_docstring(credentials.Signing)
Expand Down
Loading