Composer pipeline to upload files from Google BigQuery to an S3 bucket

I have an s3 bucket (amazon) and I want to write a composer pipeline to upload csv files into the S3 bucket on a  daily schedule. Currently all many data is in BigQuery and I'll convert it into csvs and will put it into an S3 bucket. Does anyone have any examples of doing this? Which is the easiest way to do this? I shall be grateful if someone can help CC: @ms4446 

Solved Solved
0 11 522
1 ACCEPTED SOLUTION

For your scenario, where you're aiming to upload files to an Amazon S3 bucket from a Cloud Composer and you have a single user access requirement, you don't necessarily need to use a third-party tool like JumpCloud for identity access management. AWS IAM and Google Cloud's IAM, along with the concept of workload identity federation, can suffice for your needs.

Given your use case, here's a simplified approach without needing third-party identity providers:

  • AWS IAM Role for S3 Access: Create an IAM role in AWS with the necessary permissions to write to the specified S3 bucket. This role will be assumed by a Google Cloud service account through federation.
  • Workload Identity Federation: Set up workload identity federation between AWS and Google Cloud. This allows a Google Cloud service account to assume an AWS IAM role by obtaining temporary AWS security credentials.
  • Specifying the Provider URL: When setting up AWS IAM for federation with Google Cloud, AWS will ask for an issuer URL (the URL for the OpenID Connect provider). This URL is used by AWS to trust the identity provider (Google, in this case). For Google Cloud, the issuer URL is typically https://accounts.google.com.

Steps to Implement Workload Identity Federation

  1. Create an AWS IAM Role:

    • Navigate to the AWS IAM console.
    • Create a new role and select "Web identity" as the type of trusted entity.
    • For the identity provider, select "Google" and enter your Google Cloud service account’s unique ID.
    • Attach policies that grant access to the necessary S3 resources.
  2. Set Up Google Cloud Service Account:

    • Ensure you have a Google Cloud service account that will interact with AWS services.
    • Grant this service account the necessary roles in Google Cloud to perform its intended tasks.
  3. Obtain Google Service Account Credentials:

    • Use Google Cloud IAM to create and download a JSON key file for the service account. This key file will be used to authenticate and obtain tokens for exchange with AWS temp credentials.
  4. Exchange Tokens for AWS Credentials:

    • Implement the token exchange using Google's authentication libraries and AWS APIs.

Example 

 
import boto3 

# Assuming you have obtained the Google ID token by authenticating with your service account
google_id_token = 'YOUR_GOOGLE_ID_TOKEN' 

# Assume the AWS role
sts_client = boto3.client('sts')
assumed_role_object = sts_client.assume_role_with_web_identity( 
    RoleArn="arn:aws:iam::AWS_ACCOUNT_ID:role/YOUR_AWS_ROLE", 
    RoleSessionName="SessionName", 
    WebIdentityToken=google_id_token 
) 

credentials = assumed_role_object['Credentials'] 

# Now you can use these temporary credentials to access AWS services 
s3_client = boto3.client( 
    's3', 
    aws_access_key_id=credentials['AccessKeyId'], 
    aws_secret_access_key=credentials['SecretAccessKey'], 
    aws_session_token=credentials['SessionToken']
) 

# Example: List buckets
response = s3_client.list_buckets()
print(response) 

If you're finding it challenging, consider the following resources:

View solution in original post

11 REPLIES 11

Hi @ayushmaheshwari ,

Creating a data pipeline in Google Cloud Composer to upload CSV files from Google BigQuery to an Amazon S3 bucket involves several steps. Google Cloud Composer is a managed Apache Airflow service, which makes it easier to create, schedule, and monitor your workflows. Here's a step-by-step guide to achieve your goal:

Step-by-Step Guide

  1. Google Cloud Composer Environment: Ensure a Cloud Composer environment is set up. Follow Google Cloud's best practices for configuration to align with your project's needs.

  2. Google BigQuery: Prepare your data within BigQuery, ensuring it's correctly formatted and indexed for efficient extraction.

  3. Amazon S3 Bucket: Set up an S3 bucket for storing your CSV files. Apply best practices for bucket naming, security, and data lifecycle management.

  4. AWS Credentials: Securely store your AWS credentials. Use Airflow's built-in Secrets Backend, such as Google Cloud Secret Manager, for storing the AWS access key ID and secret access key. Configure an Airflow connection of type 'Amazon Web Services' with these credentials.

