Version: v2.0

Streams

Streams enable users to stream data into Amorphic Datasets for further ETL/analytics. As a Beta release, Amorphic now lets users create a Kinesis stream and push data to it.

Kinesis

A Kinesis data stream can be created using the '+' icon at the top right of the page, under the Streams section of Ingestion. The following fields are required to create a Kinesis stream in Amorphic. Stream creation is an asynchronous process that runs in the background after a successful creation request.

The gif below shows how to create a stream.

Streams

| Attribute | Description |
| --- | --- |
| Stream Name | Name to be used for the stream. |
| Stream Type | Only Kinesis is supported for now. |
| Stream Mode (Optional) | The capacity mode of the stream: On-Demand or Provisioned. Defaults to Provisioned. |
| Data Retention Period (Hrs) | The period of time, in hours, for which the data stream retains data records. |
| Shard Count | Number of shards for the stream. Each shard ingests up to 1 MiB/second and 1,000 records/second, and emits up to 2 MiB/second. |
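Under the hood, these fields map onto the AWS Kinesis API. A minimal sketch of how such a creation request could be assembled with boto3 (the stream name and values are illustrative, not Amorphic's actual backend code):

```python
def build_create_stream_params(name: str, mode: str = "PROVISIONED",
                               shard_count: int = 1) -> dict:
    """Build kwargs for kinesis.create_stream from the fields above."""
    params = {
        "StreamName": name,
        "StreamModeDetails": {"StreamMode": mode},  # "PROVISIONED" or "ON_DEMAND"
    }
    if mode == "PROVISIONED":
        # Shard count only applies to provisioned-mode streams.
        params["ShardCount"] = shard_count
    return params


params = build_create_stream_params("demo-stream", "PROVISIONED", 2)
# With AWS credentials configured, the actual call would be:
# import boto3
# boto3.client("kinesis").create_stream(**params)
```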

The following picture depicts the Stream details page in Amorphic:

Stream Details

The same details page also displays the Estimated Cost of the stream, an approximation of the cost incurred since creation time. This feature currently has a few limitations:

  1. Users cannot estimate the cost of streams that have pre-existing consumers, because the CreatedTime attribute is missing for those consumers.
  2. Cost estimation does not work for streams older than 3 years.
  3. Cost estimation for Kinesis streams depends on the number of shard hours provisioned. Amorphic uses LastModifiedTime to calculate the total number of hours for which shards have been provisioned. So if a user updates the description or any other attribute of a stream, cost is calculated from the modification time rather than the creation time, and the estimated cost shown may be less than the actual cost incurred.
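The shard-hour calculation described above can be sketched as follows. The per-shard-hour price here is an assumed example value, not Amorphic's actual rate:

```python
from datetime import datetime, timedelta


def estimate_stream_cost(shard_count: int, last_modified: datetime, now: datetime,
                         price_per_shard_hour: float = 0.015) -> float:
    """Estimate cost as shard-hours since LastModifiedTime times an assumed price.

    As noted above, anchoring on LastModifiedTime (not CreatedTime) means the
    estimate can understate the true cost if the stream was edited after creation.
    """
    hours = (now - last_modified).total_seconds() / 3600
    return shard_count * hours * price_per_shard_hour


now = datetime(2024, 1, 2)
cost = estimate_stream_cost(2, now - timedelta(hours=24), now)
# 2 shards * 24 hours * $0.015/shard-hour = $0.72
```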

After successful stream creation, Amorphic provides an AWS access key and secret key that can be used to push data to the stream. Refer to the AWS documentation, which describes the different ways to push data into a stream.
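For example, a record could be pushed with boto3's put_record using the provided keys. A hedged sketch (the stream name, payload, and partition key are illustrative):

```python
import json


def encode_record(payload: dict, partition_key: str) -> dict:
    """Build the record arguments for kinesis.put_record from a JSON payload."""
    return {
        "Data": json.dumps(payload).encode("utf-8"),  # Kinesis expects bytes
        "PartitionKey": partition_key,  # determines which shard receives the record
    }


record = encode_record({"sensor": "s1", "value": 42}, partition_key="s1")
# Using the AWS access key and secret key provided by Amorphic:
# import boto3
# kinesis = boto3.client(
#     "kinesis",
#     aws_access_key_id="<access key>",
#     aws_secret_access_key="<secret key>",
# )
# kinesis.put_record(StreamName="<stream name>", **record)
```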

Stream details can be updated using the edit option on the details page if required. Only stream metadata and stream configuration can be updated.

Note

All the datasets associated with a stream can be retrieved using the API call below:

GET /streams/{id}?request_type=get_datasets
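A sketch of issuing that call from Python (the base URL and authorization header are hypothetical placeholders for your Amorphic deployment's endpoint and auth scheme):

```python
def datasets_url(base_url: str, stream_id: str) -> str:
    """Build the get_datasets URL described above for a given stream ID."""
    return f"{base_url}/streams/{stream_id}?request_type=get_datasets"


url = datasets_url("https://api.example.com", "abc-123")
# A GET request with your Amorphic auth headers would then return the datasets:
# import requests
# resp = requests.get(url, headers={"Authorization": "<token>"})  # auth scheme is an assumption
# datasets = resp.json()
```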

Consumers

For consumers, Amorphic uses Kinesis Data Firehose delivery streams, which continuously collect and load streaming data into the destinations you specify. Each consumer is attached to a dataset, which is used as the final destination for the data collected from the stream. The following fields are required to create a consumer in Amorphic.

| Attribute | Description |
| --- | --- |
| Consumer Name | Name to be used for the consumer. |
| Buffer Size | Amount of data (in MiB) that the delivery stream buffers before delivering it to the destination. |
| Buffer Interval | Length of time (in seconds) that the delivery stream buffers incoming data before delivering it to the destination. |
| Target Location | Final destination where the streaming data is stored. Currently supported target locations are Auroramysql or Redshift (based on the database selected during deployment), Lakeformation, S3 and S3 Athena. |
| Create Dataset | Option to create a new dataset or use an existing dataset. |
| Dataset Configuration | Refer to Datasets (Create Dataset) for details on the fields. |

The dataset file types supported for each consumer target location are:

  • Auroramysql: CSV
  • Redshift: CSV, PARQUET
  • Lakeformation: CSV, PARQUET.
  • S3 Athena: CSV, PARQUET.
  • S3: CSV, OTHERS.

The image below shows how to create a consumer.

Stream consumers

Consumer configuration (Buffer Size & Buffer Interval) can be updated from the details page if required. To update the metadata of the dataset, navigate to the dataset details page and use the edit option.

Note
  • With the 1.14 release, the dataset association of consumers that selected 'No' for Create New Dataset at the time of consumer creation can also be updated.
  • This feature is only supported via the API.

Data Transformations

Data transformations can be used to define a user-defined Lambda function that performs any kind of processing on a running stream. They are defined per stream and can be attached to any consumer of that particular stream. Data ingested via streams is first buffered before the Lambda function is invoked to perform transformations. These buffer limits are defined per consumer and are independent of the consumer's own buffer limits. Refer to the AWS documentation for more details about the backend service.

| Attribute | Description |
| --- | --- |
| Function Name | Display name for the user-defined function. |
| Lambda Handler | The fully qualified Lambda handler for the user-defined function. |
| Memory Size (Optional) | The memory to be allocated to the user-defined function. Default: 128 MB. |
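A data transformation function follows the standard Kinesis Data Firehose transformation contract: it receives base64-encoded records and must return each one with its recordId, a result status, and the re-encoded data. A minimal sketch (the uppercasing transformation itself is purely illustrative):

```python
import base64


def lambda_handler(event, context):
    """Firehose data-transformation handler: decode, transform, re-encode each record."""
    output = []
    for record in event["records"]:
        raw = base64.b64decode(record["data"]).decode("utf-8")
        transformed = raw.upper()  # illustrative transformation
        output.append({
            "recordId": record["recordId"],  # must echo the incoming recordId
            "result": "Ok",                  # "Ok", "Dropped", or "ProcessingFailed"
            "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}


# Simulate an invocation with one buffered record:
event = {"records": [{"recordId": "1",
                      "data": base64.b64encode(b"hello").decode("utf-8")}]}
result = lambda_handler(event, None)
```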
Note

The number of invocations of the Lambda function is directly proportional to the number of shards in the data stream and the number of consumers attached to the data transformation function. Therefore, Amorphic limits this functionality to a maximum of 20 data transformation functions overall and 5 consumers per data transformation function.

The image below shows how to create a data transformation function:

Data Transformation Creation

Data transformations can be updated by using the edit option from the details page if required. Both the metadata and the function code can be updated.

The following picture depicts creating a data transformation enabled consumer:

Consumer Creation with Data Transformation

Data Transformation functions can also be attached to an existing consumer, as shown below:

Consumer Updation with Data Transformation

Data Transformation logs can be viewed/generated as follows:

Data Transformation Logs

Once the logs are generated, an email alert is sent and the logs can be downloaded from the same page.