A tutorial for designing your extract, transform, load workflow with OpenStack Sahara.

OpenStack controls large pools of compute, storage and networking resources. This article primarily focuses on how OpenStack plays a key role in addressing a big data use case.

Big data on OpenStack!

These days, data is generated everywhere and is growing exponentially. It comes from web servers, application servers and database servers in the form of user information, log files and system state information. A huge volume of data is also generated by internet of things devices such as sensors, vehicles and industrial devices. Data generated by scientific simulation models is another source of big data. Storing data like this and performing analytics on it with traditional software tools can be difficult; Hadoop can address the issue.

Let me share my use case with you. I have a bulk amount of data stored in a relational database management system (RDBMS) environment. The RDBMS doesn’t perform well as the data set grows larger, a problem that will only get worse as the data keeps growing. At this stage, I’d like to avoid adopting NoSQL culture. I need to store and process a bulk amount of data in a cost-effective way. Should I rely on a high-end server in a non-virtualized environment? My requirement is to scale the cluster at any time, and I also need a better dashboard to manage all of its components.

I planned to set up a Hadoop cluster on top of OpenStack and create my ETL job environment. Hadoop is an industry-standard framework for storing and analyzing large data sets with its fault-tolerant Hadoop Distributed File System and MapReduce implementation. Scalability, however, is a very common problem in a typical Hadoop cluster. OpenStack has introduced a project called Sahara, which offers data processing as a service. OpenStack Sahara aims to provision and manage data processing frameworks such as Hadoop MapReduce, Spark and Storm in a cluster topology. This project is similar to the data analytics platform provided by the Amazon Elastic MapReduce (EMR) service. Sahara deploys the cluster in a few minutes. Besides, OpenStack Sahara can scale the cluster by adding or removing worker nodes based on demand.

Benefits of managing a Hadoop cluster with OpenStack Sahara

  • The cluster can be provisioned quickly and is easy to configure.
  • Like other OpenStack services, the Sahara service can be managed through a powerful REST API, CLI and the Horizon dashboard.
  • Plugins are available to support multiple Hadoop vendors, such as Vanilla (Apache Hadoop), HDP (Ambari), CDH (Cloudera), MapR, Spark and Storm.
  • Cluster size can be scaled up and down based on demand.
  • It can be integrated with OpenStack Swift to store the data processed by Hadoop and Spark.
  • Cluster monitoring can be made simple.
  • Apart from cluster provisioning, Sahara can be used as analytics as-a-service for ad-hoc or bursty analytic workloads.
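
To make the scaling point above more concrete, here is a minimal sketch of how a cluster-scaling request body could be assembled before it is sent to the Sahara REST API. The field names (resize_node_groups, add_node_groups, node_group_template_id) reflect my understanding of the v1.1 cluster scaling request and should be checked against the API reference for your release; the "worker" group name and the helper function itself are hypothetical.

```python
import json


def build_scale_request(resize=None, add=None):
    """Build a JSON-ready body that resizes existing node groups and/or adds new ones.

    resize: dict mapping an existing node group name to its new instance count.
    add: list of (name, node_group_template_id, count) tuples for new groups.
    Field names are an assumption based on the Sahara v1.1 scaling request.
    """
    body = {}
    if resize:
        for name, count in resize.items():
            if count < 0:
                raise ValueError(f"count for {name!r} must not be negative")
        body["resize_node_groups"] = [
            {"name": name, "count": count} for name, count in sorted(resize.items())
        ]
    if add:
        body["add_node_groups"] = [
            {"name": name, "node_group_template_id": template_id, "count": count}
            for name, template_id, count in add
        ]
    return body


if __name__ == "__main__":
    # Grow the hypothetical "worker" node group to five instances.
    print(json.dumps(build_scale_request(resize={"worker": 5}), indent=2))
```

The same change can be made from the Horizon dashboard or the CLI; the sketch only shows what a scale-up or scale-down boils down to: a new instance count per node group.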

Architecture

OpenStack Sahara is designed to leverage the core services and other fully managed services of OpenStack. This makes Sahara more reliable and able to manage the Hadoop cluster efficiently, and it gives you the option of using services such as Trove and Swift. Let’s take a look at the Sahara architecture.

  • The Sahara service has an API server which responds to HTTP requests from the end user and interacts with other OpenStack services to perform its function.
  • Keystone (identity as-a-service) – authenticates users and provides security tokens that are used to work with OpenStack, limiting a user’s abilities in Sahara to their OpenStack privileges.
  • Heat (orchestration as-a-service) – used to provision and orchestrate the deployment of data processing clusters.
  • Glance (virtual machine image as-a-service) – stores VM images with an operating system and pre-installed Hadoop/Spark software packages to create a data processing cluster.
  • Nova (compute) – provisions virtual machines for data processing clusters.
  • Ironic (bare metal as-a-service) – provisions bare metal nodes for data processing clusters.
  • Neutron (networking) – facilitates networking services from basic to advanced topology to access the data processing clusters.
  • Cinder (block storage) – provides a persistent storage media for cluster nodes.
  • Swift (object storage) – provides reliable storage to keep job binaries and the data processed by Hadoop/Spark.
  • Designate (DNS as-a-service) – provides a hosted zone to keep DNS records of the cluster instances. Hadoop services communicate with the cluster instances by their hostnames.
  • Ceilometer (telemetry) – collects and stores the metrics about the cluster for metering and monitoring purposes.
  • Manila (file share) – can be used to store job binaries and data created by the job.
  • Barbican (key management service) – stores sensitive data such as passwords and private keys securely.
  • Trove (database as-a-service) – provides a database instance for the Hive metastore and for storing the state of the Hadoop services and other management services.

How to set up a Sahara cluster

Please follow the steps in the installation guide to deploy Sahara in your environment. There are several ways to deploy it; Kolla is also a good choice if you want to experiment with it.

You can also manage Sahara projects through the Horizon dashboard.

ETL (extract, transform and load) or ELT (extract, load and transform) with a Sahara cluster

There are numerous ETL tools available on the market, and traditional data warehouses have their own benefits and limitations; for example, the warehouse might be in a different location from your data source. I’m targeting Hadoop as an ideal platform to run ETL jobs. Your datastore holds a variety of data, including structured, semi-structured and unstructured data. The Hadoop ecosystem has tools to ingest data from different sources, including databases, files and other data streams, and store it in a centralized Hadoop Distributed File System (HDFS). As your data grows rapidly, the Hadoop cluster can be scaled with OpenStack Sahara.

Apache Hive is the data warehouse project built on top of the Hadoop ecosystem and a proven tool for doing ETL analysis. Once the data is extracted from the data sources with tools such as Sqoop, Flume or Kafka, it should be cleansed and transformed by Hive or Pig scripts using the MapReduce technique.

Another advantage of Hive is that it is an interactive query engine and can be accessed via the Hive Query Language, which resembles SQL. So, your database person can execute a job in the Hadoop ecosystem without prior knowledge of Java and MapReduce concepts. The Hive query execution engine parses the Hive query and converts it into a sequence of MapReduce/Spark jobs that run on the cluster. Hive can be accessed through JDBC/ODBC drivers and Thrift clients.

Oozie is a workflow engine available in the Hadoop ecosystem. A workflow is simply a set of tasks that must be executed as a sequence in a distributed environment. Oozie helps you create anything from a simple workflow to cascading multiple workflows and coordinated jobs. It’s also ideal for creating workflows for complex ETL jobs, although it does not have modules to support all actions related to Hadoop. (A large organization might want its own workflow engine to execute its tasks, for example.) We can use any workflow engine to carry out our ETL job, for example OpenStack Mistral (workflow as-a-service). Apache Oozie resembles OpenStack Mistral in some aspects, acting as a job scheduler that can be triggered at regular intervals.
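
The idea of a workflow as a chain of tasks, each with a success and a failure transition, can be sketched in a few lines. This is only an illustration of the concept, not how Oozie or Mistral is implemented; the function name, the task names and the "end" and "fail" terminal nodes are assumptions chosen for the example.

```python
def run_workflow(transitions, tasks, start):
    """Run tasks in order, following 'ok' or 'error' transitions.

    transitions: dict mapping a task name to {"ok": next_node, "error": next_node}.
    tasks: dict mapping a task name to a zero-argument callable.
    Any node without an entry in transitions is treated as terminal.
    Returns the terminal node reached and the list of tasks that ran.
    """
    current, executed = start, []
    while current in transitions:
        executed.append(current)
        try:
            tasks[current]()
            current = transitions[current]["ok"]
        except Exception:
            # A failing task routes the workflow to its error transition.
            current = transitions[current]["error"]
    return current, executed


if __name__ == "__main__":
    etl = {
        "extract": {"ok": "transform", "error": "fail"},
        "transform": {"ok": "load", "error": "fail"},
        "load": {"ok": "end", "error": "fail"},
    }
    noop = lambda: None
    print(run_workflow(etl, {"extract": noop, "transform": noop, "load": noop}, "extract"))
```

A real workflow engine adds what this sketch leaves out: distributed execution, retries, scheduling and persistent state.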

Let’s look at a typical ETL job process with Hadoop where an application stores its data in a MySQL server. The data stored here needs to be analyzed at a minimum of cost and time.

Extract

The very first step is to extract data from MySQL and store it in HDFS. Apache Sqoop can be used to import data from, or export data to, a structured data source such as an RDBMS data store. If the data to be extracted is semi-structured or unstructured, you can use Apache Flume to ingest the data from a data source such as a web server log, a Twitter data stream or sensor data.

sqoop import \
--connect jdbc:mysql://<database host>/<database name> \
--driver com.mysql.jdbc.Driver \
--table <table name> \
--username <database user> \
--password <database password> \
--num-mappers 3 \
--target-dir hdfs://hadoop_sahara_local/user/rkkrishnaa/database
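
If the import is launched from a script rather than typed by hand, the argument list can be assembled programmatically. The sketch below is one possible way to do that; the function and all sample values (host, database, table, user, paths) are hypothetical. It uses Sqoop's --password-file option instead of --password so that the password does not appear on the command line.

```python
import shlex


def sqoop_import_args(host, database, table, user, password_file, target_dir, mappers=3):
    """Return the argument list for a Sqoop import from MySQL into HDFS."""
    if mappers < 1:
        raise ValueError("mappers must be at least 1")
    return [
        "sqoop", "import",
        "--connect", f"jdbc:mysql://{host}/{database}",
        "--driver", "com.mysql.jdbc.Driver",
        "--table", table,
        "--username", user,
        "--password-file", password_file,  # file holding the password, readable only by the job user
        "--num-mappers", str(mappers),
        "--target-dir", target_dir,
    ]


if __name__ == "__main__":
    args = sqoop_import_args(
        host="db.example.com",            # hypothetical database host
        database="sales",                 # hypothetical database name
        table="orders",                   # hypothetical table name
        user="etl",
        password_file="/user/etl/.db-password",
        target_dir="hdfs://hadoop_sahara_local/user/etl/orders",
    )
    # Print a shell-safe command line; pass `args` to subprocess.run to execute it.
    print(shlex.join(args))
```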

Transform

The data extracted in the previous phase is not in a proper format; it’s just raw data. It should be cleansed by applying proper filters and aggregating data from multiple tables. This is our staging and intermediate area for storing data in HDFS.

At this point we need to design a Hive schema for each table and create a database to transform the data stored in the staging area. Typically, your data is in .csv format and the fields of each record are delimited by commas.

We don’t need to check HDFS data to know how it’s stored; with a few data type exceptions, it should be compatible with Hive.
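
As a rough illustration of schema design, the following sketch derives a Hive CREATE TABLE statement from a list of MySQL column definitions. The type mapping is an assumption covering only a few common types, with anything unrecognized falling back to STRING; the helper names and the sample table are hypothetical, and a real migration should review each column by hand.

```python
# Assumed mapping from a few common MySQL column types to Hive types.
MYSQL_TO_HIVE = {
    "tinyint": "TINYINT",
    "int": "INT",
    "bigint": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "char": "STRING",
    "varchar": "STRING",
    "text": "STRING",
    "date": "DATE",
    "datetime": "TIMESTAMP",
}


def hive_type(mysql_type):
    """Map a MySQL type such as 'VARCHAR(255)' to a Hive type, defaulting to STRING."""
    base = mysql_type.lower().split("(")[0].strip()
    return MYSQL_TO_HIVE.get(base, "STRING")


def create_table_ddl(table, columns):
    """Build a CREATE TABLE statement for comma-delimited text files.

    columns: list of (column_name, mysql_type) pairs.
    """
    cols = ",\n".join(f"  {name} {hive_type(mtype)}" for name, mtype in columns)
    return (
        f"CREATE TABLE {table} (\n{cols}\n)\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
        "STORED AS TEXTFILE;"
    )


if __name__ == "__main__":
    print(create_table_ddl("employees", [("name", "varchar(50)"), ("age", "int")]))
```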

Once the database is modeled, we can load the extracted data for cleaning. The data in the table is still de-normalized. Aggregate the required columns from the different tables.

Open the hive or beeline shell or, if you have HUE (Hadoop User Experience) installed, open the Hive editor and run the commands given below.

CREATE DATABASE <database name>;
USE <database name>;
CREATE TABLE <table name>
(variable1 DATATYPE,
variable2 DATATYPE,
variable3 DATATYPE,
-- ... remaining columns ...
variablen DATATYPE
)
PARTITIONED BY (Year INT, Month INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
LOAD DATA INPATH 'hdfs://hadoop_sahara_local/user/rkkrishnaa/database/files.csv'
INTO TABLE <table name> PARTITION (Year=<year>, Month=<month>);

Similarly, we can aggregate the data from multiple tables with the ‘OVERWRITE INTO TABLE’ statement, which replaces the existing contents of the target table.

Hive supports table partitioning to improve query performance by distributing the execution load horizontally. We prefer to partition on the columns that store the year and month. Keep in mind that a partitioned table can sometimes create more tasks in a MapReduce job.
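
Under the hood, each partition of a Hive table is a subdirectory named after the partition column values, which is why a query that filters on year and month only has to read the matching directories. The sketch below mimics that layout for a handful of records; the table directory, helper names and sample rows are assumptions made for the illustration.

```python
from collections import defaultdict
from datetime import date


def partition_dir(table_dir, day):
    """Return the partition directory for a date, using key=value path segments."""
    return f"{table_dir}/year={day.year}/month={day.month}"


def group_by_partition(table_dir, rows):
    """Group (date, record) pairs by the partition directory they would land in."""
    groups = defaultdict(list)
    for day, record in rows:
        groups[partition_dir(table_dir, day)].append(record)
    return dict(groups)


if __name__ == "__main__":
    rows = [
        (date(2017, 3, 1), "order-1"),
        (date(2017, 3, 20), "order-2"),
        (date(2017, 4, 2), "order-3"),
    ]
    for path, records in group_by_partition("/warehouse/sales", rows).items():
        print(path, records)
```

Partitioning on very fine-grained columns produces many small directories and files, which is one reason a partitioned table can lead to more tasks per job.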

Load

We’ve covered the transform phase; now it’s time to load the transformed data into a data warehouse directory in HDFS, which is the final state of the data. Here we can apply our SQL queries to get the appropriate results.

All DML commands can be used to analyse the warehouse data based on the use case.

Results can be downloaded as a .csv file, table or chart for analysis. They can also be integrated with other popular BI tools such as Talend Open Studio, Tableau, etc.

Automate

Now we’re going to automate the ETL job using the Oozie workflow engine. If you have experience with Mistral, you can use that. Most Hadoop users are acquainted with Apache Oozie, so we’re going to use that as an example here.

Step 1: Create a job.properties file

nameNode=hdfs://hadoop_sahara_local:8020
jobTracker=<rm-host>:8050
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}
oozie.libpath=/user/root
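
The ${...} references in this file are resolved when the job is submitted. The following sketch shows the substitution idea on a shortened copy of the file. It is not Oozie's own resolver: the parsing is deliberately simplified, and the user name passed in for ${user.name} is just a sample value.

```python
import re

JOB_PROPERTIES = """\
nameNode=hdfs://hadoop_sahara_local:8020
queueName=default
oozie.wf.application.path=${nameNode}/user/${user.name}
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def resolve(value, props, extra):
    """Replace ${name} with a value from props, else from extra, else leave it as-is."""
    def lookup(match):
        name = match.group(1)
        if name in props:
            return resolve(props[name], props, extra)
        return extra.get(name, match.group(0))
    return re.sub(r"\$\{([^}]+)\}", lookup, value)


if __name__ == "__main__":
    props = parse_properties(JOB_PROPERTIES)
    # "rkkrishnaa" stands in for the submitting user's name.
    print(resolve(props["oozie.wf.application.path"], props, {"user.name": "rkkrishnaa"}))
```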

Step 2: Create a workflow.xml file to define the tasks for extracting data from the MySQL data store to HDFS, transforming it and loading it

<workflow-app xmlns="uri:oozie:workflow:0.4" name="my-etl-job">
  <global>
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
  </global>

  <start to="extract"/>

  <action name="extract">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
      </configuration>
      <command>import --connect jdbc:mysql://<database host>:3306/<database name> --username <database user> --password <database password> --table <table name> --driver com.mysql.jdbc.Driver --target-dir hdfs://hadoop_sahara_local/user/rkkrishnaa/database${date} -m 3</command>
    </sqoop>
    <ok to="transform"/>
    <error to="fail"/>
  </action>

  <action name="transform">
    <hive xmlns="uri:oozie:hive-action:0.2">
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
        <property>
          <name>oozie.hive.defaults</name>
          <value>/user/rkkrishnaa/oozie/hive-site.xml</value>
        </property>
      </configuration>
      <script>transform.q</script>
    </hive>
    <ok to="load"/>
    <error to="fail"/>
  </action>

  <action name="load">
    <hive xmlns="uri:oozie:hive-action:0.2">
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
        <property>
          <name>oozie.hive.defaults</name>
          <value>/user/rkkrishnaa/oozie/hive-site.xml</value>
        </property>
      </configuration>
      <script>load.q</script>
    </hive>
    <ok to="end"/>
    <error to="fail"/>
  </action>

  <kill name="fail">
    <message>ETL job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <end name="end"/>
</workflow-app>
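
Before uploading a workflow definition, it can be worth checking that the file is well-formed and that every transition points at a node that actually exists. Here is a minimal sketch of such a check using only the Python standard library. The embedded sample is a stripped-down workflow with the Sqoop and Hive action bodies left out, and the helper names are hypothetical. Oozie performs its own schema validation on submission, so treat this as a quick local sanity check only.

```python
import xml.etree.ElementTree as ET

# Stripped-down workflow: action bodies are omitted to keep the sample short.
SAMPLE = """<workflow-app xmlns="uri:oozie:workflow:0.4" name="my-etl-job">
  <start to="extract"/>
  <action name="extract">
    <ok to="transform"/>
    <error to="fail"/>
  </action>
  <action name="transform">
    <ok to="load"/>
    <error to="fail"/>
  </action>
  <action name="load">
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail"><message>failed</message></kill>
  <end name="end"/>
</workflow-app>"""


def local(tag):
    """Strip the '{namespace}' prefix that ElementTree adds to tag names."""
    return tag.rsplit("}", 1)[-1]


def dangling_transitions(xml_text):
    """Return transition targets that do not match any action, kill or end node.

    Raises xml.etree.ElementTree.ParseError if the document is not well-formed.
    """
    root = ET.fromstring(xml_text)
    names = {el.get("name") for el in root if local(el.tag) in ("action", "kill", "end")}
    missing = []
    for el in root.iter():
        if local(el.tag) in ("start", "ok", "error") and el.get("to") not in names:
            missing.append(el.get("to"))
    return missing


if __name__ == "__main__":
    print(dangling_transitions(SAMPLE))
```

An action that is missing its closing tag makes the parse step fail, and a misspelled node name shows up in the returned list.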

Step 3: Create a file transform.q

LOAD DATA INPATH 'hdfs://hadoop_sahara_local/user/rkkrishnaa/database${date}/files.csv'
INTO TABLE <table name> PARTITION (Year=<year>, Month=<month>);

Step 4: Create a file load.q

SELECT name, age, department FROM sales_department WHERE experience > 5;

The above workflow job performs the ETL operation. We can customize the workflow job based on our use case, and there are many mechanisms for handling task failure in a workflow job. The email action helps to track the status of the workflow tasks. This ETL job can be executed on a daily or weekly basis at any time. Organizations usually schedule these kinds of long-running tasks on weekends or at night.

For more about the workflow: https://oozie.apache.org/docs/3.3.1/index.html

Conclusion

OpenStack has integrated a very large Hadoop ecosystem, and many cloud providers offer a Hadoop service in just a few clicks on their cloud management portals. Sahara supports most of the Hadoop vendor plugins. Execute your ETL workflow with OpenStack Sahara.

Enjoy learning!

References:
[1] https://www.openstack.org/software/sample-configs#big-data
[2] https://docs.openstack.org/sahara/latest/install/index.html
[3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
[4] https://oozie.apache.org/docs/3.3.1/index.html

 

This article first appeared on the Hortonworks Community Connection.
