Archive for the ‘SAP Data Services’ Category

Connecting SAP DataServices to Hadoop: HDFS vs. Hive

SAP DataServices (DS) supports two ways of accessing data on a Hadoop cluster:

  1. HDFS:
    DS reads HDFS files directly from Hadoop. In DataServices you need to create HDFS file formats in order to use this setup. Depending on your dataflow, DataServices might read the HDFS file directly into the DS engine and handle all further processing there. If your dataflow contains more logic that could be pushed down to Hadoop, DS may instead generate a Pig script. The Pig script will then not just read the HDFS file but also handle the other transformations, aggregations etc. from your dataflow. The latter scenario is usually the preferred setup for large amounts of data, because this way the Hadoop cluster can provide the processing power of many Hadoop nodes on inexpensive commodity hardware. The pushdown of dataflow logic to Pig/Hadoop is similar to the pushdown to relational database systems: it depends on whether the dataflow uses functions that can be processed in the underlying system (and, of course, on whether DS knows about all the capabilities of that system). In general, the most common functions, joins and aggregations in DS should be eligible for pushdown to Hadoop.
  2. Hive / HCatalog:
    DS reads data from Hive. Hive accesses data that is defined in HCatalog tables. In DataServices you need to set up a datastore that connects to a Hive adapter. The Hive adapter will in turn read tables from Hadoop by accessing a Hive server. DS will generate HiveQL commands. HiveQL is similar to SQL, so the pushdown of dataflow logic to Hive works similarly to the pushdown to relational database systems.

It is difficult to say which approach is better to use in DataServices: HDFS files/Pig or Hive. Both Pig and Hive generate MapReduce jobs in Hadoop, so performance should be similar. Nevertheless, some aspects can be considered before deciding which way to go. I have tried to describe these aspects in the comparison below. They are probably not complete and they overlap, so in many cases they will not identify a clear favorite. Still, the comparison may give some guidance in the decision process.

Setup connectivity
  • HDFS / Pig: Simple, via HDFS file formats.
  • Hive: Not simple. A Hive adapter and a Hive datastore need to be set up. The CLASSPATH setting for the Hive adapter is not trivial to determine. There are different setup scenarios for DS 4.1 and for 4.2.2 or later releases (see also Connecting SAP DataServices to Hadoop Hive).

Overall purpose
  • HDFS / Pig: Native HDFS access is only advisable if all the data in Hadoop really needs to be processed within DataServices or if the data volume is not too large. Pig covers more of a data flow (in a general sense, not to be confused with DataServices dataflows): a Pig script processes various transformation steps and writes the results into a target table/file. A Pig script therefore suits DataServices jobs that read, transform and write back data from/to Hadoop.
  • Hive: Hive queries are mainly intended for DWH-like queries. They suit DataServices jobs that need to join and aggregate large data volumes in Hadoop and write the results into pre-aggregated tables in a DWH or some other datastore accessible to BI tools.

Runtime performance
  • HDFS / Pig:
    1. Loading flat files from HDFS without any further processing: faster than Hive.
    2. Mapping, joining or aggregating large amounts of data (provided the logic gets pushed down to Pig): performance is determined by the MapReduce jobs in Hadoop and is therefore similar to Hive.
    3. Mapping, joining or aggregating small amounts of data: the processing might even run faster in the DS engine. It might therefore be an option to force DS not to push down the logic to Pig and just read the HDFS file natively.
  • Hive:
    1. Loading all data of a table without processing/aggregation: slower than native HDFS access, because of the unnecessary setup of MapReduce jobs in Hadoop.
    2. Mapping, joining or aggregating large amounts of data (provided the logic gets pushed down to Hive): performance is determined by the MapReduce jobs in Hadoop and is therefore similar to Pig.
    3. Mapping, joining or aggregating small amounts of data: there is no way to bypass Hive/HiveQL, so some MapReduce jobs will always be initiated by Hadoop, even if the data volume is small. The overhead of initiating these MapReduce jobs takes some time and may outweigh the data processing itself for small data volumes.

Development aspects
  • HDFS / Pig: HDFS file formats need to be defined manually in order to describe the data structure in the file. On the other hand, an HDFS file format can easily be generated from the Schema Out view of a DataServices transform (in the same way as for local file formats). No data preview is available.
  • Hive: HCatalog tables can be imported like database tables; the table structure is already predefined by HCatalog. The HCatalog table might already exist, otherwise it still needs to be specified in Hadoop. Template tables do not work with Hive datastores. From DS version 4.2.3 onwards, data can be previewed in DS Designer.

Data structure
In general, HDFS file formats will suit unstructured or semi-structured data better. There is little benefit in accessing unstructured data via HiveQL, because Hive will first save the results of a HiveQL query into a directory on the local file system of the DS engine; DS will then read the results from that file for further processing. Reading the data directly via an HDFS file might be quicker.

Future technologies: Stinger, Impala, Tez etc.
Some data access technologies will improve the performance of HiveQL or Pig Latin significantly (some already do so today). Most of them will support HiveQL, whereas some will support both HiveQL and Pig. The decision between Hive and Pig might therefore also depend on the (future) data access engines in the given Hadoop environment.

Connecting SAP DataServices to Hadoop Hive

Connecting SAP DataServices to Hadoop Hive is not as simple as, for example, connecting to a relational database. In this post I want to share my experiences of how to connect DataServices (DS) to Hive.

The DS engine cannot connect to Hive directly. Instead you need to configure a Hive adapter from the DS management console which will actually manage the connection to Hive.

In the rest of this post I will assume the following setup:

  • DS is not installed on a node in the Hadoop cluster, but has access to the Hadoop cluster. The DS server should run on a Unix server. I think such a setup is typical in most cases.
  • The Hadoop cluster consists of a Hortonworks Data Platform (HDP) 2.x distribution. Other distributions should work similarly, though.

1. Configure the DS server

The DS server will not be installed within the Hadoop cluster, but it will have access to it. Therefore Hadoop needs to be installed and configured appropriately on the DS server. I won't go into too much detail for this step, because your environment may be quite different from my test environment.

Roughly, there are two approaches for installing Hadoop on the DS server:

  1. Manual installation:
    you may follow the instructions on Configuring Data Services on a machine not in your Hadoop cluster on SCN, sections Hadoop and Hive. I have never tried this approach, though.
  2. Installation via Ambari (preferred):
    this approach will initiate a Hadoop installation on the DS server from the Ambari system of the existing Hadoop cluster. The installation process is quite automated and will integrate the DS server as a kind of Hadoop client into the Ambari monitoring system of the cluster.
    In Ambari you need to select the Add Host option. This will start the Add Host Wizard:

    Ambari – Add host wizard

    Before you can start the wizard you need to enable a passwordless login from the Ambari server to the DS server using an SSH private key.

    Later in the wizard process, you need to configure the DS server as a client host; this way all required Hadoop libraries, Hadoop client tools and the Ambari agent will be installed and configured on the DS server. Ambari will also monitor the availability of the DS server.

2.  Configure DS environment and test Hive

The DS jobserver needs some Hadoop environment settings. These settings mainly specify the Hadoop and Hive home directories on the DS server and some required Hadoop JAR files through CLASSPATH settings.

DS provides a script that sources these environment settings; please check the DS Reference Guide 4.2, section 12.1 Prerequisites.

Important: for Hortonworks HDP distributions, DS provides a different script from the documented hadoop_env.sh. For HDP 2.x distributions you should use the script hadoop_HDP2_env.sh instead (this script is only available in DS version 4.2.2 and later).
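
A minimal sketch of sourcing the settings for the current session (the script location is taken from the TDP section later in this post; adjust paths to your installation):

cd $LINK_DIR/hadoop/bin
source ./hadoop_HDP2_env.sh    # use hadoop_env.sh for non-HDP2 distributions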

On the DS server you should now be able to start Hive and test the connection, for instance by running the HiveQL command show databases:

Test hive connection

Finally, restart the DS jobserver so that it picks up the same environment settings as your current session. Also make sure that the hadoop_env.sh (or hadoop_HDP2_env.sh) script is sourced during the DS server startup process; one possible way to do this is sketched below.
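
One possible approach (an assumption, not the only way) is to source the environment script from the profile of the login that runs the job server, so the settings are in place whenever the job server is restarted:

# append the environment setup to the job server owner's profile
# (file name and shell are assumptions; use hadoop_env.sh for non-HDP2 distributions)
echo "source $LINK_DIR/hadoop/bin/hadoop_HDP2_env.sh" >> ~/.bash_profile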

3. Configure the DS Hive adapter and datastore

You may check the following documentation to set up the DS Hive adapter:

In my experience these two setup steps usually do not work without problems. The tricky part is to set the CLASSPATH correctly for the adapter. This task is not well documented and depends on the Hadoop distribution and version. Therefore you might end up in a series of trial-and-error configurations:

  1. Configure the Hive adapter. Modify the CLASSPATH setting so that the adapter knows the location of all required Java objects. (Re-)start the adapter.
  2. Set up the Hive datastore in DS Designer. This will also test the Hive adapter. Check the error message for missing Java objects and return to step 1.

These steps are described in more detail in the following sections. The Troubleshooting section in the blog  Configuring Data Services and Hadoop on SCN may also help.

3.1 Setup of the Hive adapter

For the initial setup of the Hive adapter I used the CLASSPATH setting as described in Configuring the Data Services Hive Adapter on SCN. For instance, the initial configuration of the Hive adapter looked like this in my test environment:

Initial setup of the Hive adapter

3.2 Setup of DS datastore

In the DS designer I created a Hive datastore. The first error message I got when saving the datastore was:

Create Hive datastore – error message

The same error message should be visible in the Hive adapter error log:

Hive adapter – error log

The error message means that the Hive adapter tries to use the Java object org.apache.hadoop.hive.jdbc.HiveDriver but cannot find it. You will need to find the JAR file that contains this Java object and add the full path of this JAR file to the CLASSPATH of the Hive adapter; then return to step 3.1.

There will probably be more than one Java object missing from the initial CLASSPATH setting. Therefore you may very well end up in an iterative process of configuring and restarting the adapter and testing it by saving the datastore in DS Designer.

How do I find the JAR file that contains the missing Java object?

In most cases all required Java objects are in JAR files that are already provided by either the DS installation or the Hadoop installation. They are usually located in one of these directories:

  • $HADOOP_HOME
  • $HADOOP_HOME/lib
  • $HIVE_HOME/lib

I have developed a small shell script that will search for a Java object in all JAR files in a given directory:

[ds@dsserver ~]$ cat find_object_in_jars.sh
#!/bin/sh

jar_dir=$1
object=$2

echo "Object $object found in these jars:"
for jar_file in $jar_dir/*.jar
do
  object_found=`jar tf $jar_file | grep $object`
  if [ -n "$object_found" ]
  then
    echo $jar_file
  fi
done

exit 0

For example, the script above found the object org.apache.hadoop.hive.jdbc.HiveDriver in the file $HIVE_HOME/lib/hive-jdbc-0.12.0.2.0.10.0-1.jar. The full path of this file needs to be added to the CLASSPATH setting of the Hive adapter:

Finding objects in JAR files
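
A hypothetical invocation matching the example above (the output path is illustrative):

[ds@dsserver ~]$ sh find_object_in_jars.sh $HIVE_HOME/lib org.apache.hadoop.hive.jdbc.HiveDriver
Object org.apache.hadoop.hive.jdbc.HiveDriver found in these jars:
.../lib/hive-jdbc-0.12.0.2.0.10.0-1.jar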

Note: the script above uses the jar utility. You need to install a Java development package (such as the OpenJDK development package) if the jar utility is not available on your DS server.

4. DS 4.2.1 and the Hive server

DS needs to connect to a Hive server. The Hive server splits the HiveQL commands into MapReduce jobs, accesses the data on the cluster and returns the results to the caller.

DS 4.2.2 and later versions use the current Hive server, called HiveServer2, to connect to Hive. This version of the Hive server is the default on most Hadoop clusters.

The older version of the Hive server, simply called HiveServer, is usually not started by default on current Hadoop clusters. DS version 4.2.1, however, only works with the old HiveServer.

4.1 Starting the old HiveServer version

The old HiveServer can easily be started from the hive client tool; see the HiveServer documentation.

The default port number for HiveServer is 10000. But because HiveServer2 might already be running and listening on this port, you should define another port for HiveServer, let's say 10001:

Starting HiveServer

You can start the HiveServer on a node in the Hadoop cluster or on the DS server. I recommend starting it on the same node where HiveServer2 is already running – this is usually a management node within the Hadoop cluster.

It is also worth testing the HiveServer connection from the hive command line tool on the DS server. When called without parameters, the hive CLI does not act as a client tool and does not connect to a Hive server (it then acts as a kind of rich client instead). When called with host and port number parameters, it acts as a client tool and connects to the Hive server. For instance:

Testing a HiveServer connection

5. Upgrading DS or Hadoop

After upgrading either DS or Hadoop you might need to reconfigure the Hive connection.

5.1 Upgrading DS

Upgrading from DS 4.2.1 to 4.2.2 requires a switch from HiveServer to HiveServer2, see section 4 above.

Furthermore, newer releases of DS might use other Java objects. In that case the CLASSPATH of the Hive adapter needs to be adapted as described in section 3.

5.2 Upgrading Hadoop

After upgrading Hadoop, the names of the JAR files will most often have changed, because the names contain the version number. For example:

hive*.jar files in $HIVE_HOME/lib

For instance, when upgrading HDP from version 2.0 to 2.1, the HDP upgrade process replaced some JAR files with newer versions. The CLASSPATH setting of the Hive adapter needs to be modified accordingly. I found the following approach quite easy:

  • Open the Hive adapter configuration page in the DS management console
  • Save the configuration again. You will get an error message if the CLASSPATH contains a file reference that DS cannot resolve. For instance:
    Hive adapter: saving an invalid CLASSPATH

    Search for the new file name in the relevant file system directory and modify the CLASSPATH of the Hive adapter accordingly. Save again…

Note: the HDP installation and upgrade process maintains symbolic links without version numbers in their names. These symbolic links point to the latest version of the JAR file. It is of course a good idea to reference these symbolic links in the CLASSPATH setting of the Hive adapter, but unfortunately not all of the required JAR files have such symbolic links.
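
A simple directory listing shows which JAR files have such version-less symbolic links, for example (a sketch; adjust the pattern to the JAR files referenced in your CLASSPATH):

ls -l $HIVE_HOME/lib/hive-*.jar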

SAP DataServices Text Analysis and Hadoop – the Details

I have already used the text analysis feature of SAP DataServices in various projects (the transform in DataServices is called Text Data Processing, or TDP for short). Usually the TDP transform runs in the DataServices engine, which means that DataServices first loads the source text into its own memory and then runs the text analysis on its own server/engines.

The text sources are usually unstructured text or binary files such as Word, Excel, PDF files etc. If these files reside on a Hadoop cluster as HDFS files, DataServices can also push down the TDP transform as a MapReduce job to the Hadoop cluster.

Why run DataServices Text Analysis within Hadoop?

Running the text analysis within Hadoop (that is, as MapReduce jobs) can be an appealing approach if the total volume of source files is big and at the same time the Hadoop cluster has enough resources. In that case the text analysis might run much quicker inside Hadoop than within DataServices.

The text analysis extracts entities from unstructured text and as such it will transform unstructured data into structured data. This is a fundamental pre-requisite in order to be able to run any kind of analysis with the data in the text sources. You only need the text sources as input for the text analysis. Afterwards you could theoretically delete the text sources, because they are no longer needed for the analysis process. But in reality you still want to keep the text sources for various reasons:

  • Quality assurance of the text analysis process: doing manual spot checks by reviewing the text source.
    Is the generated sentiment entity correct? Is the generated organisation entity Apple correct or does the text rather refer to the fruit apple? …
  • As an outcome of the QA you might want to improve the text analysis process and therefore rerun the text analysis with documents that already had been analyzed previously.
  • You might want to run other kinds of text analysis with the same documents. Let’s say you did a sentiment analysis on text documents until now. But in future you might have a different use case and you want to analyze customer requests with the same documents.

Anyway, in all these scenarios the text documents are only used as a source. Any BI solution will rely on the structured results of the text analysis, but it will not need to look up any more information from these documents. Keeping large volumes of these documents in a Hadoop cluster can therefore be a cost-effective solution. If, at the same time, the text analysis process can run in parallel on multiple nodes of that cluster, the overall performance might be improved – again on comparatively cheap hardware.

I was therefore curious how the push-down of the DataServices TDP transform to Hadoop works technically. Unfortunately my test cluster is too small to test the performance. Instead my test results focus on technical functionality only. Below are my experiences and test results:

My Infrastructure

For my tests I am using a Hortonworks Data Platform (HDP) 2.1 installation on a single-node cluster. All HDP services are running on this one node. The cluster is installed on a virtual machine with CentOS 6.5, 8 GB memory and 4 cores.

DataServices 4.2.3 is installed on another virtual machine based on CentOS 6.5, 8 GB memory and 4 cores.

My Test Cases

I have been running 3 test cases:

  1. Text analysis on 1000 Twitter tweets in one CSV file:
    The tweets are a small sample from a bigger file extracted from Twitter on September 9 using a DataSift stream. The tweets in the stream were filtered; they all mention Apple products like iPhone, iPad, AppleWatch and so on.

    Dataflow TA_HDP_APPLE_TWWET

  2. 3126 binary MSG files. These files contain emails that I had sent out in 2013:

    Dataflow TA_HDP_EMAILS

  3. 3120 raw text files. These files contain emails that I had sent out in 2014:

    Dataflow DF_TA_HDP_EMIALS_TEXT

I have been running each of these test cases in two variants:

  • Running the TDP transform within the DS engine. In order to achieve this, the source files were saved on a local filesystem
  • Running the TDP transform within Hadoop. In order to achieve this, the source files were saved on the Hadoop cluster.

The TDP transforms of the two variants of the same test case are basically the same: they are copies of each other with exactly the same configuration options. For all the test cases I configured standard entity extraction in addition to a custom dictionary that defines the names of Apple products. Furthermore, I specified sentiment and request analysis for the English and German languages:

TDP configuration options

Whether a TDP transform runs in the DS engine or is pushed down to Hadoop depends solely on the type of source documents: on a local file system or in HDFS.

Finally, I wanted to compare the results between both variants. Ideally, there should be no differences.

DataServices vs. Hadoop: comparing the results

Surprisingly, there are significant differences between the two variants. When the TDP transform runs in the DS engine it produces much more output than when the same transform runs inside Hadoop as a MapReduce job. More precisely, for some source files the MapReduce job did not generate any output at all, while the DS engine generates output for the same documents:

Comparing number of documents and entities between the two variants

For example, the DS engine produced 4647 entities from 991 documents. This makes sense, because 9 tweets do not contain any meaningful content at all, so the text analysis does not generate any entities for those documents. But the MapReduce job generated 2288 entities from only 511 documents. This no longer makes sense. The question is what happened to the other 502 documents?

The picture is similar for the other two test cases analyzing email files. So, the problem is not related to the format of the source files. I tried to narrow down this problem within Hadoop – please see the section Problem Tracking –> Missing entities at the end of this blog for more details.

On the other hand, for those documents for which both the DS engine and the MapReduce job generated entities, the results of the two variants are nearly identical. For example, these are the results of both variants for one sample tweet:

TDP output for one tweet – TDP running in DS

TDP output for one tweet – TDP running in Hadoop

The minor differences are highlighted in the Hadoop version. They can actually be ignored, because they won’t have any impact on a BI solution processing this data. Please note that the entity type APPLE_PRODUCT has been derived from the custom dictionary that I provided, whereas the entity type PRODUCT has been derived from the system provided dictionary.

We can also see that the distribution of entity types is nearly identical between the two variants, just the number of entities is different. The chart below shows the distribution of entity types across all three test cases:

Distribution of entity types

Preparing Hadoop for TDP

It is necessary to read the relevant section in the SAP DataServices reference manual. Section 12.1.2 Configuring Hadoop for text data processing describes all the necessary prerequisites. The most important topics here are:

Transferring TDP libraries to the Hadoop cluster

The $LINK_DIR/hadoop/bin/hadoop_env.sh -c script will simply transfer some required libraries and the complete $LINK_DIR/TextAnalysis/languages directory from the DS server to the Hadoop cluster. The target location in the Hadoop cluster is set in the environment variable $HDFS_LIB_DIR (this variable is set in the same script when it is called with the -e option). You need to provide this directory in the Hadoop cluster with appropriate permissions. I recommend that the directory in the Hadoop cluster is owned by the same login under which the Data Services engines are running (in my environment this is the user ds).
After $LINK_DIR/hadoop/bin/hadoop_env.sh -c has been executed successfully the directory in HDFS will look something like this:

$HDFS_LIB_DIR in HDFS

Note that the $LINK_DIR/TextAnalysis/languages directory will be compressed in HDFS.
Important: If you are using the Hortonworks Data Platform (HDP) distribution version 2.x you have to use another script instead: hadoop_HDP2_env.sh. hadoop_env.sh does not transfer all the required libraries for HDP 2.x installations.
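
A minimal sketch of the transfer and a subsequent check of the HDFS target directory (this assumes hadoop_HDP2_env.sh accepts the same -c/-e options as the documented hadoop_env.sh):

$LINK_DIR/hadoop/bin/hadoop_HDP2_env.sh -c
hadoop fs -ls $HDFS_LIB_DIR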

Optimizing text data processing for use in the Hadoop framework

The description in the SAP DataServices reference manual, section 12.1.2 Configuring Hadoop, only works for older Hadoop distributions, as the Hadoop parameters referred to there are deprecated. For HDP 2.x I had to use different parameters. See the section Memory Configuration in YARN and Hadoop further below in this blog for more details.

HDFS source file formats

Reading the SAP DataServices reference manual word-for-word, it says that only unstructured text files work for the push-down to Hadoop and furthermore that the source file must be connected (directly?) to the TDP transform:

Extract from DataServices Reference Guide: sources for TDP

My tests – fortunately – showed that the TDP MapReduce job can handle more formats:

  • HDFS Unstructured Text
  • HDFS Unstructured Binary
    I just tested MSG and PDF files, but I assume that the same binary formats as documented in the SAP DataServices 4.2 Designer Guide (see section 6.10 Unstructured file formats) will work. The TDP transform is able to generate some metadata for unstructured binary files (the TDP option Document Properties): this feature does not work when the TDP transform is pushed down to Hadoop. The job still analyzes the text in the binary file, but the MapReduce job will temporarily stage the entity records for the document metadata. When this staging file is read and the records are passed back to the DataServices file format, warnings will be thrown. Thus you will still receive all generated text entities, but not those for the document metadata.
  • HDFS CSV files:
    I was also able to place a query transform between the HDFS source format and the TDP transform and specify some filters, mappings and so on in the query transform. All these settings still got pushed down to Hadoop via a Pig script. This is a useful feature; the push-down seems to work very similarly to the push-down to relational databases: as long as DataServices knows about corresponding functions in the source system (here HDFS and the Pig Latin language), it pushes as much as possible down to the underlying system instead of processing the functions in its own engine.

TDP pushed down to Hadoop

It is very useful to roughly understand how the TDP push-down to Hadoop works, because it will ease performance and problem tracking. Once all prerequisites for the push-down are met (see the previous section Preparing Hadoop for TDP –> HDFS source file formats), DataServices will generate a Pig script which in turn starts the TDP MapReduce job on the Hadoop cluster. The generated Pig script will be logged in the DataServices trace log file:

DS trace log with pig script

The Pig script runs on the DataServices server. But the commands in the Pig script will be executed against the Hadoop cluster. This means – obviously – that the Hadoop client libraries and tools must be installed on the DataServices server. The Hadoop client does not necessarily need to be part of the Hadoop cluster, but it must have access to the Hadoop cluster. More details on the various installation options are provided in the SAP DataServices reference manual or on SCN – Configuring Data Services and Hadoop.

In my tests I have noticed two different sorts of Pig scripts that can be generated, depending on the source file formats.

  1. CSV files
    The Pig script for analyzing text fields in CSV files usually looks similar to this. Obviously, DataServices provides its own Pig functions for loading and storing data. They are also used for staging temporary results within the Pig script. During problem tracking it might be useful to check these files. All required files and information are located in one local directory on the DataServices server:

    Pig script for CSV files

    Given all this information, you might also run performance tests or problem analysis by executing or modifying the Pig script directly on the DataServices server.

    Pig directory

  2. Unstructured binary or text files
    The Pig script for unstructured binary or text files looks a bit different. This is obviously because the MapReduce jar file provided by DataServices can read these files directly; in contrast, in the case of CSV files, some additional pre-processing needs to happen in order to extract the text fields from the CSV file. In the case of unstructured binary or text files the Pig script calls another Pig script which in turn runs a Unix shell command (I don't know what the purpose of these two wrapper Pig scripts is, though). The Unix shell command simply runs the MapReduce job within the Hadoop framework:

    Pig script for unstructured files

What if DS does not push down the TDP transform to Hadoop?

Unstructured binary or text HDFS files:
If unstructured binary or text HDFS files are directly connected to the TDP transform, the transform should get pushed down to Hadoop. In most cases it does not make sense to place other transforms between the HDFS source file format and the TDP transform. If you do have one or more query transforms after the HDFS source file format, you should only use functions in the query transforms that DataServices is able to push down to Pig. As long as you are using common standard functions such as substring() and so on, the chances are high that DataServices will push them down to Pig. In most cases you will have to find out on your own which functions are candidates for a push-down anyway, because they are not documented in the SAP manuals.

CSV files:
Basically the same rules apply as for unstructured binary or text files. In addition there are potential pitfalls with the settings in the CSV file format:
Keep in mind that DataServices provides its own functions for reading HDFS CSV files (see the previous section TDP pushed down to Hadoop). We do not know about the implemented features of the DSLoad Pig function and they are not documented in the SAP manuals. It may be that some of the settings in the CSV file format are not supported by the DSLoad function. In this case DataServices cannot push down the TDP transform to Hadoop, because it first needs to read the complete CSV file from HDFS into its own engine. It will then process the specific CSV settings locally and also process the TDP transform locally in its own engine.

A typical example of this behaviour are the row-skipping options in the CSV file format. They are apparently not supported by the DSLoad function. If you set these options, DataServices will not push down the TDP transform to Hadoop:

HDFS file format options for CSV

Memory Configuration in YARN and Hadoop

I managed to run a MapReduce job using standard entity extraction and English sentiment analysis with the default memory settings for YARN and MapReduce (that is, the settings initially provided when I set up the cluster). But when using the German voice-of-customer libraries (for sentiment or request analysis) I had to increase some memory settings. According to SAP the German language is more complex, so these libraries require much more memory. Please note that the memory required for the MapReduce jobs depends on these libraries and not on the size of the source text files that will be analyzed.

If you do not configure enough memory the TDP MapReduce job will fail. This kind of problem cannot be tracked using the DataServices error logs; see the section Problem Tracking further below in this blog.

I followed the descriptions in the Hortonworks manuals about YARN and MapReduce memory configurations. This way I had overridden the following default settings in my cluster (just as an example!):

Overridden YARN configurations

Overridden MR2 configurations

Using these configurations I managed to run the TDP transform with German VOC libraries as MapReduce jobs.

Problem Tracking

Problems while running a TDP transform as a MapReduce job are usually more difficult to track. The error log of the DataServices job will simply contain the error log provided by the generated Pig script. It might already point you to the source of the problem. On the other hand, other kinds of problems might not be listed here. Instead you need to review the relevant Hadoop or MapReduce log files within Hadoop.

For example, in my case I identified a memory issue within MapReduce like this:

DataServices errorlog

If a TDP MapReduce job fails due to insufficient memory, the DataServices errorlog might look similar to this:

DS errorlog with failed TDP MapReduce job

In this DS errorlog excerpt you just see that one MapReduce job from the generated Pig script failed, but you don't see any more details about the failure. Also, the other log files referred to, pig_*.err and hdfsRead.err, do not provide any more details. In such cases you need to review the error log of the MapReduce job in Hadoop to get more details.

MapReduce errorlogs

In general it is helpful to roughly understand how logging works inside Hadoop. In this specific case of YARN and/or MapReduce I found the chapter Log files in Hortonworks’ YARN and MR2 documentation useful (especially section MapReduce V2 Container Log Files).

It is also good to know about YARN log aggregation; see HDP configuration files for more information. Depending on the setting of the yarn.log-aggregation-enable parameter, the most recent log files are either still on the local file system of the cluster nodes that ran the containers or they are aggregated and compressed within the HDFS cluster.
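
If log aggregation is enabled, the aggregated container logs of a finished TDP MapReduce job can also be fetched from the command line; the application id below is only a placeholder, take the real one from the Pig/DS trace log or the ResourceManager UI:

yarn logs -applicationId application_1400000000000_0001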

In any case, you can start the error log review with tools such as Hue or the JobHistory server provided with Hadoop. In my example I used the Job browser view within Hue. Clicking on the TDP MapReduce job lists various failed attempts of the job to obtain a so-called memory container from YARN:

Failed task attempts

The log file of one of these attempts will show the reason for failure, for instance low memory:

Error log for low memory

In this case the MapReduce job couldn’t start at all and an error will be returned to the DataServices engine which started the Pig script. So in this case the DataServices job will also get a failed status.

Other types of errors to watch out for

A successful task attempt does not necessarily mean that the task itself will succeed; it only means that the task has started successfully. Any errors during task execution will then be logged in the task log file. In the case of TDP, a task on one node will sequentially analyze all the text source files. If the task fails when analyzing one of the source documents, the error will be logged but the task will go ahead with the next source document. This is typical fault-tolerant behaviour of MapReduce jobs in Hadoop.

Important: although errors with individual documents or records get logged in the log file of the MapReduce TDP task, the overall result of the MapReduce job will be SUCCEEDED and no errors or even warnings get returned to DataServices. The DataServices job will get a successful status in such situations.

It is therefore important to monitor and analyze the MapReduce log files in addition to the DataServices error logs!

During my tests I found some other typical errors in the MapReduce log files that hadn’t been visible in the DataServices errorlog. For example:

  • Missing language libraries:
    MapReduce error log: missing language libraries

    I configured automatic language detection in the TDP transform, but forgot to install all the available TDP languages during DS installation. After installing all the languages and re-executing $LINK_DIR/hadoop/bin/hadoop_env.sh -c these errors disappeared.
    (BTW: when running the same job with the TDP transform in the DS engine instead of having it pushed down to Hadoop, DataServices did not print any warnings in the error log, so I probably would never have caught this issue.)

  • Non-ASCII characters in text sources:
    In the Twitter test case some tweets contain special characters such as the © sign or similar symbols. The TDP transform (or MapReduce job) cannot interpret such characters as raw text and then aborts the analysis of this document.
    If the TDP transform runs within the DS engine, these errors will be printed as warnings in the DS error log.
    If the TDP transform runs as a MapReduce job, there will be no warnings in the DS error log, but the error log of the MapReduce job will contain these errors.
  • Missing entities:
    As described above, I found that the TDP transform running as a MapReduce job generates far fewer entities than the same TDP transform (with the same input files) running within the DS engine.
    To narrow down the problem I checked the stdout log file of the MapReduce job. For each text source file (or for each record in a CSV file) it prints a message like this if the analysis succeeds:

    Map Reduce stdout: analyzing text documents

    In order to understand how many text sources had been successfully analyzed, I grepped the stdout file (a sketch of this grep follows after this list):

    MapReduce stdout: check number of successfully analyzed documents.

    Well, in this particular case it actually didn't help me to solve the problem: there had been 3126 text files as source for the TDP transform. According to stdout all of them had been successfully analyzed by the TDP job. Nevertheless, text entities had been generated for only 2159 documents. This would actually mean that 967 documents are empty or have no meaningful content, so that the text analysis does not generate text entities. But because the same TDP transform, when running in the DS engine, generates entities for 2662 documents, there seems to be something wrong in the MapReduce job of the TDP transform!

    Having a closer look at how the generated entities are distributed across various metadata, I recognized that entities were missing only for documents for which the auto language detection mechanism had determined English as the language:

    Entities per language

    With these findings I managed to narrow down the problem to the language settings in the TDP transform. If I change the setting from auto detection (with English as the default language) to English as a fixed language, both variants (DS engine and Hadoop MapReduce) produce exactly the same number of entities.

    I have passed this issue on to SAP – hopefully a fix will be available soon.
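
The grep mentioned above is nothing more than counting the per-document success messages in the task's stdout log, along these lines (a sketch; the exact message text and log file location must be taken from your own MapReduce logs):

grep -c '<success message as shown in the stdout excerpt above>' stdout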

SAP Data Services: SCD 2 with bulk loading in Sybase IQ / SAP Hana

In a POC last year I implemented a Slowly Changing Dimension (type 2, SCD 2) load into a Sybase IQ data warehouse (see SCD for more information on slowly changing dimensions).

I needed to implement a special workaround due to the fact that Data Services cannot use bulk loads in such a scenario because in SCD 2 the previous versions of changed dimension records need to be updated (the columns IsValid and ValidTo need to be updated.)

Bulk loading is a MUST in IQ if you are loading large amounts of data (let's say everything above 100,000 records). If you are not using the bulk load option for target tables, Data Services will send a single INSERT SQL statement for each record to IQ. Columnar databases like IQ are not designed for this kind of transactional processing and performance will decrease dramatically (I actually had the impression that the system hung when I implemented the first version of my load without the bulk load option). With respect to performance, bulk loads are simply the only feasible way to load large amounts of data into IQ.

Likewise, you should not update thousands or millions of records by issuing one UPDATE SQL statement per record within a loop. Instead, in columnar databases you need to update the whole data set with one batch update statement.

Until now I have not had the chance to implement a similar scenario with Hana. But because Hana is a columnar database too (and in BI environments the column store will be used anyway), my findings and the rest of this article apply to Hana in the same way as to IQ.

Typical dataflow for SCD 2 loads

In Data Services a typical dataflow implementing a SCD 2 load looks like this:

Figure 1: Typical dataflow implementing SCD 2

I am assuming here that we do not get any change-capture data from the source system, so a table comparison in Data Services will do the job to identify changes in the dimensions compared to the previous load. The History Preserving transform implements the actual logic behind SCD 2 and will output records with INSERT opcodes for new dimensions and new versions of changed dimensions. The old versions of changed dimensions will be output as records with an UPDATE opcode. The key generation transform will then generate new surrogate keys for all new dimensions with an INSERT opcode.

Because at the end of the data flow there will be records with INSERT and UPDATE opcodes you cannot use the bulk load option in the target table (bulk load would simply insert all records – even the ones which are intended to get updated – and thus violate referential integrity). On the other hand, performance is simply not acceptable if not using the bulk load option.

Workaround – Data Services 4.0 or previous versions

The workaround for Data Services 4.0 or lower is illustrated in figure 2:

Figure 2: Dataflow implementing SCD 2 with bulk load

After the History preserve transform the records are split into different branches: the records with INSERT opcodes are filtered and routed to the key generation transform and the final target table. Here we can finally use the bulk load option.

The records with UPDATE opcodes get filtered and mapped to NORMAL opcodes:

Figure 3: Map Transform for old versions of changed dimensions

The following query transform keeps only the surrogate key of the records. This is the primary key of the table and will be used later to update the IsValid and ValidTo columns:

Figure 4: Filter surrogate key column of old versions of changed dimension records

The surrogate keys of the old versions of changed dimensions then get inserted into a working table: TMP_Outdated_Customers. Here again, we are using the bulk load option (with truncate before load).

At this stage we managed to load the new dimensions and new versions of changed dimensions into the DWH using the bulk load. We also loaded the surrogate (primary) keys of old versions of changed dimensions using bulk load.

Finally, we only need to update the IsValid and ValidTo columns in the target dimension table using the primary keys in the working table TMP_Outdated_Customers. This can be done with one single UPDATE SQL statement. This is implemented in a Data Services script directly after the dataflow (not illustrated in Figure 2). For instance:

# Arne Weitzel, IT-Logix AG, 2011
#
# Update customers that had been identified as no longer valid as part of a previous history preserving transform
# This is to avoid single-record updates as part of the standard processing of the history preserving
# Instead, outdated records had been temporary loaded into TMP_Outdated_Customers

print ('Updating outdated customer records after history preserving...');
sql('iq_server_db_1', '
  update DIM_Customers
     set IsValid = 0,
         ValidTo = dateadd(dd, -1, convert(date, {$G_LOAD_DATE_CHAR}, 104))
    from TMP_Outdated_Customers oc
   where DIM_Customers.CustomerSK = oc.CustomerSK');
print ('Update finished');

Please note that we are using a special SQL feature in Sybase IQ which is called an update join: joining two or more tables and using the result set to update a table. In HANA you can use the UPSERT statement instead.

In IQ such a batch update statement is quite fast: in our test environment we were using IQ on a 4 CPU Windows server with 8 GB memory. With the update statement above, 10 million dimension records got updated within 7 seconds. The good performance can be achieved here because we are only updating two columns. If we needed to update more columns in a columnar database, the performance would decrease accordingly!

Solution in Data Services 4.1

The problem has been addressed by SAP in Data Services 4.1: you can use the bulk load option for target tables even if the records contain UPDATE or DELETE opcodes. In this case Data Services creates staging tables for the update and/or delete records, bulk loads these records into the staging table and then applies the updates/deletes in batch SQL statements within the target database. This is a similar solution to the workaround described above, but the coding and maintenance are simple again: you can now go back to the typical solution described in Figure 1 and at the same time apply bulk loading.

The DS 4.1 documentation describes in detail when this mechanism applies:
For Sybase IQ (source: Technical manuals Data Services 4.1, Performance Optimization Guide):

SAP BusinessObjects Data Services supports bulk loading to Sybase IQ databases via the Sybase IQ LOAD TABLE SQL command. For detailed information about the Sybase IQ LOAD TABLE parameters and their behavior in the Sybase IQ database environment, see the relevant Sybase IQ product documentation.

For improved performance when using changed-data capture or auto correct load, Data Services uses a temporary staging table to load the target table. Data Services first loads the data to the staging table, then it applies the operation codes (INSERT, UPDATE, and DELETE) to update the target table. With the Bulk load option selected in the target table editor, any one of the following conditions triggers the staging mechanism:
• The data flow contains a Map_CDC_Operation transform
• The data flow contains a Map_Operation transform that outputs UPDATE or DELETE rows
• The Auto correct load option in the target table editor is set to Yes
If none of these conditions are met, that means the input data contains only INSERT rows. Therefore, Data Services does only a bulk INSERT operation, which does not require a staging table or the need to execute any additional SQL.

Note that because the bulk loader for Sybase IQ also supports UPDATE and DELETE operations, the following options (target table editor Options > Advanced > Update control) are also available for bulk loading:
• Use input keys
• Auto correct load

For Hana (source: Technical manuals Data Services 4.1, Performance Optimization Guide):

SAP BusinessObjects Data Services supports bulk loading to the SAP HANA database.

For improved performance when using changed-data capture or auto correct load, Data Services uses a temporary staging table to load the target table. Data Services first loads the data to the staging table, then it applies the operation codes (INSERT, UPDATE, and DELETE) to update the target table. With the Bulk load option selected in the target table editor, any one of the following conditions triggers the staging mechanism:
• The data flow contains a Map_CDC_Operation transform
• The data flow contains a Map_Operation transform that outputs UPDATE or DELETE rows
• The data flow contains a Table_Comparison transform
• The Auto correct load option in the target table editor is set to Yes
If none of these conditions are met, that means the input data contains only INSERT rows. Therefore, Data Services does only a bulk insert operation, which does not require a staging table or the need to execute any additional SQL.

By default, Data Services automatically detects the SAP HANA target table type and updates the table accordingly for optimal performance.

Because the bulk loader for SAP HANA is scalable and supports UPDATE and DELETE operations, the following options (target table editor Options > Advanced > Update control) are also available for bulk loading:
• Use input keys
• Auto correct load

Performance considerations

The solution in DS 4.1 simplifies coding and maintenance. But the performance may still be a bit slower than the workaround described above for DS 4.0 or lower:

With the new solution in 4.1, Data Services bulk loads ALL columns of the UPDATE records in the dataflow to the staging table. In contrast, the workaround for DS 4.0 only uploads the primary key column(s). If your target table has a lot of columns, the bulk load performance will be much better with the DS 4.0 workaround solution.

Data Services: Loading flat files into IQ

We had a POC where it was required to load large flat files into a Sybase IQ data warehouse. We used Data Services 4.0 (DS)  to load the files into the staging area of the data warehouse.

Loading files into a database table is an easy job for an ETL tool. In DS, all you need to do is create a file format and import the staging table from IQ. Then connect the file format with the staging table in a dataflow, for instance:

Please note that this is a typical scenario if you are loading large files into an analytic or columnar database such as SAP Sybase IQ or SAP Hana. Even if you need to do some transformations at the beginning of your ETL process, you would rather do this in a second step, because the database can most often process transformations much faster than Data Services could. So, instead of using a Data Services query transform right after the file format, you may want to simply import the file into the database first. Then in the next dataflow you would use the imported table (IMP_Customers in the example above) as a source table and specify the transformation logic in a query transform. Ideally the transformation logic will get pushed down to IQ and thus benefit from the high performance of the columnar database. In most of the cases I have seen so far, this approach proved to be quicker than executing the transformation logic within the ETL tool.

So far so good, but there are two things to keep in mind when loading flat files directly into a Sybase IQ table. First, you need to decide on the loading mechanism that you specify in the target table options in Data Services:

Available loading mechanisms:

  1. Single-record insert statements: this is the default option in Data Services. The tool will send a SQL INSERT statement to IQ for each record to be loaded. It is a clear no-go if you are loading large amounts of data. It can work fine for a few hundred or thousand records, but it will be a definite showstopper if you are loading hundreds of thousands or millions of records. The only feasible solution in that case is to use the bulk loading option in the target table.
  2. With the bulk loading option the client (Data Services) simply sends the LOAD TABLE … USING FILE … command to IQ. IQ will then handle the data load process completely on its own. Here again, IQ offers various other options which are all supported by Data Services. You can specify these options in the advanced datastore configuration:
  • JS and DB on same machine: the file can reside locally on the Sybase IQ server or remotely. If it is remote, the IQ ODBC client will handle the transfer of the file from the client machine (in this case the Data Services server) to the IQ server.
  • Use named pipe: the bulk load can be managed using a named pipe as the data file: in this scenario another process keeps writing data into the file (named pipe) while at the same time IQ picks up the new incoming records from the file and loads them into the table. Again, named pipes can reside locally on the IQ server or remotely on the client machine.

In all the tests I have done so far in various setups, loading a complete file that resides locally on the IQ server was always the fastest option. Named pipes seem to slow down the bulk load. Still, I would recommend that you test these various options in your own environment. If you decide not to use named pipes, there is still one important issue to keep in mind:

Bulk loading from Data Services into IQ:

The screenshot above might suggest that Data Services simply sends a LOAD TABLE … USING FILE … command to IQ, that the whole loading process is managed completely by IQ and that Data Services actually has nothing to do in the data flow. This is not true, though:

  • First, Data Services loads the file into its memory. In this step it will also perform all kinds of error handling that are specified at the level of the file format!
  • Second, Data Services writes the data it has just loaded to another file in its bulk loader directory (option Bulk loader directory in the advanced datastore settings – if not specified, it is the ~/BusinessObjects/DataServices/log/bulkloader directory).
  • Third, it sends the LOAD TABLE … USING FILE … command to IQ to initiate the bulk loading process within IQ.

You can read these processing steps from the trace log, for instance:

In this example we are loading 130 million records with the dataflow as shown in the screenshot above. From 10:53 until 11:07  Data Services did the first two steps: loading the file into its memory and writing it into the bulk load directory. At 11:07 Data Services sent the LOAD TABLE statement to IQ. From 11:07 until 11:11 IQ loaded the interim file from the bulk load directory into its table.

If you do not depend on the file error handling of Data Services, the extra work from 10:53 until 11:07 looks like useless overhead and a waste of time. Furthermore, a similar kind of error handling can be defined at the level of the bulk load statement in IQ. It is also supported by Data Services: you can specify the error handling options of the IQ bulk load in the target table options (obviously, your ETL process still needs to check the IQ error log files if you want tighter control in case of loading errors):

How to avoid the extra work of Data Services?

If you don’t need the error handling features of Data Services while loading a file (or if you can handle these with the IQ bulk load error handling) you may want to get rid of the extra loading/writing steps that Data Services is doing. There are two options:

  1. Write your own LOAD TABLE statement in a Data Services script using the sql function. I did this by copying the LOAD TABLE statement from the Data Services trace log. You will still need to adapt this LOAD TABLE statement, because Data Services will probably write its own interim file in a slightly different format than the original file; the row delimiter and field delimiter in particular will probably need to be adapted.
  2. (More unrealistically:) vote for my improvement suggestion in the SAP Idea Place: the solution above definitely works, but this is not the way we want to code an ETL process when using an ETL tool. I therefore suggested that the IQ bulk load from a data file (which is provided by some supplier outside Data Services) should bypass loading into the memory of Data Services. In this case, features like error handling in Data Services would need to be deactivated.