Create an Airflow DAG

Define your DAG to outline the data pipeline's workflow. Incorporate error handling, dynamic file naming, and clear task descriptions for improved maintainability and operability.

 

from airflow import DAG 
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator 
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,  # Enable email notifications on failure
    'email': ['your_email@example.com'], 
    'retries': 2,          # Increase retries for robustness
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'bq_to_s3_pipeline',
        default_args=default_args,
        description='Transfers data from BigQuery to S3 with improved practices',
        schedule_interval='0 0 * * *', 
        start_date=days_ago(1),
        catchup=False,
) as dag:

    extract_from_bq = BigQueryToGCSOperator(
        task_id='extract_from_bq',
        source_project_dataset_table='your_project.your_dataset.your_table',
        destination_cloud_storage_uris=['gs://your-gcs-bucket/temporary/data.csv'],
        export_format='CSV',
    )

    transfer_to_s3 = GCSToS3Operator(
        task_id='transfer_to_s3',
        bucket='your-s3-bucket',
        s3_key='data/{{ ds }}.csv',  
        gcp_conn_id='google_cloud_default',
        aws_conn_id='your_aws_connection_id',
        src_bucket='your-gcs-bucket',
        src_object='temporary/data.csv',
    )

    extract_from_bq >> transfer_to_s3 

 

Key Points and Customization

  • Error Handling: Utilize Airflow's built-in mechanisms like email_on_failure to alert on task failures. Consider custom error handling logic for more complex scenarios.

  • Dynamic File Naming: Use Airflow's templating capabilities to dynamically name files based on the DAG run date or other variables, facilitating better data organization.

  • Data Transformation: For scenarios requiring data transformation between extraction and upload, integrate PythonOperator or BashOperator tasks that perform these transformations.

  • Permissions and Security: Ensure that the IAM roles for both Google Cloud and AWS have the minimum necessary permissions, adhering to the principle of least privilege.

  • Monitoring and Maintenance: Regularly monitor the DAG's performance through Airflow's UI. Set up additional monitoring and alerting through Google Cloud's operations suite for comprehensive visibility.

Thanks @ms4446 , I get all the steps. Instead of using the GCStoS3Operator, why not using the python boto3 library as below?

from google.cloud import bigquery
import boto3
from google.cloud.exceptions import NotFound
from datetime import datetime

def export_bigquery_view_to_s3(query, aws_access_key_id, aws_secret_access_key, s3_bucket, s3_key):
    # Initialize BigQuery and S3 clients
    bq_client = bigquery.Client()
    s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

    # Execute the query to get the data from the view
    query_job = bq_client.query(query)
    results = query_job.result()

    # Write the results to a temporary CSV file
    temp_csv_file = '/tmp/temp_data.csv'
    with open(temp_csv_file, 'w') as f:
        for row in results:
            f.write(','.join([str(field) for field in row]) + '\n')

    # Define the destination URI for the export
    destination_uri = f's3://{s3_bucket}/{s3_key}'

    try:
        # Upload the file to S3
        s3_client.upload_file(temp_csv_file, s3_bucket, s3_key)

        print(f'Data exported from BigQuery view query to {destination_uri}')

    except Exception as e:
        print(f'Failed to upload data to S3: {e}')

def main():
    # Set the SQL query for your BigQuery view
    query = """
    SELECT *
    FROM `your_project.your_dataset.your_view`
    """

    # Set AWS S3 bucket information
    aws_access_key_id = "your-aws-access-key-id"
    aws_secret_access_key = "your-aws-secret-access-key"
    s3_bucket = "your-s3-bucket"
    s3_key = f"exported_data_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    # Export data from BigQuery view to S3
    export_bigquery_view_to_s3(query, aws_access_key_id, aws_secret_access_key, s3_bucket, s3_key)

if __name__ == "__main__":
    main()

 

Yes, using the boto3 library is a perfectly valid approach for transferring data from Google BigQuery to Amazon S3, especially when you need more control over the process or when the existing Airflow operators do not meet your specific requirements.

Hi @ms4446 , further to our conversations above to transfer file from GCS to AWS s3,  I am planning to use temporary AWS credentials as described here

This instructs using GCP VM and then installing Amazon CLI to request credentials as described. Do you suggest any better way to have temporary AWS credentials?

The blog post you referenced suggests using a VM on Google Cloud to install the AWS CLI primarily because it's a common scenario where an individual or a system might interact with AWS services from within GCP infrastructure. However, this is not necessary for all use cases, especially when dealing with programmatic access from applications or services like Cloud Composer.

In the context of Google Cloud Composer, you don't necessarily need to set up a separate VM for the sole purpose of installing the AWS CLI, unless your specific workflow requires CLI features that are not available through the AWS SDKs (e.g., boto3 in Python).

For your use case, where you're planning to transfer data from Google BigQuery to Amazon S3, you can indeed work directly through Cloud Composer without setting up a separate VM, by leveraging temporary credentials securely. Here's how you can adapt the workload identity federation approach for Cloud Composer:

Federation Configuration

Setup Workload Identity Federation: Begin by configuring an AWS IAM Identity Provider and IAM Role according to AWS documentation. This setup enables a Google Cloud service account to assume an AWS IAM Role, facilitating secure cross-cloud authentication without the management of long-term AWS access keys.

Assuming the AWS IAM Role

Obtain Temporary AWS Credentials: Utilize the AWS SDK for Python (boto3) within Cloud Composer to programmatically assume the IAM Role. This crucial step grants you temporary credentials (access key ID, secret access key, and session token), necessary for authenticating requests to AWS S3.

Modifying Your Script for Temporary Credentials

Incorporate the process of assuming the AWS IAM Role into your data transfer script with boto3. This involves exchanging a Google-provided identity token for AWS temporary credentials.

Code Example (Python):

import boto3 

def get_google_identity_token(): 
    # Implement using Google's authentication libraries 

def get_aws_temp_credentials(google_identity_token, aws_role_arn):
    sts_client = boto3.client('sts')
    response = sts_client.assume_role_with_web_identity(
        RoleArn=aws_role_arn,
        RoleSessionName="AssumeRoleSession",
        WebIdentityToken=google_identity_token 
    )
    return response['Credentials']

# Example usage 
google_identity_token = get_google_identity_token()
temp_credentials = get_aws_temp_credentials(google_identity_token, "arn:aws:iam::123456789012:role/YourAWSS3Role") 

s3_client = boto3.client(
    's3',
    aws_access_key_id=temp_credentials['AccessKeyId'],
    aws_secret_access_key=temp_credentials['SecretAccessKey'],
    aws_session_token=temp_credentials['SessionToken'],
)

Important Considerations

  • Security and Credential Management: Prioritize secure credential exchange and handling.

  • Environment Configuration: Meticulously verify the configuration of your Cloud Composer environment and AWS IAM role.

  • Error Handling: Implement robust error handling procedures.

  • Integration with Airflow: Integrate with Airflow using a PythonOperator.

  • Documentation and Maintenance: Maintain detailed documentation for future reference.

@ms4446 I am struggling to get / implement Google's authentication libraries

For your scenario, where you're aiming to upload files to an Amazon S3 bucket from a Cloud Composer and you have a single user access requirement, you don't necessarily need to use a third-party tool like JumpCloud for identity access management. AWS IAM and Google Cloud's IAM, along with the concept of workload identity federation, can suffice for your needs.

Given your use case, here's a simplified approach without needing third-party identity providers:

  • AWS IAM Role for S3 Access: Create an IAM role in AWS with the necessary permissions to write to the specified S3 bucket. This role will be assumed by a Google Cloud service account through federation.
  • Workload Identity Federation: Set up workload identity federation between AWS and Google Cloud. This allows a Google Cloud service account to assume an AWS IAM role by obtaining temporary AWS security credentials.
  • Specifying the Provider URL: When setting up AWS IAM for federation with Google Cloud, AWS will ask for an issuer URL (the URL for the OpenID Connect provider). This URL is used by AWS to trust the identity provider (Google, in this case). For Google Cloud, the issuer URL is typically https://accounts.google.com.

