Data Flow

Data Flow is an Oracle Cloud Infrastructure service for creating and running Spark applications. ADS can be used to create and run PySpark Data Flow applications directly from a notebook session.

Getting started with Data Flow

  • Before running applications in the Data Flow service, two storage buckets are required in Object Storage: a bucket to store the logs, and a data warehouse bucket for Spark SQL applications. See the Data Flow documentation on setting up storage; one way to create the buckets from a notebook is sketched after this list.

  • Data Flow requires IAM policies that grant it access to resources in order to manage and run applications. See the Data Flow documentation on policy set up.

  • Full Data Flow Documentation: Data Flow documentation
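
If these buckets do not already exist, one way to create them from a notebook is with the OCI Python SDK. The sketch below is only illustrative: the bucket name matches the default logs bucket used later in this section, and the compartment OCID is a placeholder.

import oci

# load the default OCI config file; resource principal authentication
# can also be used from within a notebook session
config = oci.config.from_file()
os_client = oci.object_storage.ObjectStorageClient(config)
namespace = os_client.get_namespace().data

# create the logs bucket that Data Flow writes run logs to
os_client.create_bucket(
    namespace,
    oci.object_storage.models.CreateBucketDetails(
        name="dataflow-logs",
        compartment_id="<compartment_OCID>",
    ),
)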

Create a Data Flow instance

You first need to create a DataFlow object instance.

You can specify a default path where all Data Flow artifacts are stored using the optional dataflow_base_folder argument. By default, all Data Flow artifacts are stored under /home/datascience/dataflow. The dataflow_base_folder directory contains multiple subdirectories, one for each application. Each subdirectory is named after the application, with a random string appended as a suffix. Within each application directory, the artifacts generated by separate Data Flow runs are stored in different folders. Each folder is identified by the run display name and the run creation time. All run-specific artifacts, including the script, the run configuration, and the run logs, are saved in the corresponding run folder.

You can also choose a specific compartment by providing the optional compartment_id argument when creating the DataFlow instance. Otherwise, it defaults to the same compartment as your notebook session.

from ads.dataflow.dataflow import DataFlow
data_flow = DataFlow(
  compartment_id="<compartmentA_OCID>",
  dataflow_base_folder="<my_dataflow_dir>"
)

Generate a script using template

We provide simple pyspark or sparksql templates for you to get started with Data Flow. You can use data_flow.template() to generate a pre-written template.

We support two types of templates:

  1. The standard_pyspark template, which is for a standard pyspark job

  2. The sparksql template, which is for a sparksql job

from ads.dataflow.dataflow import DataFlow
data_flow = DataFlow()
data_flow.template(job_type='standard_pyspark')

data_flow.template() will return the local path to the script you have generated.
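
For example, you can capture the returned path to open or edit the generated script later (the variable name here is just illustrative):

script_path = data_flow.template(job_type='standard_pyspark')
print(script_path)   # local path to the generated template script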

Create a Data Flow Application

The application creation process has two stages: preparation and creation. In the preparation stage, you prepare a configuration object necessary to create the Data Flow application. You need to provide values for these three parameters:

  • display_name - the name you give your application

  • script_bucket - the bucket used to read/write the pyspark script in object storage

  • pyspark_file_path - the local path to your pyspark script

ADS checks that the bucket exists and that you can write to it from your notebook session. Optionally, you can change values for the following parameters (a sketch using them follows the basic example below):

  • compartment_id - the OCID of the compartment in which to create the Data Flow application. If it is not provided, the same compartment as your dataflow object is used by default.

  • logs_bucket - the bucket used to store run logs in object storage, the default value is "dataflow-logs"

  • driver_shape - driver shape to create the application, the default value is "VM.Standard2.4"

  • executor_shape - executor shape to create the application, the default value is "VM.Standard2.4"

  • num_executors - the number of executor VMs requested, the default value is 1

Note

  1. Your local script is uploaded to the script bucket in this preparation step. Object Storage supports versioning, which creates an object version whenever the content changes or the object is deleted. You can enable Object Versioning on your bucket in the Oracle Cloud Infrastructure Console to prevent overwriting of existing files on Object Storage.

  2. If you want to use a private bucket as the logs_bucket, make sure you have added a corresponding Data Flow Service policy, as described in Getting started with Data Flow. See the Data Flow documentation for more details about setting up policies.

  3. ADS validates the input parameters in the preparation step.

Then you can use prepare_app() to create the configuration object necessary to create the application.

from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
  display_name="<app-display-name>",
  script_bucket="<your-script-bucket>",
  pyspark_file_path="<your-script-path>"
)
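
The optional parameters listed above can be combined in the same call. A sketch, assuming prepare_app accepts them all as keyword arguments:

app_config = data_flow.prepare_app(
  display_name="<app-display-name>",
  script_bucket="<your-script-bucket>",
  pyspark_file_path="<your-script-path>",
  logs_bucket="dataflow-logs",
  driver_shape="VM.Standard2.4",
  executor_shape="VM.Standard2.4",
  num_executors=2
)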

Once you have the application configured, you can create a Data Flow application using create_app:

app = data_flow.create_app(app_config)

You can explore a few attributes of the DataFlowApp object.

First you can look at the configuration of the application.

app.config

You can also get a url link to the OCI console Application Details page.

app.oci_link

Load an Existing Data Flow application

As an alternative to creating applications in ADS, you can load existing applications created elsewhere. These Data Flow applications must be Python applications. To load an existing application, you need the application's OCID.

existing_app = data_flow.load_app(app_id, target_folder)

You can find the app_id either directly in the Oracle Cloud Infrastructure Console or by listing existing applications.

Optionally, you could assign a value to the parameter target_folder. This parameter is the directory in which you want to store the local artifacts of this application. If target_folder is not provided, then by default the local artifacts of this application are stored under the dataflow_base_folder defined by the dataflow object instance.
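
For example, to keep the local artifacts of a loaded application in a dedicated directory (the path below is purely illustrative):

existing_app = data_flow.load_app(
  app_id,
  target_folder="/home/datascience/dataflow/loaded_apps"
)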

Listing Data Flow Applications

From ADS you can list applications. They are returned as a list of dicts, with a to_dataframe() method that provides the data as a Pandas dataframe. The default sort order is the most recent run first.

For example, to list the most recent five applications use:

from ads.dataflow.dataflow import DataFlow
data_flow = DataFlow()
data_flow.list_apps().to_dataframe().head(5)

Create a Data Flow run

After an application is created or loaded in your notebook session, the next logical step is to execute a run of that application. The process of executing (that is, creating) a run is similar to creating an application.

The first step is to configure the run. This can be done via the prepare_run() method of the DataFlowApp object. You only need to provide a value for the name of your run via run_display_name.

run_config = app.prepare_run(run_display_name="<run-display-name>")

Note

  1. You could use a compartment different from your application to create a run by specifying the compartment_id in prepare_run. By default, it will use the same compartment as your dataflow application to create the run.

  2. Optionally, you can specify a logs_bucket to store the logs of your run. By default, the run inherits the logs_bucket from the parent application, but you can override that option. A sketch using these options follows these notes.

  3. Every time the Data Flow application launches a run, a local folder representing this Data Flow run is created. This folder stores all the information including the script, the run configuration, and any logs that are stored in the logs bucket.
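
A sketch combining these options, assuming prepare_run accepts them as keyword arguments:

run_config = app.prepare_run(
  run_display_name="<run-display-name>",
  compartment_id="<compartmentB_OCID>",
  logs_bucket="<your-private-logs-bucket>"
)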

Then, you can create a Data Flow run using the run_config generated in the preparation stage. During this process, you can monitor the Data Flow run while the job is running. You can also pull logs to your local directories by setting save_log_to_local=True.

run = app.run(run_config, save_log_to_local=True)

The DataFlowRun object has some useful attributes similar to the DataFlowApp object.

You can check the status of the run with:

run.status

You can get the config file that created this run. The run configuration and the pyspark script used in this run are also saved in the corresponding run directory in your notebook environment.

run.config

You can get the run directory where the artifacts are stored in your notebook environment with:

run.local_dir

Similarly, you can get a clickable link to the Oracle Cloud Infrastructure Console Run Details page with:

run.oci_link

Fetching Logs

After a Data Flow run has completed, you can examine the logs using ADS. There are two types of logs: stdout and stderr.

run.log_stdout.head()   # show first rows of stdout
run.log_stdout.tail()   # show last lines of stdout

# where the logs are stored on OCI Storage
run.log_stdout.oci_path

# the path to the saved logs in the notebook environment if ``save_log_to_local`` was ``True`` when you created this run
run.log_stdout.local_path

If save_log_to_local is set to False during app.run(...), you can fetch logs by calling the fetch_log(...).save() method on the DataFlowRun object with the correct log type.

run.fetch_log("stdout").save()
run.fetch_log("stderr").save()

Note

Due to a limitation of pyspark (specifically, Python applications in Spark), both stdout and stderr are merged into the stdout stream.

Edit and sync pyspark script

Data Flow integration with ADS supports the edit-run-edit cycle: the local pyspark script can be edited and is automatically synced to object storage each time the application is run.

Data Flow sources the pyspark script from object storage, so the local files in the notebook session are not visible to Data Flow. The app.run(...) method compares the content hash of the local file with the remote copy on object storage; if any change is detected, the new local version is copied to the remote. For the first run, the sync creates the remote file and generates the fully qualified URL with namespace that Data Flow requires.

Syncing is the default setting in app.run(...). If you do not want the app to sync with locally modified files, you need to include sync=False as an argument to app.run(...), as shown below.
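
For example, to run the application against the copy of the script already on object storage, ignoring local edits:

run = app.run(run_config, sync=False)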

Arguments and Parameters

Passing arguments to pyspark scripts is done with the arguments value in prepare_app. In addition to arguments, Data Flow supports a parameter dictionary that may be used to interpolate arguments. To simply pass arguments, the script_parameters section may be ignored; however, any key/value pair defined in script_parameters may be referenced in arguments using the ${key} syntax, and the value of that key is passed as the argument value.

from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
  display_name,
  script_bucket,
  pyspark_file_path,
  arguments = ['${foo}', 'bar', '-d', '--file', '${filename}'],
  script_parameters={
    'foo': 'val1 val2',
    'filename': 'file1',
  }
)
app = data_flow.create_app(app_config)

run_config = app.prepare_run(run_display_name="test-run")
run = app.run(run_config)

Note

Arguments in the format ${arg} are replaced by the values provided in script parameters, while arguments not in this format are passed into the script verbatim.

You can override the values of some or all script parameters in each run by passing different values to prepare_run().

run_config = app.prepare_run(run_display_name="test-run", foo='val3')
run = app.run(run_config)

Fetching pyspark Output

After the application has run and any stdout has been captured in the log file, the pyspark script will most likely have produced some form of output. Usually a pyspark script batch processes something, for example sampling data, aggregating data, or preprocessing data. The resulting output can be loaded with ADSDataset.open() using the ocis:// protocol handler.

The only way to get output from pyspark back into the notebook is either to create files on OCI Object Storage that are then read into the notebook, or to use the stdout stream. Here we show a simple example where the pyspark script prints its output in a portable format; we use JSON Lines (JSON-L), but CSV works too. While convenient as an example, this method is not recommended for large data.

from pyspark.sql import SparkSession

def main():

    # create a spark session
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

    # load an example csv file from dataflow public storage into DataFrame
    original_df = spark\
          .read\
          .format("csv")\
          .option("header", "true")\
          .option("multiLine", "true")\
          .load("oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv")

    # register the dataframe as a SQL view so we can run SQL queries against it
    original_df.createOrReplaceTempView("berlin")

    query_result_df = spark.sql("""
                      SELECT
                        city,
                        zipcode,
                        number_of_reviews,
                        CONCAT(latitude, ',', longitude) AS lat_long
                      FROM
                        berlin"""
                    )

    # Convert the filtered Spark DataFrame into json format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.

    print('\n'\
            .join(query_result_df\
            .toJSON()\
            .collect()))

if __name__ == '__main__':
    main()

Once the above has run, the stdout stream (which contains JSON Lines formatted data) can be read back and parsed with Pandas:

import io
import pandas as pd

from ads.dataset.factory import DatasetFactory

# the pyspark script wrote JSON Lines to the stdout log; read the log back
# into a Pandas dataframe and open it as an ADS dataset
ds = DatasetFactory.open(pd.read_json(io.StringIO(str(run.log_stdout)), lines=True))

ds.head()