Version: v2.3

Advanced usage of file load validation node

In advanced use cases where the preceding ETL job can have concurrent runs, the ETL job must provide an input manifest file so that the file load validation node can identify the correct files. Another use case is adding more partitions on top of the default 'upload_date' partition. The example on this page combines both cases.

Following are the constraints for a valid input manifest file:

  1. The file must be of type JSON.
  2. The file must follow the schema shown below.

Schema

    [
      {
        "Domain": "string",
        "DatasetName": "string",
        "Partitions": [
          {"upload_date": "string", "custom_partition": "string", ......},
          {"upload_date": "string", "custom_partition": "string", ......},
          ......
        ]
      },
      ............
    ]

  3. The file must be written to the following S3 location:

    s3://<etl_bucket_name>/<preceding_etl_job_id>/temp/manifest-files/<workflow_name>/<workflow_execution_id>-input_manifest_file.json
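
The constraints above can be checked locally before the file is uploaded. The following is a minimal sketch, not part of the product: `validate_manifest` and `manifest_key` are hypothetical helper names, and the checks only mirror the schema and location listed above. The file load validation node may enforce additional rules that are not covered here.

```python
def validate_manifest(manifest):
    """Return a list of problems; an empty list means the structure matches the schema."""
    if not isinstance(manifest, list):
        return ["top level must be a JSON array of dataset entries"]
    problems = []
    for i, entry in enumerate(manifest):
        if not isinstance(entry, dict):
            problems.append(f"entry {i}: must be a JSON object")
            continue
        for key in ("Domain", "DatasetName"):
            if not isinstance(entry.get(key), str):
                problems.append(f"entry {i}: '{key}' must be a string")
        partitions = entry.get("Partitions")
        if not isinstance(partitions, list):
            problems.append(f"entry {i}: 'Partitions' must be an array")
            continue
        for j, partition in enumerate(partitions):
            # Each partition maps partition names to string values.
            if not isinstance(partition, dict) or not all(
                isinstance(k, str) and isinstance(v, str) for k, v in partition.items()
            ):
                problems.append(
                    f"entry {i}, partition {j}: must map partition names to string values"
                )
    return problems


def manifest_key(etl_job_id, workflow_name, workflow_execution_id):
    """Build the object key (without the bucket name) from the location constraint."""
    return (
        f"{etl_job_id}/temp/manifest-files/{workflow_name}/"
        f"{workflow_execution_id}-input_manifest_file.json"
    )


# A deliberately malformed manifest: 'DatasetName' is missing and a value is not a string.
bad_manifest = [{"Domain": "insurance", "Partitions": [{"upload_date": 1700000000}]}]
for problem in validate_manifest(bad_manifest):
    print(problem)
```

Returning a list of problems instead of raising on the first one lets an ETL job log every issue in a single run.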

Note: The multi-partition concept applies only to datasets whose target location is S3-Athena.

Here are some sample input manifest files:

Input manifest file for an ETL job writing to a single dataset: (image: Sample_input_manifest_file)

Input manifest file for an ETL job writing to multiple datasets: (image: Sample_input_manifest_file_2)
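
For readers who cannot see the images, the sketch below builds two manifests of the same shape in Python. The domain and dataset names are borrowed from the sample code later on this page. The partition values are made up for illustration and are not copied from the images.

```python
import json

# Illustrative values only; use the partitions your ETL job actually wrote.
single_dataset_manifest = [
    {
        "Domain": "insurance",
        "DatasetName": "customers",
        "Partitions": [
            {"upload_date": "1700000000", "uploaded_by": "Jeebu"},
        ],
    }
]

# A job that writes to several datasets adds one entry per dataset.
multi_dataset_manifest = single_dataset_manifest + [
    {
        "Domain": "insurance",
        "DatasetName": "employees",
        "Partitions": [
            {"upload_date": "1700000000", "uploaded_by": "Jeebu"},
            {"upload_date": "1700003600", "uploaded_by": "Jeebu"},
        ],
    }
]

print(json.dumps(multi_dataset_manifest, indent=2))
```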

After the workflow runs, the workflow execution properties of the file load validation node look as shown below: (image: file_load_validation_workflow_execution_properties)

When you click the 'Download' button next to the output manifest file, you can see the file load statuses in detail, as shown below: (image: file_load_validation_ouput_manifest_file)

The following sample code writes data to multiple datasets with multiple partitions. It does the following:

  1. Writes data to the customers dataset in the insurance domain.

  2. Stores the corresponding partitions of the customers dataset in a dict.

  3. Writes data to the employees dataset in the insurance domain.

  4. Stores the corresponding partitions of the employees dataset in a dict.

  5. Writes the partition information of both datasets to the ETL bucket as the input manifest file.

Replace every value written between < > in the sample code below with the appropriate value.

    import json
    import sys
    import time
    from io import StringIO

    import boto3
    import pandas as pd
    from awsglue.utils import getResolvedOptions


    def write_csv_dataframe_to_s3(s3_resource, dataframe, bucket, file_path):
        """
        Writes a DataFrame to S3 as a CSV file.
        """
        csv_buffer = StringIO()
        dataframe.to_csv(csv_buffer, index=False)
        s3_resource.Object(bucket, file_path).put(Body=csv_buffer.getvalue())


    def write_json_manifest_file_to_s3(s3, json_data, file_path):
        """
        Writes the manifest to the ETL bucket as a JSON file.
        """
        s3object = s3.Object('cdap-<region>-<aws_account_id>-master-etl', file_path)
        s3object.put(Body=json.dumps(json_data).encode('UTF-8'))


    json_data = []
    args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
    workflow_name = args['WORKFLOW_NAME']
    workflow_run_id = args['WORKFLOW_RUN_ID']
    s3 = boto3.resource('s3')

    # Use the same partition values in the S3 path and in the manifest
    uploaded_by = 'Jeebu'

    # Write data to the customers dataset
    domain_name = 'insurance'
    dataset_name = 'customers'
    epoch_time = int(time.time())
    epoch_time_str = str(epoch_time)
    data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
    df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
    path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
    write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/customers_1.csv'.format(path))

    # Create a dict for domain, dataset combination and partition information
    customers_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
    partitions = []
    partitions.append({"uploaded_by": uploaded_by, "upload_date": epoch_time_str})
    customers_partition_dict.update({"Partitions": partitions})
    json_data.append(customers_partition_dict)

    # Write data to the employees dataset
    domain_name = 'insurance'
    dataset_name = 'employees'
    epoch_time = int(time.time())
    epoch_time_str = str(epoch_time)
    data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
    df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
    path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
    write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/employees_1.csv'.format(path))

    # Create a dict for domain, dataset combination and partition information
    employees_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
    partitions = []
    partitions.append({"uploaded_by": uploaded_by, "upload_date": epoch_time_str})
    employees_partition_dict.update({"Partitions": partitions})
    json_data.append(employees_partition_dict)

    # Write json_data as input manifest file
    file_path = "<etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(workflow_name, workflow_run_id)
    write_json_manifest_file_to_s3(s3, json_data, file_path)
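
In the sample above, the partition values appear twice for each dataset: once in the S3 path and once in the manifest entry. One way to keep them in sync is to derive both from a single dict. This is a sketch under assumptions: `partition_prefix` and `manifest_entry` are hypothetical helper names, and the path layout simply copies the one used in the sample code.

```python
def partition_prefix(domain, dataset, partitions, user_id, file_type="csv"):
    """Build the landing-zone prefix in the same layout as the sample code:
    <domain>/<dataset>/<name>=<value>/.../<user_id>/<file_type>
    Partition order follows the dict's insertion order."""
    segments = [f"{name}={value}" for name, value in partitions.items()]
    return "/".join([domain, dataset, *segments, user_id, file_type])


def manifest_entry(domain, dataset, partitions_list):
    """Build one manifest entry from the partitions that were actually written."""
    return {"Domain": domain, "DatasetName": dataset, "Partitions": list(partitions_list)}


# One dict drives both the S3 path and the manifest entry.
partitions = {"uploaded_by": "Jeebu", "upload_date": "1700000000"}
prefix = partition_prefix("insurance", "customers", partitions, "<valid-amorphic-userID>")
entry = manifest_entry("insurance", "customers", [partitions])
```

With this approach, `prefix` can be passed to `write_csv_dataframe_to_s3` and `entry` appended to `json_data`, so a change to a partition value only has to be made in one place.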