Steps to Implement Workload Identity Federation

  1. Create an AWS IAM Role:

    • Navigate to the AWS IAM console.
    • Create a new role and select "Web identity" as the type of trusted entity.
    • For the identity provider, select "Google" and enter your Google Cloud service account’s unique ID.
    • Attach policies that grant access to the necessary S3 resources.
  2. Set Up Google Cloud Service Account:

    • Ensure you have a Google Cloud service account that will interact with AWS services.
    • Grant this service account the necessary roles in Google Cloud to perform its intended tasks.
  3. Obtain Google Service Account Credentials:

    • Use Google Cloud IAM to create and download a JSON key file for the service account. This key file will be used to authenticate and obtain tokens for exchange with AWS temp credentials.
  4. Exchange Tokens for AWS Credentials:

    • Implement the token exchange using Google's authentication libraries and AWS APIs.

Example 

 
import boto3 

# Assuming you have obtained the Google ID token by authenticating with your service account
google_id_token = 'YOUR_GOOGLE_ID_TOKEN' 

# Assume the AWS role
sts_client = boto3.client('sts')
assumed_role_object = sts_client.assume_role_with_web_identity( 
    RoleArn="arn:aws:iam::AWS_ACCOUNT_ID:role/YOUR_AWS_ROLE", 
    RoleSessionName="SessionName", 
    WebIdentityToken=google_id_token 
) 

credentials = assumed_role_object['Credentials'] 

# Now you can use these temporary credentials to access AWS services 
s3_client = boto3.client( 
    's3', 
    aws_access_key_id=credentials['AccessKeyId'], 
    aws_secret_access_key=credentials['SecretAccessKey'], 
    aws_session_token=credentials['SessionToken']
) 

# Example: List buckets
response = s3_client.list_buckets()
print(response) 

If you're finding it challenging, consider the following resources:

Thanks a lot @ms4446 , this is the code I have for 

get_google_identity_token(): you mentioned.  I have used this API from GCP: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateIdT...
 
The email : 
"my_service_account.iam.gserviceaccount.com" I have my actual service account email . For this service account I have the permissions for: 
Owner
Service Account OpenID Connect Identity Token Creator
Service Account Token Creator
 
Yet, I am getting a 403 response from the line: 
    token_response = authed_session.request('POST',sa_credentials_url,
                                        data=bodyheaders=headers)
 Can you give me some tips? I shall be graeful
 

 

def get_google_identity_token():

    audience = 'https://s3.amazonaws.com'

# #1 Get the default credential to generate the access token
    credentials, project_id = google.auth.default(
            scopes='https://www.googleapis.com/auth/iam')

# #2 To use the current service account email
    service_account_email = "my_service_Account_email.iam.gserviceaccount.com"

# #3 prepare the call the the service account credentials API

    sa_credentials_url =  f'https://iamcredentials.googleapis.com/' \
                      f'v1/projects/-/serviceAccounts/'  \
                      f'{service_account_email}:generateIdToken'

  

    headers = {'Content-Type': 'application/json'}

    # Create an AuthorizedSession that includes 
# automatically the access_token based on your credentials

    authed_session = AuthorizedSession(credentials)

    body = json.dumps({'audience': audience})

    # Make the call 
    token_response = authed_session.request('POST',sa_credentials_url,
                                        data=body, headers=headers)

    jwt = token_response.json()
    id_token = jwt['token']

    return id_token

 

The above problem is solved, please ignore @ms4446 

@ayushmaheshwari  Was it a permission issue? Please share how you solved it.

@ms4446 , instead of using 

credentials, project_id = google.auth.default(
            scopes='https://www.googleapis.com/auth/iam')

I replaced it with :

credentials = service_account.Credentials.from_service_account_file(
    service_account_json_path, scopes=["https://www.googleapis.com/auth/iam"]