Data is the new oil! Tools of Data Collection: 10,000 ft overview of AWS Data Collection Services
The main intent in writing this article is to introduce the reader to AWS services used for Data Collection. I aim to provide general information, main use cases and important bits of knowledge about these services in short but punchy bullet points, so let’s start!
There are three ways to collect data:
- Real-Time: Kinesis Data Streams, Simple Queue Service (SQS), AWS IoT
- Near Real-Time: Kinesis Data Firehose (KDF), Database Migration Service (DMS). This is also called reactive, meaning that collection is triggered by an event.
- Batch: Snowball, Data Pipeline (for historical analysis)
AWS Kinesis
- A managed alternative to Apache Kafka, used for collecting logs, metrics, clickstreams and IoT data.
- The main use case is real-time big data collection.
- Integrates with stream processing frameworks such as Apache Spark and Apache NiFi.
- Data is replicated across 3 Availability Zones, so it survives the loss of a single AZ.
AWS Kinesis Streams: low-latency streaming ingestion at scale.
AWS Kinesis Analytics: real-time analytics on data streams using SQL.
AWS Kinesis Firehose: loads streams into S3, Redshift, Elasticsearch or Splunk for analysis (near real-time).
- Streams retain data for 24 hours by default, and retention can be extended up to 7 days (AWS has since added long-term retention of up to 365 days).
- Streams are divided into shards (partitions). Consumers can read the same data over and over, which lets multiple applications consume the same stream.
- Data inserted into Kinesis can't be deleted (it is immutable); it simply expires at the end of the retention period.
- Billing is per shard. You can split and merge shards (resharding). Records are ordered within each shard.
AWS Kinesis Streams Records
- A record consists of a partition key, a sequence number and a data blob of up to 1 MB, which can contain anything. The partition key determines which shard a record is sent to.
- "The Hot Partition" problem: when all your records share the same partition key, they all land on the same shard. That shard receives far more requests than the others and gets throttled, while the remaining shards sit idle. This happens, for example, when all devices producing to a stream use the same key. To avoid it, use a well-distributed partition key such as a unique device ID, increase the number of shards, or retry throttled requests with exponential backoff.
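To see why a single key creates a hot shard, here is a minimal sketch of how partition keys map to shards. Kinesis hashes each partition key with MD5 into a 128-bit integer, and each shard owns a contiguous range of that hash key space. The sketch assumes the space is split evenly across shards; `shard_for_key` and `shard_load` are hypothetical helpers for illustration, not part of the Kinesis SDK.

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit value


def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Index of the shard whose hash key range contains this partition key.

    Assumes the 128-bit hash key space is split into equal, contiguous
    ranges, one per shard.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * num_shards // HASH_SPACE


def shard_load(keys, num_shards: int) -> Counter:
    """Count how many records each shard would receive for these keys."""
    return Counter(shard_for_key(key, num_shards) for key in keys)


if __name__ == "__main__":
    # Hot partition: every device writes with the same key.
    print("same key:  ", dict(shard_load(["sensor"] * 1000, num_shards=4)))
    # Spread out: each device uses its own ID as the partition key.
    ids = [f"device-{i}" for i in range(1000)]
    print("unique IDs:", dict(shard_load(ids, num_shards=4)))
```

With the single key, all 1,000 records go to one shard; with per-device IDs, they spread roughly evenly across all four.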
Kinesis Data Streams Important Limits:
- Producer: 1 MB/s or 1,000 records/s of writes per shard.
- Consumer: 2 MB/s of reads per shard, shared across all consumers, and 5 GetRecords API calls per second per shard.
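These per-shard limits lead to a back-of-the-envelope sizing rule: provision enough shards to satisfy whichever limit is hit first. The sketch below assumes a provisioned-mode stream and standard (shared) consumers. The function name and the example workload are hypothetical.

```python
import math


def shards_needed(write_mb_per_s: float, records_per_s: float,
                  read_mb_per_s: float) -> int:
    """Estimate the shard count for a provisioned-mode stream.

    Per-shard limits: 1 MB/s or 1,000 records/s for writes, and
    2 MB/s for reads (shared by all standard consumers).
    """
    by_write_bytes = math.ceil(write_mb_per_s / 1.0)
    by_write_records = math.ceil(records_per_s / 1000)
    by_read_bytes = math.ceil(read_mb_per_s / 2.0)
    return max(1, by_write_bytes, by_write_records, by_read_bytes)


if __name__ == "__main__":
    # Hypothetical workload: 5,000 records/s of ~0.5 KB each (~2.5 MB/s),
    # read by 3 standard consumers (3 x 2.5 = 7.5 MB/s of reads).
    print(shards_needed(write_mb_per_s=2.5, records_per_s=5000,
                        read_mb_per_s=7.5))
```

In this example the record-count limit (5 shards) dominates, even though byte throughput alone would need only 3 or 4 shards.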
How to produce data into Kinesis?
- Kinesis SDK: the PutRecord and PutRecords APIs (PutRecords sends a batch of records in one call). Works on Android and iOS too. Choose it for low-throughput, higher-latency use cases where a simple API is enough. AWS IoT, CloudWatch Logs and Kinesis Data Analytics use the SDK.
- Kinesis Producer Library (KPL): a Java library backed by a C++ core. Offers automated retries, both synchronous and asynchronous APIs, and can send metrics to CloudWatch. Supports batching (collection and aggregation), which enables high throughput. Batching is enabled by default and is tuned with RecordMaxBufferedTime, which adds a small delay before sending. The KPL does not compress data itself; compression must be implemented by the user.
- Third-party connectors: Spark, Kafka Connect, NiFi.
- Kinesis Agent: a Java-based agent for shipping log files, available for Linux-based AMIs. It can preprocess records before sending them, e.g. convert CSV or log formats to JSON, and it emits metrics to CloudWatch.
Batching: the KPL aggregates multiple user records into a single Kinesis record (aggregation), then groups many Kinesis records into a single PutRecords API call (collection).
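The two batching steps can be sketched as follows. This is not the KPL's actual wire format: the real KPL uses a protobuf-based aggregation format, whereas this sketch frames each user record with a simple 4-byte length prefix. The function names are hypothetical, and the sketch assumes every single user record fits within `max_bytes`.

```python
import struct

MAX_RECORD_BYTES = 1_000_000   # Kinesis record payload limit (~1 MB)
MAX_RECORDS_PER_CALL = 500     # PutRecords accepts up to 500 records per call


def aggregate(user_records, max_bytes=MAX_RECORD_BYTES):
    """Aggregation: pack small user records into larger Kinesis records.

    Each user record is framed as a 4-byte big-endian length followed by
    its bytes (a simplification of the KPL's real format).
    """
    aggregated, current = [], b""
    for rec in user_records:
        framed = struct.pack(">I", len(rec)) + rec
        if current and len(current) + len(framed) > max_bytes:
            aggregated.append(current)
            current = b""
        current += framed
    if current:
        aggregated.append(current)
    return aggregated


def deaggregate(blob):
    """Inverse of aggregate(): roughly what a consumer such as the KCL does."""
    out, offset = [], 0
    while offset < len(blob):
        (length,) = struct.unpack_from(">I", blob, offset)
        offset += 4
        out.append(blob[offset:offset + length])
        offset += length
    return out


def collect(kinesis_records, max_per_call=MAX_RECORDS_PER_CALL):
    """Collection: group Kinesis records, one group per PutRecords call."""
    return [kinesis_records[i:i + max_per_call]
            for i in range(0, len(kinesis_records), max_per_call)]


if __name__ == "__main__":
    events = [f"click-{i}".encode() for i in range(10_000)]
    # A deliberately small 1 KB limit so the example produces several records.
    records = aggregate(events, max_bytes=1_000)
    calls = collect(records)
    print(len(events), "user records ->", len(records),
          "Kinesis records ->", len(calls), "PutRecords calls")
```

Aggregation reduces the number of records that count against the 1,000 records/s limit, and collection reduces the number of API calls.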
How to consume data from Kinesis?
- Kinesis SDK: consumers poll records from shards with GetRecords. Each shard has 2 MB/s of total read throughput, shared by all consumers. Throttle constraints: 5 GetRecords calls per second per shard (hence about 200 ms latency), and each call returns up to 10 MB or 10,000 records.
- Kinesis Client Library (KCL): Java-first, with other languages (such as Python) supported through the MultiLangDaemon. De-aggregates records produced by the KPL. Uses a DynamoDB table for checkpointing and coordination, so either use on-demand DynamoDB capacity or provision enough read and write capacity.
- Kinesis Connector Library: an older Java library for writing data to S3, DynamoDB, Redshift and Elasticsearch, running on EC2 instances. Kinesis Firehose and AWS Lambda are now generally used for this instead.
- Third-party libraries: Spark, log4j, Flume, Kafka Connect.
- Kinesis Firehose
- AWS Lambda: can de-aggregate KPL records, so you can do lightweight ETL with Lambda. It can also send notifications and emails, has a configurable batch size, and can write to S3, DynamoDB, Redshift and Elasticsearch.
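As an example of lightweight ETL with Lambda, here is a minimal sketch of a Python handler for a Kinesis event source. Lambda delivers each batch under `event["Records"]`, with the payload base64-encoded in `record["kinesis"]["data"]`. The sketch assumes producers send plain JSON (not KPL-aggregated records, which would need de-aggregation first; AWS publishes de-aggregation modules for Lambda). The `device_id` and `temperature_c` fields are hypothetical.

```python
import base64
import json


def handler(event, context):
    """Lightweight ETL over a batch of Kinesis records delivered to Lambda.

    Assumes each payload is a JSON object with hypothetical 'device_id'
    and 'temperature_c' fields.
    """
    transformed = []
    for record in event["Records"]:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        if "temperature_c" not in payload:
            continue  # drop malformed events
        transformed.append({
            "device_id": payload["device_id"],
            "temperature_f": payload["temperature_c"] * 9 / 5 + 32,
            "partition_key": record["kinesis"]["partitionKey"],
        })
    # A real function would now write `transformed` to S3, DynamoDB, etc.
    return {"processed": len(transformed), "items": transformed}


if __name__ == "__main__":
    sample = {"device_id": "dev-1", "temperature_c": 21.5}
    event = {"Records": [{"kinesis": {
        "partitionKey": "dev-1",
        "data": base64.b64encode(json.dumps(sample).encode()).decode(),
    }}]}
    print(handler(event, None))
```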
Kinesis Enhanced Fan-out
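With standard consumers, the 2 MB/s read limit applies per shard and is shared by all consumers. With enhanced fan-out, each registered consumer gets its own 2 MB/s per shard.
- Uses HTTP/2 (records are pushed to consumers instead of being polled)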
- Reduced Latency
- Higher Throughput
When to use it?
- Standard consumers: a low number of consuming applications, about 200 ms of latency is acceptable, and low cost matters.
- Enhanced fan-out: multiple consuming applications for the same data stream, low latency (around 70 ms), a default limit of 5 consumers per data stream (AWS has since raised it to 20), and higher costs.
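The difference between shared and dedicated throughput comes down to simple arithmetic, sketched below. The sketch assumes the per-shard limits stated above (2 MB/s of reads and 5 GetRecords calls per second) and that standard consumers split the budget evenly. The function names are hypothetical.

```python
SHARD_READ_MB_S = 2.0           # read throughput per shard
GET_RECORDS_CALLS_PER_S = 5     # GetRecords limit per shard (standard consumers)


def per_consumer_read_mb_s(num_consumers: int, enhanced_fan_out: bool) -> float:
    """Read throughput each consumer gets from one shard."""
    if enhanced_fan_out:
        # Each registered consumer gets a dedicated 2 MB/s per shard.
        return SHARD_READ_MB_S
    # Standard consumers share the shard's 2 MB/s.
    return SHARD_READ_MB_S / num_consumers


def min_poll_interval_ms(num_consumers: int) -> float:
    """Shortest average gap between GetRecords calls for each standard
    consumer, if consumers split the 5 calls/s budget evenly."""
    return 1000 * num_consumers / GET_RECORDS_CALLS_PER_S


if __name__ == "__main__":
    for n in (1, 3, 5):
        print(n, "consumers:",
              per_consumer_read_mb_s(n, enhanced_fan_out=False), "MB/s shared,",
              per_consumer_read_mb_s(n, enhanced_fan_out=True), "MB/s with EFO,",
              min_poll_interval_ms(n), "ms between polls")
```

A single standard consumer can poll every 200 ms, which roughly matches the latency figure above; each added standard consumer reduces both its share of throughput and how often it can poll.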
Kinesis Scaling
- Adding shards ("shard splitting") adds 1 MB/s of write capacity per new shard. The old (parent) shard stops accepting writes and is deleted once its data expires. Shard splitting can also be used to break up hot shards.
- Merging shards: combines low-traffic shards, which saves costs.
Kinesis Data Streams originally had no auto scaling option; you could implement auto scaling yourself with AWS Lambda. (AWS has since added an on-demand capacity mode that scales automatically.)
Resharding operations can't run in parallel: only one can run at a time and each takes a few seconds, so scaling is not instantaneous. Plan capacity in advance, because these limitations prevent you from scaling up and down quickly.
- Keep in mind! Going from 1,000 shards to 2,000 shards takes roughly 8 hours.
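Here is a minimal sketch of what splitting does to the hash key space and why doubling a large stream takes hours. Each split divides one shard's hash key range into two child ranges. Doubling 1,000 shards means 1,000 sequential splits. The per-split duration is a hypothetical parameter, chosen here to match the 8-hour figure above (about 29 seconds per split). The helper names are also hypothetical.

```python
HASH_MAX = 2 ** 128 - 1  # largest hash key in the 128-bit space


def split_range(start: int, end: int):
    """Split one shard's inclusive hash key range at its midpoint.

    Mirrors the idea of SplitShard, where the caller supplies the starting
    hash key of the upper child shard (here, the midpoint).
    """
    mid = (start + end + 1) // 2
    return (start, mid - 1), (mid, end)


def double_shards(ranges):
    """Split every shard once. In Kinesis each split is its own resharding
    operation, and they run one after another."""
    children = []
    for start, end in ranges:
        children.extend(split_range(start, end))
    return children


def estimate_hours(num_operations: int, seconds_per_operation: float) -> float:
    """Wall-clock time for resharding operations that must run sequentially."""
    return num_operations * seconds_per_operation / 3600


if __name__ == "__main__":
    print(double_shards([(0, HASH_MAX)]))
    # Hypothetical ~29 s per split: 1,000 splits take 1,000 -> 2,000 shards.
    print(round(estimate_hours(1000, 28.8), 1), "hours")
```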
Kinesis Data Firehose: fully managed and near real-time (it waits for batches to fill). Use it to load data into Redshift, S3, Splunk and Elasticsearch (memorize these).
It scales automatically, supports many data format conversions and compression, and you pay only for the data volume you send through it.
- Spark and KCL can’t read from Kinesis Data Firehose.
Kinesis Firehose acts as a middleman.
Firehose retries failed deliveries and can back up source records to S3, so data is rarely lost.
- Firehose accumulates records in a buffer and flushes it when either the buffer size or the buffer interval is reached, whichever comes first. Buffers scale automatically. Buffer settings of at least 1 minute and at least 1 to 2 MB are recommended.
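The "whichever comes first" flushing rule can be modeled in a few lines. This is a toy model, not Firehose itself: time is passed in explicitly, the class name is hypothetical, and the 1 MB and 60 second values in the example are illustrative settings.

```python
class FirehoseBuffer:
    """Toy model of Firehose buffering: flush when the size hint or the
    interval hint is reached, whichever comes first."""

    def __init__(self, size_bytes: int, interval_s: float):
        self.size_bytes = size_bytes
        self.interval_s = interval_s
        self.buffer = []
        self.buffered_bytes = 0
        self.first_arrival = None
        self.deliveries = []  # each entry: records written to the destination

    def put(self, record: bytes, now: float):
        self._maybe_flush_on_time(now)
        if self.first_arrival is None:
            self.first_arrival = now
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        if self.buffered_bytes >= self.size_bytes:
            self._flush()  # size-based flush

    def tick(self, now: float):
        """Called periodically; flushes if the interval has elapsed."""
        self._maybe_flush_on_time(now)

    def _maybe_flush_on_time(self, now: float):
        if self.first_arrival is not None and now - self.first_arrival >= self.interval_s:
            self._flush()  # time-based flush

    def _flush(self):
        if self.buffer:
            self.deliveries.append(self.buffer)
        self.buffer, self.buffered_bytes, self.first_arrival = [], 0, None


if __name__ == "__main__":
    buf = FirehoseBuffer(size_bytes=1_000_000, interval_s=60)  # 1 MB / 60 s
    buf.put(b"x" * 600_000, now=0)
    buf.put(b"x" * 600_000, now=1)   # buffer passes 1 MB: size-based flush
    buf.put(b"y" * 10, now=5)
    buf.tick(now=65)                 # 60 s after first buffered record: time-based flush
    print([len(batch) for batch in buf.deliveries])
```

Low-traffic streams are flushed by the interval, while high-traffic streams are flushed by size. This is why Firehose is near real-time rather than real-time.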
Kinesis Data Streams vs. Kinesis Firehose
Kinesis Data Streams:
- You can write custom code to customize producers and consumers.
- Manual scaling.
- Can use Lambda to insert data into Elasticsearch.
- Data retention from 1 day up to a week, replay capability, and support for multiple consumers.
Kinesis Data Firehose:
- Fully managed, auto scaled.
- Serverless data transformations.
- Near real-time.
- No data storage.
One rule of thumb: S3 can generally be used as a data lake. Kinesis Firehose offers data transformation options, but it is not truly real-time.
- AMI: Amazon Machine Image, the template used to launch EC2 instances.
- PuTTY is used to SSH into EC2 instances from Windows.
- The Kinesis Agent reads its configuration from a JSON file, agent.json (by default in /etc/aws-kinesis/). It contains the flow configurations and, optionally, access keys. Instead of copying credentials into it, assign an IAM role to your EC2 instance; this is considered a security best practice.
- The agent can also send data to Kinesis Data Streams. You declare the target stream and the data conversions for each flow in agent.json.
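To keep examples in one language, here is a minimal sketch that builds an agent.json in Python and prints it. The configuration keys (`cloudwatch.emitMetrics`, `kinesis.endpoint`, `firehose.endpoint`, `flows`, `filePattern`, `kinesisStream`, `deliveryStream`, `partitionKeyOption`, `dataProcessingOptions`) follow the Kinesis Agent's documented settings. The file paths, stream names, field names and region endpoints are hypothetical examples. No access keys are included because the instance's IAM role supplies credentials.

```python
import json

# Hypothetical log paths, stream names and endpoints.
agent_config = {
    "cloudwatch.emitMetrics": True,
    "kinesis.endpoint": "kinesis.us-east-1.amazonaws.com",
    "firehose.endpoint": "firehose.us-east-1.amazonaws.com",
    # No awsAccessKeyId / awsSecretAccessKey: the EC2 instance's IAM role
    # provides credentials, which is the recommended practice.
    "flows": [
        {
            # Stream CSV order logs into Kinesis Data Streams as JSON.
            "filePattern": "/var/log/app/orders*.csv",
            "kinesisStream": "orders-stream",
            "partitionKeyOption": "RANDOM",
            "dataProcessingOptions": [
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": ["order_id", "amount", "timestamp"],
                }
            ],
        },
        {
            # Ship Apache access logs to a Firehose delivery stream as JSON.
            "filePattern": "/var/log/httpd/access_log*",
            "deliveryStream": "web-logs-firehose",
            "dataProcessingOptions": [
                {"optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG"}
            ],
        },
    ],
}

if __name__ == "__main__":
    # Write the output to /etc/aws-kinesis/agent.json on the instance.
    print(json.dumps(agent_config, indent=2))
```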
Other Related Services to Data Collection
Athena: used to query data lakes with SQL.
Redshift: AWS's data warehousing solution.
Amazon QuickSight: used for visualization.
AWS Glue: used to catalog and transform (ETL) data.
Amazon SQS
SQS is a queue system: message producers send messages to a queue, and consumers poll messages from it. (Polling means the consumer repeatedly asks the queue whether messages are available.)
- Fully managed, scales automatically (horizontally), low latency; the default message retention period is 4 days (configurable up to 14 days).
- Messages can be at most 256 KB.
Producing Messages to SQS
Send a message body with optional metadata (message attributes); you get back an MD5 hash of the body and a unique message ID.
Consuming Messages from SQS
Consumers poll for messages (up to 10 at a time) and must process each message within the visibility timeout. Afterwards they delete the message from the queue using the receipt handle returned when the message was received. Because messages are deleted once processed, each message is consumed by only one application!
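The receive/process/delete cycle and the visibility timeout can be illustrated with an in-memory model. This is not the SQS API: `ToyQueue` and its methods are hypothetical and only loosely mirror SendMessage, ReceiveMessage and DeleteMessage. Time is passed in explicitly. As in SQS, deletion uses the receipt handle from the receive call, not the message ID.

```python
import itertools
import uuid


class ToyQueue:
    """In-memory model of SQS standard-queue consumption semantics."""

    def __init__(self, visibility_timeout_s: float = 30):
        self.visibility_timeout_s = visibility_timeout_s
        self.messages = {}         # message_id -> body
        self.invisible_until = {}  # message_id -> time it becomes visible again
        self.handles = {}          # receipt_handle -> message_id
        self._ids = itertools.count(1)

    def send(self, body: str) -> str:
        message_id = f"msg-{next(self._ids)}"
        self.messages[message_id] = body
        return message_id

    def receive(self, now: float, max_messages: int = 10):
        """Return up to 10 visible messages and hide them for the timeout."""
        batch = []
        for message_id, body in self.messages.items():
            if len(batch) == min(max_messages, 10):
                break
            if self.invisible_until.get(message_id, 0) <= now:
                self.invisible_until[message_id] = now + self.visibility_timeout_s
                handle = str(uuid.uuid4())
                self.handles[handle] = message_id
                batch.append((handle, body))
        return batch

    def delete(self, receipt_handle: str):
        """Remove the message for good once it has been processed."""
        message_id = self.handles.pop(receipt_handle)
        self.messages.pop(message_id, None)
        self.invisible_until.pop(message_id, None)


if __name__ == "__main__":
    q = ToyQueue(visibility_timeout_s=30)
    q.send("order-1")
    print(q.receive(now=0))    # delivered, now hidden for 30 s
    print(q.receive(now=10))   # nothing visible yet
    again = q.receive(now=31)  # not deleted in time: delivered again
    q.delete(again[0][0])
    print(q.receive(now=100))  # gone for good
```

If a consumer crashes or takes longer than the visibility timeout, the message becomes visible again and may be processed twice. Consumers of standard queues should therefore be idempotent.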
SQS FIFO Queue
- queue name must end in .fifo
- lower throughput
- Messages are consumed in the order they were sent (1, 2, 3 → 1, 2, 3)
To send large messages with SQS, use the SQS Extended Client Library, which stores the payload in an S3 bucket and sends a reference to it through the queue.
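The Extended Client Library applies the "claim check" pattern: the large payload goes to S3, and only a small pointer travels through the queue. Here is a minimal sketch of that idea, assuming a dict in place of an S3 bucket and a list in place of an SQS queue. The function names, the `payload_in_s3` attribute and the pointer format are hypothetical and do not match the library's real format. Message attribute sizes are ignored.

```python
import json
import uuid

SQS_MAX_BYTES = 256 * 1024  # 256 KB message size limit


def send_message(body: str, bucket: dict, queue: list,
                 bucket_name: str = "my-large-payloads") -> None:
    """Claim-check send: keep small bodies inline, offload large ones.

    `bucket` (a dict) stands in for an S3 bucket and `queue` (a list)
    for an SQS queue.
    """
    data = body.encode("utf-8")
    if len(data) <= SQS_MAX_BYTES:
        queue.append({"body": body, "attributes": {}})
        return
    key = str(uuid.uuid4())
    bucket[key] = data  # stands in for an S3 PutObject
    pointer = json.dumps({"bucket": bucket_name, "key": key})
    queue.append({"body": pointer, "attributes": {"payload_in_s3": "true"}})


def resolve_body(message: dict, bucket: dict) -> str:
    """Consumer side: follow the pointer back to the stored payload."""
    if message["attributes"].get("payload_in_s3") == "true":
        key = json.loads(message["body"])["key"]
        return bucket[key].decode("utf-8")  # stands in for an S3 GetObject
    return message["body"]


if __name__ == "__main__":
    bucket, queue = {}, []
    send_message("small order event", bucket, queue)
    send_message("x" * 300_000, bucket, queue)  # over 256 KB: offloaded
    print(len(bucket), "payload(s) offloaded;",
          [len(m["body"]) for m in queue], "bytes on the queue")
```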
Use Cases for SQS
- Decouple applications (for example, payment handling).
- Buffer writes to a database (for example, aggregating bid logs for online advertising).
- Handle large loads of messages.
SQS can also integrate with CloudWatch.
Limits: a maximum of 120,000 in-flight messages, 256 KB per message and 10 messages per batch request. Message bodies can be XML, JSON or unformatted text.
Pricing: pay per API request and for network data transfer.
Kinesis Data Streams vs SQS
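- It is important to know when to use Kinesis Data Streams and when to use SQS. As a rule of thumb, Kinesis suits big data applications (ordered data, replay, multiple consumers of the same stream), while SQS suits application development workflows (decoupling services, one consumer per message).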
AWS IoT
You can connect IoT devices to AWS through the Thing Registry, the Device Gateway (the connection point), the IoT Message Broker and the Rules Engine (where you define rules that send data to other AWS services such as Lambda, databases or mobile devices). The Device Shadow keeps a copy of each device's state, so you can read or change that state even while the device is offline; the device syncs when it reconnects.
The IoT Device Gateway is fully managed and can scale to billions of devices.
The Thing Registry is the IAM of IoT: it holds each device's identity and metadata.
Authentication for AWS IoT:
- Load X.509 certificates onto things, or use custom tokens with custom authorizers.
- Cognito identities (e.g. Facebook or Google login) for mobile apps.
- IAM and federated identities for web, desktop and CLI.
You can use the Rules Engine to send data to ML services. To send data to Kinesis, publish to an IoT topic, create an IoT rule that selects from that topic, and attach a Kinesis rule action.
IoT Greengrass: software that runs locally on a device (e.g. a smart coffee pot) and executes serverless (Lambda) functions there. You can use it to preprocess data, run ML predictions on the fly and keep data in sync.
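An IoT rule selects messages with SQL whose FROM clause is an MQTT topic filter, e.g. `SELECT * FROM 'sensors/+/temperature'`. Here is a minimal sketch of the standard MQTT wildcard matching that decides which rules see a message. `+` matches exactly one topic level, and `#` matches all remaining levels. The rule table and its action targets are hypothetical. This models only the topic matching, not the Rules Engine itself.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """MQTT-style matching: '+' matches exactly one level, '#' matches the
    remaining levels (and must be the last level of the filter)."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


# Hypothetical rules: topic filter -> rule action target.
RULES = {
    "sensors/+/temperature": "kinesis:temperature-stream",
    "sensors/#": "s3:raw-sensor-archive",
}


def route(topic: str):
    """Return the (sorted) action targets whose topic filter matches."""
    return sorted(action for f, action in RULES.items() if topic_matches(f, topic))


if __name__ == "__main__":
    for t in ("sensors/dev-42/temperature", "sensors/dev-42/humidity", "alerts/dev-42"):
        print(t, "->", route(t))
```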
Database Migration Service
Your source database stays live while you use this service. You can also do heterogeneous migrations, e.g. from Microsoft SQL Server to Aurora.
DMS runs on an EC2 replication instance and supports continuous data replication.
- The AWS Schema Conversion Tool (SCT) converts database schemas from one engine to another. In DMS itself, you create source and target endpoints and replication tasks.
- You can migrate to both OLTP databases (MySQL, PostgreSQL, Aurora) and OLAP systems (Redshift).
- OLTP is the data of day-to-day business: atomic transactions that are vital to the business.
- OLAP is the analytics derived from such data.
AWS Direct Connect
Set up a dedicated private connection from a remote network to your VPC. It is used for big data transfer and supports hybrid setups that span on-premises and the cloud, with better security and better network quality than the public Internet.
A dedicated network line for big data flows from on-premises to the cloud, e.g. at 10 Gbit/s.
AWS Snowball
- A physical data transfer device. If transferring your data over the network would take weeks, use Snowball instead. The data is uploaded to an S3 bucket.
Snowball Edge lets you run EC2 AMIs on the device itself to process data on the go, and you can also run serverless (Lambda) functions on it.
Snowmobile: a truck for transferring data.
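Whether a network transfer would take "weeks" depends on your link. A quick calculation helps you decide between the network and Snowball. Here is a minimal sketch, assuming decimal units and a hypothetical 80% effective link utilization. The data size and link speeds in the example are illustrative.

```python
def transfer_days(data_tb: float, link_mbps: float, utilization: float = 0.8) -> float:
    """Days needed to move `data_tb` terabytes over a `link_mbps` network link.

    `utilization` is an assumed fraction of the link you can actually use.
    Decimal units: 1 TB = 10**12 bytes, 1 Mbps = 10**6 bits per second.
    """
    bits = data_tb * 10**12 * 8
    seconds = bits / (link_mbps * 10**6 * utilization)
    return seconds / 86_400


if __name__ == "__main__":
    for mbps in (100, 1_000, 10_000):
        print(f"100 TB over {mbps} Mbps: {transfer_days(100, mbps):.1f} days")
```

Even at full utilization of a 1 Gbit/s link, 100 TB takes a little over 9 days, and on a 100 Mbit/s link it takes months. This is the situation where shipping a physical device wins.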
MSK — Managed Streaming for Apache Kafka
- Fully-managed Apache Kafka on AWS(Kinesis Alternative)
- MSK creates and manages the clusters, the broker nodes and ZooKeeper.
- Multi-AZ deployment and automatic recovery are available too.
- Data is stored on EBS volumes.
- You only need to configure the Apache Kafka data plane (producers and consumers).
- MSK clusters are distributed across multiple AZs (up to 3).
- The default maximum message size in Kafka is 1 MB, but unlike in Kinesis it can be raised.
- You can override broker settings and change many Kafka parameters. Kafka can be faster than Kinesis.
- You can introduce a delay for producers (e.g. the linger.ms setting) so that they batch more messages per request.
TLS encryption is used in flight, both between clients and brokers and between brokers. You can also authenticate clients with TLS certificates. Network access to the brokers is controlled with EC2 security groups, and Kafka's own ACLs handle authorization.
CloudWatch and Prometheus can be used to monitor your clusters, and broker logs can be sent to CloudWatch, S3 or Kinesis Data Firehose.
Some Extra Tidbits of AWS Knowledge
- Kinesis Data Firehose scales automatically. Kinesis Data Streams do not do that!
- The Kinesis Producer Library has no built-in compression; if you need it, you must implement it yourself.
- Enhanced Fan-out mode has 70ms Latency
- The GetRecords API is limited to 5 calls per second per shard, i.e. one call every 200 ms on average.
- If your KCL application is lagging, its DynamoDB table might be under-provisioned.
- Kinesis Data Firehose does not Write to DynamoDB.
- MQTT, HTTP 1.1 and WebSockets are supported by the IoT Device Gateway.
- You can continuously replicate from on-premises to the cloud using Database Migration Service.
- If you need to deliver data quickly, the fastest option to set up is a VPN over the public Internet. Direct Connect takes more than 2 days to set up, and Snowball is slower as well.
Finally if you have made it this far you now know a fair bit of the AWS Data Collection Services. I hope this article has been of help to you in your Data Engineering endeavours.