How to build a developed data network in AWS with the formation of the lake
In today’s world, where data is strength, traditional central designs often become points that impede access to data and innovation. The data network is a modern approach that gives ownership and deals with data as a producer managed by field teams. Aws Lake Formation (LF) simplifies the participation and judgment of the safe account, including the three: control what users can do via accounts, and follow compliance with regulations and standards. Take advantage of the formation of the lake, Aws Lambda, SQs, IAM, and S3 The organization can now implement a truly development network structure that promotes the interview of self -application and unified governance without prejudice to safety.
Overview of architecture
Architectural engineering follows the design of the AWS data network: Account A (product) and account B (consumer). The goal is to transfer AWS GLUE data schedule from Account A to Account B in a safe way with the help of services such as AWS Lake Formation (LF), S3, Lambda and Amazon SQs.
The process of consuming a clear file
Here, the Mastest.json file is the most important system composition file and shows those who have access to what, such as the floor, database name, account identifier, and permissions granted to them. Within our company, the service procedures are run using servicenow. The student raises Servicenow (SNOW) with each part of the relevant information arranged in organized models. The resulting mand.json file is then created and placed in the S3 bucket in Account A after approval of the ticket within our back interface systems.
Data sharing process
- Product Lambda in Account a
-
An event is scheduled to take place in Aws Lambda (Producer.py) whenever the Missest.json file is dropped in the S3 Aquarius account.
-
The Lambda product verifies the correctness of the request and is fulfilled by:
- If the request is to reach the same account or account.
- Whether the S3 bucket is registered in the formation of AWS Lake (LF).
-
Once the health verification is complete, the Lambda product sends a message to the Amazon SQs waiting menu in the account B.
-
This message includes details about Aws Resource Access Manager (RAM), which facilitates the sharing of resources via the account.
-
- Lambda consumer in the account b
- When you receive a SQS message, the Consumer Lambda function (Consumer.py) is running in the account B.
- The request is treated and the necessary permissions in the formation of the lake for the role of account B to reach the schedule of shared glue data from the account A.
- Once the arrival is given, the entry of a database/corresponding schedule in the account B will be created by the Lambda consumer for users to inquire for this joint data.
This provides automatic and developed architecture for the exchange of safe, controlled and effective data between AWS accounts while allowing compliance with the formation of AWS Lake, IAM roles, and AWS GLUE data.
Below the JSON model, product code, and consumer code that can be used to create and operate the above -mentioned structure.
MANCEEST.JSON
{
"Records": [{
"AccessType": "grant",
"Principal": "arn of IAM user/role in account a (if granting same account)",
"Table": {
"DatabaseName": "database name",
"Name": "table name",
"Wildcard":false
},
"Permissions": ["SELECT"],
"Cross_account": true,
"AccountID": "112233445566",
"Cross_role":"arn of cross account IAM user/role (if granting cross account)"
}]
}
Product Code – project
import boto3
import json
import logging
import os
from botocore.exceptions import ClientError
# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Environment Variables
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
LF_REGISTER_ROLE = os.environ['LF_ROLE']
QUEUE_URL = os.environ['CROSS_QUEUE_URL']
S3_POLICY_ARN = os.environ['S3_POLICY_ARN']
S3_POLICY_NAME = os.environ['S3_POLICY_NAME']
def get_ram_invite(ram_client):
"""Retrieve Resource Access Manager (RAM) invite for cross-account sharing."""
try:
response = ram_client.get_resource_share_associations(
associationType='PRINCIPAL', associationStatus='ASSOCIATING'
)
return response['resourceShareAssociations'][0]['resourceShareArn']
except Exception as e:
logger.error(f"Error retrieving RAM invite: {e}")
raise
def delete_oldest_policy_version(iam_client, policy_arn):
"""Deletes the oldest version of an IAM policy (max 5 versions allowed)."""
try:
versions = iam_client.list_policy_versions(PolicyArn=policy_arn)['Versions']
non_default_versions = [v['VersionId'] for v in versions if not v['IsDefaultVersion']]
if non_default_versions:
oldest_version = min(non_default_versions)
iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=oldest_version)
except Exception as e:
logger.error(f"Error deleting old policy version: {e}")
raise
def update_lf_s3_policy(iam_client, iam_resource, bucket_name):
"""Modifies the Lake Formation IAM policy to include S3 paths."""
try:
account_id = boto3.client('sts').get_caller_identity()['Account']
policy_arn = f'arn:aws:iam::{account_id}:policy/{S3_POLICY_NAME}'
policy = iam_resource.Policy(policy_arn)
policy_json = policy.default_version.document
s3_arn = f'arn:aws:s3:::{bucket_name}'
updated = False
for statement in policy_json['Statement']:
if s3_arn not in statement['Resource']:
statement['Resource'].append(f'{s3_arn}/*')
updated = True
if updated:
delete_oldest_policy_version(iam_client, S3_POLICY_ARN)
iam_client.create_policy_version(
PolicyArn=policy_arn,
PolicyDocument=json.dumps(policy_json),
SetAsDefault=True
)
except Exception as e:
logger.error(f"Error updating LF S3 policy: {e}")
raise
def register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource, is_table=True):
"""Registers an S3 location with Lake Formation."""
try:
s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
glue_client.get_database(Name=database)['Database']['LocationUri']
bucket_name = s3_location.split('/')[2]
registered_buckets = [res['ResourceArn'].split(':::')[1] for res in lf_client.list_resources()['ResourceInfoList']]
if bucket_name not in registered_buckets:
lf_client.register_resource(
ResourceArn=f'arn:aws:s3:::{bucket_name}',
UseServiceLinkedRole=False,
RoleArn=LF_REGISTER_ROLE
)
update_lf_s3_policy(iam_client, iam_resource, bucket_name)
except ClientError as e:
logger.error(f"Error registering S3 location: {e}")
raise
def grant_data_location_permissions(lf_client, glue_client, principal, database, table, is_table=True):
"""Grants Data Location Permissions to a principal."""
try:
s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
glue_client.get_database(Name=database)['Database']['LocationUri']
bucket_name = s3_location.split('/')[2]
lf_client.grant_permissions(
Principal={'DataLakePrincipalIdentifier': principal},
Resource={'DataLocation': {'ResourceArn': f'arn:aws:s3:::{bucket_name}'}},
Permissions=['DATA_LOCATION_ACCESS'],
PermissionsWithGrantOption=['DATA_LOCATION_ACCESS']
)
except ClientError as e:
logger.error(f"Error granting Data Location Permissions: {e}")
def create_resource(database, table=None, wildcard=False):
"""Creates a resource dictionary for granting permissions."""
if database and table:
return {'Table': {'DatabaseName': database, 'Name': table}}
elif database and wildcard:
return {'Table': {'DatabaseName': database, 'TableWildcard': {}}}
elif database:
return {'Database': {'Name': database}}
return None
def revoke_permission(lf_client, principal, permissions, database, table, wildcard):
"""Revokes permissions from a principal."""
try:
resource = create_resource(database, table, wildcard)
lf_client.revoke_permissions(
Principal={'DataLakePrincipalIdentifier': principal},
Resource=resource,
Permissions=permissions
)
except Exception as e:
logger.error(f"Error revoking permissions for {principal}: {e}")
raise
def lambda_handler(event, context):
"""Lambda function to process S3 event and manage Lake Formation permissions."""
try:
sts_client = boto3.client('sts')
assume_role_response = sts_client.assume_role(
RoleArn=ADMIN_ROLE_ARN,
RoleSessionName='LFSession'
)
aws_session = boto3.session.Session(
aws_access_key_id=assume_role_response['Credentials']['AccessKeyId'],
aws_secret_access_key=assume_role_response['Credentials']['SecretAccessKey'],
aws_session_token=assume_role_response['Credentials']['SessionToken']
)
s3_client = aws_session.client("s3")
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_key = event['Records'][0]['s3']['object']['key']
obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
json_content = json.loads(obj["Body"].read().decode('utf-8'))
# Extracting manifest file details
record = json_content['Records'][0]
access_type = record['AccessType']
principal = record['Principal']
database = record['Table']['DatabaseName']
table = record['Table']['Name']
permissions = record['Permissions']
cross_account = record['Cross_account']
glue_client = aws_session.client('glue')
lf_client = aws_session.client('lakeformation')
iam_client = aws_session.client('iam')
iam_resource = aws_session.resource('iam')
if access_type == 'revoke':
revoke_permission(lf_client, principal, permissions, database, table, wildcard=False)
else:
register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource)
grant_data_location_permissions(lf_client, glue_client, principal, database, table)
except Exception as e:
logger.error(f"Lambda execution error: {e}")
raise
Consumer Consumer
import boto3
import json
import logging
import os
from botocore.exceptions import ClientError
# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Environment Variables
ACCOUNT_A = os.environ['SRC_ACC_NUM']
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
def assume_role(role_arn):
"""Assume AWS IAM Role and return a session with temporary credentials."""
sts_client = boto3.client('sts')
try:
response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="LFSession")
return boto3.session.Session(
aws_access_key_id=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['SecretAccessKey'],
aws_session_token=response['Credentials']['SessionToken']
)
except ClientError as e:
logger.error(f"Error assuming role {role_arn}: {e}")
raise
def get_ram_invite(ram_client, ram_arn):
"""Retrieve a Resource Access Manager (RAM) invitation."""
try:
response = ram_client.get_resource_share_invitations(resourceShareArns=[ram_arn])
return response['resourceShareInvitations'][0]['resourceShareInvitationArn']
except ClientError as e:
logger.error(f"Error retrieving RAM invite: {e}")
raise
def accept_ram_invite(ram_client, ram_invite):
"""Accept a RAM invitation."""
try:
ram_client.accept_resource_share_invitation(resourceShareInvitationArn=ram_invite)
except ClientError:
logger.info("RAM invite already accepted")
def create_database(glue_client, database_name):
"""Create a Glue database if it does not already exist."""
try:
glue_client.create_database(DatabaseInput={'Name': database_name})
logger.info(f"Created database: {database_name}")
except ClientError:
logger.info(f"Database {database_name} already exists")
def create_resource_link_database(glue_client, rl_name, source_db, account_id):
"""Create a resource link for a shared Glue database."""
try:
glue_client.create_database(DatabaseInput={
'Name': rl_name,
"TargetDatabase": {
"CatalogId": account_id,
"DatabaseName": source_db
}
})
logger.info(f"Created resource link database: {rl_name}")
except ClientError:
logger.info(f"Resource link {rl_name} already exists")
def create_resource_link_table(glue_client, rl_db, rl_table, source_db, source_table, account_id):
"""Create a resource link for a shared Glue table."""
try:
glue_client.create_table(
DatabaseName=rl_db,
TableInput={
"Name": rl_table,
"TargetTable": {
"CatalogId": account_id,
"DatabaseName": source_db,
"Name": source_table
}
}
)
logger.info(f"Created resource link table: {rl_table}")
except ClientError:
logger.info(f"Resource link table {rl_table} already exists")
def grant_permissions(lf_client, principal, resource, permissions):
"""Grant permissions to a principal on a specified resource."""
try:
lf_client.grant_permissions(
Principal={"DataLakePrincipalIdentifier": principal},
Resource=resource,
Permissions=permissions,
PermissionsWithGrantOption=permissions
)
except ClientError as e:
logger.error(f"Error granting permissions to {principal}: {e}")
raise
def revoke_permissions(lf_client, principal, resource, permissions):
"""Revoke permissions from a principal."""
try:
lf_client.revoke_permissions(
Principal={"DataLakePrincipalIdentifier": principal},
Resource=resource,
Permissions=permissions
)
except ClientError as e:
logger.error(f"Error revoking permissions from {principal}: {e}")
raise
def construct_resource(database, table=None, wildcard=False, catalog_id=None):
"""Construct the resource dictionary for permissions."""
if table:
return {"Table": {"DatabaseName": database, "Name": table, **({"CatalogId": catalog_id} if catalog_id else {})}}
elif wildcard:
return {"Table": {"DatabaseName": database, "TableWildcard": {}}}
else:
return {"Database": {"Name": database}}
def lambda_handler(event, context):
"""Lambda function to process SQS messages and manage Lake Formation permissions."""
try:
records = [json.loads(record["body"]) for record in event['Records']]
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Error processing event data: {e}")
return
aws_session = assume_role(ADMIN_ROLE_ARN)
# AWS Clients
lf_client = aws_session.client('lakeformation')
glue_client = aws_session.client('glue')
ram_client = aws_session.client('ram')
for record in records:
ram_arn = record.get('ram_url')
principal = record.get('cross_role')
database = record.get('db_name')
table = record.get('table_name')
permissions = record.get('permissions', [])
wildcard = record.get('wild_card', False)
access_type = record.get('access_type')
rl_database = f'rl_{database}'
db_target = f'{database}_shared'
table_target = f'rl_{table}'
if access_type == 'grant':
try:
ram_invite = get_ram_invite(ram_client, ram_arn)
accept_ram_invite(ram_client, ram_invite)
except Exception as e:
logger.error(f"Error accepting RAM invite: {e}")
# Handle Database/Table Creation
if database and table:
create_database(glue_client, db_target)
create_resource_link_table(glue_client, db_target, table_target, database, table, ACCOUNT_A)
elif database:
create_resource_link_database(glue_client, rl_database, database, ACCOUNT_A)
# Handle Permissions
try:
resource_db = construct_resource(db_target)
resource_table = construct_resource(db_target, table_target)
if access_type == 'grant':
if database and table:
grant_permissions(lf_client, principal, resource_db, ['ALL'])
grant_permissions(lf_client, principal, resource_table, permissions)
elif database:
resource_wildcard = construct_resource(database, wildcard=True)
grant_permissions(lf_client, principal, resource_wildcard, permissions)
else:
if database and table:
revoke_permissions(lf_client, principal, resource_db, ['ALL'])
revoke_permissions(lf_client, principal, resource_table, permissions)
elif database:
resource_wildcard = construct_resource(rl_database, wildcard=True)
revoke_permissions(lf_client, principal, resource_wildcard, permissions)
except Exception as e:
logger.error(f"Error modifying permissions: {e}")
Operating time notes:
For the producer:
- Create a Lambda function and download the product code.
- Add an environment variable called Admin_Role_ARN and add the role of the Data Lake Arn official as a value
- Add an environment variable called Cross_queue_url and add URL to give consumer as a value
- Adding an environment variable called LF_ROLE and add the role of the lake formation service to the account a
- Add an environment variable called S3_Policy_arn and add S3 a custom policy as a value
The consumer text:
- Create Aws Lambda and download the consumer code.
- Add an environment variable called SRC_ACC_NUM and provide the AWS account number as a value
- Add an environment variable called Admin_Role_ARN and add the role of the Data Lake Arn official as a value
conclusion
Using the formation of AWS Lake, Glue Data Calcol, IAM, and S3 to place a work network at work gives you a way to publish elastic and safe data ownership, while closely monitoring things. With the help of Lambda, SQS and Aws Resource Access Manager (RAM), data sharing can be via different accounts automatically, making it easier for institutions to control access and allow different teams to deal with their data products without any obstacles. This setting allows people to conduct their data analysis with no compliance and security rules. Since the data scientist continues to change, embracing a method like this, which is uniform and well organized, can make the data easier to access teamwork, and lead to better decisions throughout the company.