Oracle Cloud Infrastructure Documentation

HDFS Connector for Object Storage

The Hadoop Distributed File System (HDFS) Connector lets your Apache Hadoop application read and write data to and from the Oracle Cloud Infrastructure Object Storage service.

This SDK and sample is dual-licensed under the Universal Permissive License 1.0 and the Apache License 2.0; third-party content is separately licensed as described in the code.

Requirements

To use the HDFS connector, you must have:

  • An Oracle Cloud Infrastructure account.
  • A user created in that account, in a group with a policy that grants the desired permissions for any bucket you want to use. This can be a user for yourself, or another person/system that needs to call the API. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a basic Object Storage policy, see Let Object Storage admins manage buckets and objects.
  • Java 8
  • A JVM DNS cache TTL value of 60 seconds. For more information, see Configuring JVM TTL for DNS Name Lookups.

Credentials and Passwords

If you use an encrypted PEM file for credentials, the passphrase is read from configuration using the getPassword Hadoop Configuration method. The getPassword method first checks for a password in a registered security provider. If the security provider doesn't contain the requested key, it falls back to reading the plaintext passphrase directly from the configuration file.

Configuring JVM TTL for DNS Name Lookups

The Java Virtual Machine (JVM) caches DNS responses from lookups for a set amount of time, called time-to-live (TTL). This ensures faster response time in code that requires frequent name resolution.

The JVM uses the networkaddress.cache.ttl property to specify the caching policy for DNS name lookups. The value is an integer that represents the number of seconds to cache the successful lookup. The default value for many JVMs, -1, indicates that the lookup should be cached forever.

Because resources in Oracle Cloud Infrastructure use DNS names that can change, we recommend that you change the TTL value to 60 seconds. This ensures that the new IP address for the resource is returned on the next DNS query. You can change this value globally or specifically for your application:

  • To set TTL globally for all applications using the JVM, add the following in the $JAVA_HOME/jre/lib/security/java.security file:

    networkaddress.cache.ttl=60
  • To set TTL only for your application, set the following in your application's initialization code:

    java.security.Security.setProperty("networkaddress.cache.ttl" , "60");
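
The ordering matters for the application-level option: the JVM may read this property only once, when its name-lookup classes first initialize, so the call is safest at the very start of the program. The following is a minimal sketch of that idea; the class name DnsTtlConfig and its apply method are hypothetical helpers, not part of the connector.

```java
import java.security.Security;

final class DnsTtlConfig {
    static final String TTL_PROPERTY = "networkaddress.cache.ttl";

    /** Sets the DNS cache TTL (in seconds) for this JVM only. */
    static void apply(int seconds) {
        Security.setProperty(TTL_PROPERTY, Integer.toString(seconds));
    }

    public static void main(String[] args) {
        // Call this before any code performs a DNS lookup.
        apply(60);
        System.out.println(TTL_PROPERTY + "=" + Security.getProperty(TTL_PROPERTY));
    }
}
```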

Installation

Copy the bundled jars from lib and third-party/lib to each node of the Hadoop cluster so that they are included in Hadoop’s CLASSPATH.

Java SDK and Maven Artifacts

Building an HDFS connector relies on Maven artifacts that are provided by the Java SDK. To obtain the artifacts, you must download the Java SDK and build it locally. You can then build the HDFS connector.

Important

The Java SDK file version that you download from the Oracle Releases page must match the HDFS connector version, which you can find in the hdfs-connector/pom.xml file in the dependency tag block that has the groupId attribute.

Properties

You can set the following HDFS Connector properties in the core-site.xml file. The BmcProperties page lists additional properties that you can configure for a connection to Object Storage.

You can specify that a property value applies to a specific bucket by appending .<bucket_name>.<namespace_name> to the property name.

This example shows how properties can be configured in a core-site.xml file (the OCIDs are shortened for brevity):

<configuration>
...
  <property>
    <name>fs.oci.client.hostname</name>
    <value>https://objectstorage.us-ashburn-1.oraclecloud.com</value>
  </property>
  <property>
    <name>fs.oci.client.hostname.myBucket.myNamespace</name>
    <value>https://objectstorage.us-phoenix-1.oraclecloud.com</value><!-- Use Phoenix for myBucket@myNamespace -->
  </property>
  <property>
    <name>fs.oci.client.auth.tenantId</name>
    <value>ocid1.tenancy.oc1..aaaaaaaaba3pv6wkcr4j...stifsfdsq</value> 
  </property>
  <property>
    <name>fs.oci.client.auth.userId</name>
    <value>ocid1.user.oc1..aaaaaaaat5nvwcnazjc...aqw3rynjq</value>
  </property>
  <property>
    <name>fs.oci.client.auth.fingerprint</name>
    <value>20:3b:97:13:55:1c:5b:0d:d3:37:d8:50:4e:c5:3a:34</value>
  </property>
  <property>
    <name>fs.oci.client.auth.pemfilepath</name>
    <value>~/.oci/oci_api_key.pem</value>
  </property>
...
</configuration>
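
The bucket-specific naming rule can be illustrated with a small lookup helper. This is only a sketch: a plain Map stands in for the Hadoop Configuration, the class BucketScopedProperties is hypothetical, and the precedence shown (bucket-specific value first, general value otherwise) is an assumption about how such overrides are usually resolved rather than a description of the connector's code.

```java
import java.util.HashMap;
import java.util.Map;

final class BucketScopedProperties {
    /** Builds "<property>.<bucket_name>.<namespace_name>". */
    static String scopedName(String property, String bucket, String namespace) {
        return property + "." + bucket + "." + namespace;
    }

    /** Assumed precedence: the bucket-specific value wins, else the general value. */
    static String resolve(Map<String, String> conf, String property, String bucket, String namespace) {
        String scoped = conf.get(scopedName(property, bucket, namespace));
        return scoped != null ? scoped : conf.get(property);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.oci.client.hostname", "https://objectstorage.us-ashburn-1.oraclecloud.com");
        conf.put("fs.oci.client.hostname.myBucket.myNamespace",
                "https://objectstorage.us-phoenix-1.oraclecloud.com");

        // myBucket has an override; any other bucket uses the general value.
        System.out.println(resolve(conf, "fs.oci.client.hostname", "myBucket", "myNamespace"));
        System.out.println(resolve(conf, "fs.oci.client.hostname", "otherBucket", "myNamespace"));
    }
}
```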

Using Instance Principals for Authentication

Oracle provides instance principals so that you no longer need to configure user credentials or provide PEM files on services running on instances. Each of these instances has its own identity and authenticates by using certificates added to the instance by instance principals.

To use instance principals authentication with the HDFS connector, simply provide the property fs.oci.client.custom.authenticator and set the value to com.oracle.bmc.hdfs.auth.InstancePrincipalsCustomAuthenticator.

Because using instance principals provides your instance with a custom authenticator, it is no longer necessary to configure the following properties:

  • fs.oci.client.auth.tenantId
  • fs.oci.client.auth.userId
  • fs.oci.client.auth.fingerprint
  • fs.oci.client.auth.pemfilepath
  • fs.oci.client.auth.passphrase

The following example code illustrates using instance principals for authentication with the HDFS connector:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.oci.client.hostname</name>
    <value>https://objectstorage.us-phoenix-1.oraclecloud.com</value>
  </property>
  <property>
    <name>fs.oci.client.custom.authenticator</name>
    <value>com.oracle.bmc.hdfs.auth.InstancePrincipalsCustomAuthenticator</value>
  </property>
</configuration>

For more information about instance principals, see Announcing Instance Principals for Identity and Access Management.

Configuring an HTTP Proxy

You can set the following optional properties in the core-site.xml file to configure an HTTP proxy:

Note

Configuring a proxy enables use of the ApacheConnectorProvider when making connections to Object Storage. This provider buffers requests in memory, which can increase memory utilization when uploading large objects. We recommend that you enable multipart uploads and adjust the multipart properties to manage memory consumption.

Large Object Uploads

Large objects are uploaded to Object Storage using multipart uploads. The file is split into smaller parts that are uploaded in parallel, which reduces upload times. This also lets the HDFS connector retry only the failed parts instead of failing the entire upload. However, uploads may transiently fail, and the connector will attempt to abort partially uploaded files. Because these partial uploads can accumulate (and you are charged for their storage), list the uploads periodically and, after a certain number of days, abort them manually using the Java SDK.
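
The split-and-retry idea can be sketched without any Object Storage calls. In the example below, MultipartSketch, Part, and PartUploader are hypothetical names, and the uploader is a stand-in for whatever actually sends a part; the sketch shows the general technique, not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class MultipartSketch {
    /** One part of the object: bytes [offset, offset + length). */
    static final class Part {
        final int number;
        final long offset;
        final long length;

        Part(int number, long offset, long length) {
            this.number = number;
            this.offset = offset;
            this.length = length;
        }
    }

    /** Hypothetical upload hook; throwing signals that the part failed. */
    interface PartUploader {
        void upload(Part part) throws Exception;
    }

    /** Splits totalSize bytes into consecutive parts of at most partSize bytes. */
    static List<Part> split(long totalSize, long partSize) {
        if (partSize <= 0) {
            throw new IllegalArgumentException("partSize must be positive");
        }
        List<Part> parts = new ArrayList<>();
        int number = 1;
        for (long offset = 0; offset < totalSize; offset += partSize) {
            parts.add(new Part(number++, offset, Math.min(partSize, totalSize - offset)));
        }
        return parts;
    }

    /** Uploads parts in parallel; returns true only if every part eventually succeeded. */
    static boolean uploadAll(List<Part> parts, PartUploader uploader, int threads, int maxAttempts) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (Part part : parts) {
                Callable<Boolean> task = () -> uploadWithRetry(part, uploader, maxAttempts);
                results.add(pool.submit(task));
            }
            boolean allSucceeded = true;
            for (Future<Boolean> result : results) {
                allSucceeded &= result.get();
            }
            return allSucceeded;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }

    /** Retries a single part without touching the parts that already succeeded. */
    private static boolean uploadWithRetry(Part part, PartUploader uploader, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                uploader.upload(part);
                return true;
            } catch (Exception e) {
                // Fall through and retry this part only.
            }
        }
        return false;
    }
}
```

For example, MultipartSketch.split(25, 10) yields three parts of 10, 10, and 5 bytes. If uploadAll returns false, the caller is in the situation described above: some parts may already be stored and the upload needs to be aborted.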

Information about using the Object Storage API for managing multipart uploads can be found in API Documentation.

Note

If you prefer not to use multipart uploads, you can disable them by setting the fs.oci.client.multipart.allowed property to false.

Best Practices

The following sections contain best practices to optimize usage and performance.

Directory Names

There are no actual directories in Object Storage. Directory grouping is a function of naming convention, where objects use / delimiters in their names. For example, an object named a/example.json implies there is a directory named a. However, if that object is deleted, the a directory is also deleted implicitly. To preserve filesystem semantics where a directory can exist without containing any files, the HDFS connector creates an actual object whose name ends in / and whose path represents the directory (for example, an object named a/). Now, deleting a/example.json doesn't affect the existence of the a directory, because the a/ object maintains its presence. However, it's entirely possible that somebody could delete that a/ object without deleting the files/directories beneath it. The HDFS connector will only delete the folder object if there are no objects beneath that path. The folder object itself is zero bytes.
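
The difference between an implied directory and one backed by a folder object can be seen in a toy model. The sketch below uses a sorted set of names as a stand-in for a bucket; DirectoryMarkerSketch is a hypothetical class and involves no Object Storage calls.

```java
import java.util.TreeSet;

final class DirectoryMarkerSketch {
    // A bucket is a flat, sorted namespace of object names.
    private final TreeSet<String> keys = new TreeSet<>();

    void put(String key) {
        keys.add(key);
    }

    void delete(String key) {
        keys.remove(key);
    }

    /** True if any object name starts with dir (dir must end with '/'). */
    boolean anyKeyUnder(String dir) {
        String first = keys.ceiling(dir); // smallest name >= dir
        return first != null && first.startsWith(dir);
    }

    public static void main(String[] args) {
        DirectoryMarkerSketch implied = new DirectoryMarkerSketch();
        implied.put("a/example.json");
        implied.delete("a/example.json");
        System.out.println("implied directory survives: " + implied.anyKeyUnder("a/"));

        DirectoryMarkerSketch marked = new DirectoryMarkerSketch();
        marked.put("a/"); // zero-byte folder object
        marked.put("a/example.json");
        marked.delete("a/example.json");
        System.out.println("marked directory survives: " + marked.anyKeyUnder("a/"));
    }
}
```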

Inconsistent Filesystem

Deleting a directory means deleting all objects that start with the prefix representing that directory. HDFS allows you to query for the file status of a file or a directory. The file status of a directory is implemented by verifying that the folder object for that directory exists. However, it's possible that the folder object has been deleted, but some of the objects with that prefix still exist. For example, in a situation with these objects:

  • a/b/example.json
  • a/b/file.json
  • a/b/

HDFS would know that directory /a/b/ exists and is a directory, and scanning it would result in example.json and file.json. However, if object a/b/ was deleted, the filesystem would appear to be in an inconsistent state. You could query it for all files in directory /a/b/ and find the two entries, but querying for the status of the actual /a/b/ directory would result in an exception because the directory doesn't exist. The HDFS connector does not attempt to fix up the state of the filesystem.
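
The inconsistent state can be reproduced in a toy model in which a directory's status depends on its folder object while a listing depends only on the name prefix. InconsistentStateSketch is a hypothetical class, and for brevity its listing is a plain prefix scan that ignores delimiters.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class InconsistentStateSketch {
    private final TreeSet<String> keys = new TreeSet<>();

    void put(String key) {
        keys.add(key);
    }

    void delete(String key) {
        keys.remove(key);
    }

    /** Lists names under dir by prefix scan; the folder object itself is skipped. */
    List<String> list(String dir) {
        List<String> names = new ArrayList<>();
        for (String key : keys.tailSet(dir, false)) {
            if (!key.startsWith(dir)) {
                break;
            }
            names.add(key.substring(dir.length()));
        }
        return names;
    }

    /** Directory status is answered from the folder object alone. */
    String directoryStatus(String dir) throws FileNotFoundException {
        if (!keys.contains(dir)) {
            throw new FileNotFoundException(dir);
        }
        return "directory";
    }

    public static void main(String[] args) {
        InconsistentStateSketch fs = new InconsistentStateSketch();
        fs.put("a/b/example.json");
        fs.put("a/b/file.json");
        fs.put("a/b/");
        fs.delete("a/b/"); // someone removes only the folder object

        System.out.println("listing: " + fs.list("a/b/"));
        try {
            fs.directoryStatus("a/b/");
        } catch (FileNotFoundException e) {
            System.out.println("status lookup failed for " + e.getMessage());
        }
    }
}
```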

File Creation

Object Storage supports objects that can be many gigabytes in size. Creating files will normally be done by writing to a temp file and then uploading the contents of the file when the stream is closed. The temp space must be large enough to handle multiple uploads. The temp directory used is controlled by the hadoop.tmp.dir configuration property.
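
The write-then-upload pattern can be sketched as an output stream that spools to a temp file and hands the file to an upload hook on close. BufferedUploadStream and its Uploader interface are hypothetical; the hook stands in for the real upload, and the temp directory is passed in rather than read from hadoop.tmp.dir.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class BufferedUploadStream extends FilterOutputStream {
    /** Hypothetical upload hook standing in for the real Object Storage upload. */
    interface Uploader {
        void upload(Path file) throws IOException;
    }

    private final Path tempFile;
    private final Uploader uploader;
    private boolean closed;

    private BufferedUploadStream(Path tempFile, Uploader uploader) throws IOException {
        super(Files.newOutputStream(tempFile));
        this.tempFile = tempFile;
        this.uploader = uploader;
    }

    /** Creates the stream with a fresh temp file inside tempDir. */
    static BufferedUploadStream create(Path tempDir, Uploader uploader) throws IOException {
        return new BufferedUploadStream(Files.createTempFile(tempDir, "upload-", ".tmp"), uploader);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // avoid FilterOutputStream's byte-at-a-time default
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            super.close();             // flush and close the temp file
            uploader.upload(tempFile); // upload happens only when the stream is closed
        } finally {
            Files.deleteIfExists(tempFile); // release the temp space
        }
    }
}
```

Because each open stream holds its own temp file until it is closed, the temp directory needs room for all files being written concurrently.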

Read/Seek Support

When in-memory buffers are enabled (fs.oci.io.read.inmemory), seek is fully supported because the entire file is buffered into a byte array. When in-memory buffers are not enabled (likely because object sizes are large), seek is implemented by closing the stream and making a new range request starting at the specified offset.
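
The second strategy, seeking by reopening, can be sketched as follows. RangeSeekStream and RangeOpener are hypothetical names; RangeOpener stands in for a ranged GET, and the over helper serves bytes from an array purely for demonstration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class RangeSeekStream extends InputStream {
    /** Hypothetical range reader: opens the object's bytes starting at offset. */
    interface RangeOpener {
        InputStream openFrom(long offset) throws IOException;
    }

    private final RangeOpener opener;
    private InputStream current;
    private long position;

    RangeSeekStream(RangeOpener opener) throws IOException {
        this.opener = opener;
        this.current = opener.openFrom(0);
    }

    /** Seeks by closing the current stream and issuing a new ranged read. */
    void seek(long offset) throws IOException {
        if (offset < 0) {
            throw new IOException("negative offset: " + offset);
        }
        current.close();
        current = opener.openFrom(offset);
        position = offset;
    }

    long getPos() {
        return position;
    }

    @Override
    public int read() throws IOException {
        int b = current.read();
        if (b >= 0) {
            position++;
        }
        return b;
    }

    @Override
    public void close() throws IOException {
        current.close();
    }

    /** In-memory stand-in for an object, used only for the demonstration. */
    static RangeOpener over(byte[] object) {
        return offset -> {
            int start = (int) Math.min(offset, object.length);
            return new ByteArrayInputStream(object, start, object.length - start);
        };
    }
}
```

Each seek in this model costs a new request, so access patterns with many small random seeks are the case where in-memory buffering pays off.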

Directory Listing

Listing a directory is essentially a List bucket operation with a prefix and delimiter specified. To create an HDFS FileStatus instance for each key, the connector performs an additional HEAD request to get ObjectMetadata for each individual key. This will be required until Object Storage supports richer list operation data.
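
Prefix-and-delimiter listing can be modeled over a sorted set of object names. This sketch shows the general technique only; PrefixDelimiterListing is a hypothetical class, and it does not model the per-key HEAD request described above.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

final class PrefixDelimiterListing {
    /**
     * Returns the immediate children of prefix: object names directly under it,
     * plus one common prefix (ending in the delimiter) per sub-directory.
     */
    static Set<String> list(TreeSet<String> keys, String prefix, char delimiter) {
        Set<String> children = new LinkedHashSet<>();
        for (String key : keys.tailSet(prefix, true)) {
            if (!key.startsWith(prefix)) {
                break; // sorted order: no later name can match the prefix
            }
            if (key.equals(prefix)) {
                continue; // the folder object itself is not a child
            }
            int next = key.indexOf(delimiter, prefix.length());
            children.add(next < 0 ? key : key.substring(0, next + 1));
        }
        return children;
    }

    public static void main(String[] args) {
        TreeSet<String> keys = new TreeSet<>(Arrays.asList(
                "a/b/", "a/b/c/example.json", "a/b/d/path.json", "a/b/file.json"));
        System.out.println(list(keys, "a/b/", '/'));
    }
}
```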

URI Format for Filesystems and Files

HDFS filesystems and files are referenced through URIs. The scheme specifies the type of filesystem, and the remaining part of the URI is largely free for the filesystem implementation to interpret as it wants.

Because Object Storage is an object store, its ability to name objects as if they were files in a filesystem is used to mimic an actual filesystem.

Root

The root of Object Storage filesystem is denoted by a path where the authority component includes the bucket name and the namespace name, as shown:

Note

In the examples, "MyBucket" and "MyNamespace" are placeholders and should be replaced with appropriate values.

oci://MyBucket@MyNamespace/

This is always the root of the filesystem. The authority carries both the bucket and the namespace because HDFS only allows the authority portion to determine where the filesystem is; the path portion denotes just the path to the resource (so "oci://MyNamespace/MyBucket" won't work, for example). Because the @ character is not a valid character for buckets or namespaces, the authority can be parsed unambiguously.
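
To show how the authority separates the bucket from the namespace, the following sketch splits such a URI using java.net.URI. OciUriParts is a hypothetical helper, not the connector's parser, and it assumes the names need no escaping.

```java
import java.net.URI;

final class OciUriParts {
    final String bucket;
    final String namespace;
    final String objectName;

    private OciUriParts(String bucket, String namespace, String objectName) {
        this.bucket = bucket;
        this.namespace = namespace;
        this.objectName = objectName;
    }

    /** Splits oci://<bucket>@<namespace>/<path> into its three parts. */
    static OciUriParts parse(String text) {
        URI uri = URI.create(text);
        String authority = uri.getAuthority();
        int at = authority == null ? -1 : authority.indexOf('@');
        if (!"oci".equals(uri.getScheme()) || at <= 0 || at == authority.length() - 1) {
            throw new IllegalArgumentException("Expected oci://<bucket>@<namespace>/<path>: " + text);
        }
        String path = uri.getPath() == null ? "" : uri.getPath();
        // The object name is the path without its leading slash; the root maps to "".
        String objectName = path.startsWith("/") ? path.substring(1) : path;
        return new OciUriParts(authority.substring(0, at), authority.substring(at + 1), objectName);
    }

    public static void main(String[] args) {
        OciUriParts parts = parse("oci://MyBucket@MyNamespace/a/b/c/example.json");
        System.out.println(parts.bucket + " | " + parts.namespace + " | " + parts.objectName);
    }
}
```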

Sub-directories

Sub-directories do not actually exist, but can be mimicked by creating objects with / characters. For example, two files named a/b/c/example.json and a/b/d/path.json would appear as if they were in a common directory a/b. This would be achieved by using the Object Storage prefix- and delimiter-based querying. In the given example, referencing a sub-directory as a URI would be:

oci://MyBucket@MyNamespace/a/b/

Objects/Files

An object named a/b/c/example.json is referenced as:

oci://MyBucket@MyNamespace/a/b/c/example.json

Logging

Logging in the connector is done through SLF4J. SLF4J is a logging abstraction that allows the use of a user-supplied logging library (e.g., log4j). For more information, see the SLF4J manual.

The following example shows how to enable basic logging to standard output.

  1. Download the SLF4J Simple binding jar: SLF4J Simple Binding
  2. Add the jar to your classpath
  3. Add the following VM arg to enable debug level logging (by default, info level is used): -Dorg.slf4j.simpleLogger.defaultLogLevel=debug

You can configure more advanced logging options by using the log4j binding.

Sample Hadoop Job

hadoop_sample_hdfs:


package com.oracle.oci.hadoop.example;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.oracle.bmc.hdfs.BmcFilesystem;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SampleOracleBmcHadoopJob
{
    private static final String SAMPLE_JOB_PATH = "/samplehadoopjob";
    private static final String INPUT_FILE = SAMPLE_JOB_PATH + "/input.dat";
    private static final String OUTPUT_DIR = SAMPLE_JOB_PATH + "/output";

    // non-static since this is the runner class it needs to initialize after we set the properties
    private final Logger log = LoggerFactory.getLogger(SampleOracleBmcHadoopJob.class);

    /**
     * Runner for sample hadoop job. This expects 3 args: path to configuration file, Object Store namespace, Object
     * Store bucket. To run this, you must:
     * <ol>
     * <li>Create a standard hadoop configuration file</li>
     * <li>Create the bucket ahead of time.</li>
     * </ol>
     * This runner will create a test input file in a file '/samplehadoopjob/input.dat', and job results will be written
     * to '/samplehadoopjob/output'.
     * 
     * @param args
     *            1) path to configuration file, 2) namespace, 3) bucket
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception
    {
        if (args.length != 3)
        {
            throw new IllegalArgumentException(
                    "Must have 3 args: 1) path to config file, 2) object storage namespace, 3) object storage bucket");
        }

        // redirect all logs to sysout
        System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");

        final SampleOracleBmcHadoopJob job = new SampleOracleBmcHadoopJob(args[0], args[1], args[2]);
        System.exit(job.execute());
    }

    private final String configurationFilePath;
    private final String namespace;
    private final String bucket;

    public int execute() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException
    {
        log.info("Creating hadoop configuration");
        final Configuration configuration = this.createConfiguration(this.configurationFilePath);

        final String authority = this.bucket + "@" + this.namespace;
        final String uri = "oci://" + authority;
        log.info("Using uri: {}", uri);

        log.info("Creating job inputs");
        this.setup(uri, configuration);

        log.info("Creating job");
        final Job job = this.createJob(configuration);

        final String in = uri + INPUT_FILE;
        final String out = uri + OUTPUT_DIR;
        log.info("Using input: {}", in);
        log.info("Using output: {}", out);

        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        log.info("Executing job...");
        final int response = job.waitForCompletion(true) ? 0 : 1;

        log.info("Attempting to read job results");
        this.tryReadResult(uri, configuration);
        return response;
    }

    private Configuration createConfiguration(final String configFilePath)
    {
        final Configuration configuration = new Configuration();
        configuration.addResource(new Path(configFilePath));
        return configuration;
    }

    private void setup(final String uri, final Configuration configuration) throws IOException, URISyntaxException
    {
        try (final BmcFilesystem fs = new BmcFilesystem())
        {
            fs.initialize(new URI(uri), configuration);
            fs.delete(new Path(SAMPLE_JOB_PATH), true);
            final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
            output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
            output.close();
        }
    }

    private Job createJob(final Configuration configuration) throws IOException
    {
        final Job job = Job.getInstance(configuration, "word count");
        job.setJarByClass(SampleOracleBmcHadoopJob.class);
        job.setMapperClass(SimpleMapper.class);
        job.setCombinerClass(SimpleReducer.class);
        job.setReducerClass(SimpleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job;
    }

    private void tryReadResult(final String uri, final Configuration configuration)
            throws IOException, URISyntaxException
    {
        try (final BmcFilesystem fs = new BmcFilesystem())
        {
            fs.initialize(new URI(uri), configuration);
            // this should be the output file name, but that could change
            final FSDataInputStream input = fs.open(new Path(OUTPUT_DIR + "/part-r-00000"));

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            IOUtils.copy(input, baos);
            log.info("\n=====\n" + baos.toString() + "=====");
            input.close();
        }
    }
}


package com.oracle.oci.hadoop.example;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SimpleMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(final Object key, final Text value, final Context context) throws IOException, InterruptedException
    {
        final StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens())
        {
            this.word.set(itr.nextToken());
            context.write(this.word, one);
        }
    }
}


package com.oracle.oci.hadoop.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SimpleReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (final IntWritable val : values)
        {
            sum += val.get();
        }
        this.result.set(sum);
        context.write(key, this.result);
    }
}

Troubleshooting

This section contains troubleshooting information for the HDFS Connector.

Troubleshooting Service Errors

Any operation resulting in a service error will cause an exception of type com.oracle.bmc.model.BmcException to be thrown by the HDFS Connector. For information about common service errors returned by OCI, see API Errors.

Java Encryption Key Size Errors

The HDFS Connector can only handle keys with a length of 128 bits or less. Users get "Invalid Key Exception" and "Illegal key size" errors when they use longer keys, such as AES256. Use one of the following workarounds to fix this issue:
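
As a diagnostic aid rather than a workaround, the sketch below reports the largest AES key length the running JVM permits. It rests on the assumption that these errors come from the JVM's cryptography policy; KeyLengthCheck is a hypothetical class name, while Cipher.getMaxAllowedKeyLength is a standard Java API.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

final class KeyLengthCheck {
    /** Returns the maximum AES key length, in bits, allowed by this JVM's policy. */
    static int maxAesKeyBits() {
        try {
            return Cipher.getMaxAllowedKeyLength("AES");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("AES is not available in this JVM", e);
        }
    }

    public static void main(String[] args) {
        int bits = maxAesKeyBits();
        System.out.println("Maximum allowed AES key length: " + bits + " bits");
        if (bits < 256) {
            System.out.println("AES256 keys are not permitted by this JVM's policy.");
        }
    }
}
```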

Contributions

Got a fix for a bug, or a new feature you'd like to contribute? The SDK is open source and accepting pull requests on GitHub.

Notifications

If you wish to be notified when a new version of the HDFS connector is released, subscribe to the Atom feed.

Questions or Feedback

Ways to get in touch: