This section describes how to create and deploy Java-based user-defined functions (UDFs) into your Trifacta® deployment.

Creation of UDFs requires development experience and access to an integrated development environment (IDE).

Pre-requisites

  1. Access to the Trifacta deployment
  2. IDE
  3. The Java UDF SDK, which is stored in the Trifacta deployment in the following location: libs/custom-udfs-sdk/build/distributions/java-custom-udf-sdk.zip

NOTE: If you are installing custom UDFs and the Trifacta node does not have an Internet connection, you should download the Java UDF SDK in an Internet-accessible location, build your custom UDF JAR there, and then upload the JAR to the Trifacta node.

Overview

Each UDF can take one or more inputs and produces a single output value (map only).

Inputs and outputs must be one of the following types:

  • Bool
  • String
  • Long
  • Double

Known Limitations

  • In the Trifacta application, previews are not available for user-defined functions.
  • Retaining state information across calls to the exec method is unstable. More information is provided below.

    NOTE: When a recipe containing a user-defined function is applied to text data, any null characters cause records to be truncated by the running environment during Trifacta Photon job execution. In these cases, please execute the job in the Spark running environment.

Enable Service

You must enable the Java UDF service in the Trifacta platform.

Steps:

  1. You can apply this change through the Admin Settings Page (recommended) or trifacta-conf.json. For more information, see Platform Configuration Methods.
  2. Enable the correct flag:

    "feature.enableUDFTransform.enabled": true,
  3. Save your changes.

Deployment

Steps:

  1. Unzip java-custom-udf-sdk.zip.
  2. Within the unzipped directory, execute the install command. The following is specific to the Eclipse IDE:
     

    gradlew eclipse
  3. Import the project into your IDE.

Creating a UDF

UDF Requirements

All UDFs must implement the TrifactaUDF interface. This interface declares the four methods that each UDF must override: init, exec, inputSchema, and finish.

  1. init method: Used for setting private variables in the UDF. This method may be a no-op function if no variables must be set. See the Example - Concatenate strings below. 

    Tip: In this method, perform your data validation on the input parameters, including count, data type, and other constraints.

    NOTE: The init method must be specified but can be empty, if there are no input parameters.

  2. exec method: Contains the functionality of the UDF. The output of the exec method must be one of the supported types. It must also match the generic type parameter of the interface. In the following example, a class that implements TrifactaUDF<String> returns a String. This method is run on each record.

    Tip: In this method, you should check the number of input columns.

    Keeping state that varies across calls to the exec method can lead to unexpected behavior. One-time initialization, such as initializing the regex compiler, is safe, but do not allow state information to mutate across calls to exec. This is a known issue.

  3. inputSchema method: The inputSchema method describes the schema of the list on which the exec method is acting. The classes in the schema must be supported. Essentially, you should support the I/O types described earlier.
  4. finish method: The finish method is run at the end of the UDF. Typically, it is a no-op.

    NOTE: If you are executing your UDF on the Spark running environment, the finish method is not invoked at the end of the UDF. Instead, it is invoked as part of the shutdown of the Java VM. As a result, the finish method may never be invoked in situations like a JVM crash.
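
The four methods and the guidance on one-time initialization can be sketched as follows. This is a minimal sketch, not SDK code: the nested TrifactaUDF interface is a stand-in whose signatures are inferred from the examples on this page, RegexMatchUDF is a hypothetical UDF, and mapping the Bool type to java.lang.Boolean is an assumption.

```java
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;

final class RegexMatchSketch {
  /** Stand-in for the SDK interface; signatures are inferred from this page's examples. */
  interface TrifactaUDF<T> {
    void init(List<Object> initArgs);
    T exec(List<Object> inputs) throws IOException;
    @SuppressWarnings("rawtypes")
    Class[] inputSchema();
    void finish() throws IOException;
  }

  /** Hypothetical UDF: true when the input string fully matches the init pattern. */
  static final class RegexMatchUDF implements TrifactaUDF<Boolean> {
    // Set once in init and never modified in exec, so no state mutates across calls.
    private Pattern pattern;

    @Override
    public void init(List<Object> initArgs) {
      // Validate count and type of the init arguments before using them.
      if (initArgs == null || initArgs.size() != 1 || !(initArgs.get(0) instanceof String)) {
        throw new IllegalArgumentException("RegexMatchUDF takes exactly one String init argument");
      }
      pattern = Pattern.compile((String) initArgs.get(0));
    }

    @Override
    public Boolean exec(List<Object> inputs) {
      // Check the number of input columns, then pass nulls through as null.
      if (inputs == null || inputs.size() != 1 || inputs.get(0) == null) {
        return null;
      }
      return pattern.matcher((String) inputs.get(0)).matches();
    }

    @Override
    @SuppressWarnings("rawtypes")
    public Class[] inputSchema() {
      return new Class[]{String.class};
    }

    @Override
    public void finish() {
      // No-op: nothing to release.
    }
  }
}
```

The compiled Pattern is the only field, and exec only reads it, which keeps the UDF within the "one-time initialization is safe" guidance above.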

Example - Concatenate strings

The following code example concatenates two input strings in the List<Object>. This UDF can be easily modified to concatenate more strings by modifying the inputSchema function. 

Example UDF: ConcatUDF
package com.trifacta.trifactaudfs;
import java.io.IOException;
import java.util.List;


/**
 * Example UDF that concatenates two columns
 */
public class ConcatUDF implements TrifactaUDF<String> {
  @Override
  public String exec(List<Object> inputs) throws IOException {
    if (inputs == null) {
      return null;
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < inputSchema().length; i += 1) {
      if (inputs.get(i) == null) {
        return null;
      }
      sb.append(inputs.get(i));
    }
    return sb.toString();
  }
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{String.class, String.class};
  }
  @Override
  public void finish() throws IOException {
  }
  @Override
  public void init(List<Object> initArgs) {
  }
}

Notes:

  • The first line indicates that the function is part of the com.trifacta.trifactaudfs package.
  • The defined UDF class implements TrifactaUDF, which is the base interface for UDFs.
    • It is parameterized with the return type of the UDF (a Java String in this case). 
    • The input into the function is a list with input parameters in the order they are passed to the function within the Trifacta platform. See Running Your UDF below. 
  • The UDF checks the input data for null values, and if any nulls are detected, returns a null. 
  • The inputSchema describes the input list passed into the exec method. 
    • An error is thrown if the type of the data that is passed into the UDF does not match the schema.
    • The UDF must handle improper data. See Error Handling below. 

Example - Add by constant

In this example, a constant is added to the input value. The constant is defined in the init method.

  • The init method consumes a list of objects, each of which can be used to set a variable in the UDF. The input into the init function is a list with parameters in the order they are passed to the function within the Trifacta platform. See Running Your UDF below.
Example UDF: AdderUDF
package com.trifacta.trifactaudfs;
import java.io.IOException;
import java.util.List;

/**
 * Example UDF. Adds a constant amount to an Integer column.
 */
public class AdderUDF implements TrifactaUDF<Long> {
  private Long _addAmount;
  @Override
  public void init(List<Object> initArgs) {
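    // Fail fast on a wrong argument count instead of continuing with bad state.
    if (initArgs == null || initArgs.size() != 1) {
      throw new IllegalArgumentException("AdderUDF takes in exactly one init argument");
    }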
    Long addAmount = (Long) initArgs.get(0);
    _addAmount = addAmount;
  }
  @Override
  public Long exec(List<Object> input) {
    if (input == null) {
      return null;
    }
    if (input.size() != 1) {
      return null;
    }
    return (Long) input.get(0) + _addAmount;
  }
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{Long.class};
  }
  @Override
  public void finish() throws IOException {
  }
}

Error Handling

The UDF must handle any error that occurs when processing the function. There are two ways of dealing with errors:

  1. For null data generated in the exec method, a null value can be returned. It appears in the final generated column.
  2. Any errors that cause the UDF to stop in the init or exec methods cause an IOException to be thrown. This error signals the platform that an issue occurred with the UDF.

Tip: You can add to the Trifacta logs through Logger. Annotate your exceptions at the appropriate logging level.
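
The two error paths above can be sketched as a single exec-style method. This is an illustration under stated assumptions: parseLongColumn is a hypothetical helper, and java.util.logging is used only as a stand-in because this page does not name the Logger class that the platform expects.

```java
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ErrorHandlingSketch {
  // Assumption: java.util.logging stands in for the platform's Logger.
  private static final Logger LOG = Logger.getLogger(ErrorHandlingSketch.class.getName());

  /**
   * Hypothetical exec-style body that parses one String column into a Long.
   * Bad data yields null; a structural problem raises IOException.
   */
  static Long parseLongColumn(List<Object> inputs) throws IOException {
    // Structural error: the UDF cannot continue, so signal the platform.
    if (inputs == null || inputs.size() != 1) {
      throw new IOException("expected exactly one input column");
    }
    Object value = inputs.get(0);
    if (value == null) {
      return null; // null in, null out
    }
    try {
      return Long.parseLong(value.toString().trim());
    } catch (NumberFormatException e) {
      // Data error: log it and return null, which appears in the generated column.
      LOG.log(Level.WARNING, "Value is not a valid Long: " + value, e);
      return null;
    }
  }
}
```

Whether an unparseable value should produce a null or stop the job is a design choice. This sketch treats it as null data so that one bad record does not fail the whole run.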

Testing the UDF

JUnit can be used to test the UDF. Below are examples of testing the two example UDFs.

Example - JUnit test for Concatenate strings:

ConcatUDF Test
@Test
public void concatUDFTest() throws IOException {
  ConcatUDF concat = new ConcatUDF();
  ArrayList<Object> input = new ArrayList<Object>();
  input.add("hello");
  input.add("world");
  String result = concat.exec(input);
  String expected = "helloworld";
  assertEquals(expected, result);
}


Example - JUnit test for Add by constant:
 

AdderUDF Test
@Test
public void adderUDFTest() {
  AdderUDF add = new AdderUDF();
  ArrayList<Object> initArgs = new ArrayList<Object>(1);
  initArgs.add(1L);
  add.init(initArgs);
  ArrayList<Object> inputs1 = new ArrayList<Object>();
  inputs1.add(1L);
  long result = add.exec(inputs1);
  long expected = 2L;
  assertEquals(expected, result);
 
  ArrayList<Object> inputs2 = new ArrayList<Object>();
  inputs2.add(9000L);
  result = add.exec(inputs2);
  expected = 9001L;
  assertEquals(expected, result);
}

Compiling the UDF

After you write the UDF, you must compile it and package it in a JAR before registering it with the platform. To compile and package the function, run the following command from the root directory:

gradlew build

The UDF code is assembled, and unit tests are executed. If the build succeeds, the JAR file is created in build/libs.

NOTE: Custom UDFs should be compiled to one or more JAR files. Avoid using the example JAR filename, which can be overwritten on upgrade.

 

JDK version mismatches

To avoid an Unsupported major.minor version error during execution, the JDK version used to compile the UDF JAR file should be less than or equal to the JDK version on the Hadoop cluster.

If this is not possible, then set the value of the Compatibility properties in the local build.gradle file to the JDK version on the Hadoop cluster prior to building the JAR file.

Example:

If the Hadoop cluster is on JDK 1.8, then add the following to the build.gradle file:

targetCompatibility = '1.8'
sourceCompatibility = '1.8'
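
The Unsupported major.minor version error refers to the class file format version that is stored in every compiled .class file. To confirm which JDK level a compiled UDF class targets, you can read that version directly. The following is a minimal sketch; ClassVersionSketch and its method names are hypothetical helpers, not part of the SDK.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ClassVersionSketch {
  /** Reads the class file major version (for example, 52 for Java 8). */
  static int majorVersion(InputStream classFile) throws IOException {
    DataInputStream in = new DataInputStream(classFile);
    // Every class file starts with the magic number 0xCAFEBABE.
    if (in.readInt() != 0xCAFEBABE) {
      throw new IOException("not a Java class file");
    }
    in.readUnsignedShort();        // minor version, not needed here
    return in.readUnsignedShort(); // major version
  }

  /** Maps a major version to the Java release number: 52 -> 8, 55 -> 11. */
  static int javaRelease(int majorVersion) {
    return majorVersion - 44;
  }
}
```

If the release number computed for your UDF class is higher than the JDK release on the Hadoop cluster, lower the compatibility settings in build.gradle and rebuild the JAR.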

Registering the UDF

After a function is compiled, it must be registered with the platform. Registration requires the following:

  1. Enable user-defined functions, if you have not done so already.
  2. Specify the path to the JAR file that was generated in the previous steps.
  3. Set the udfPackages value to include the package name where the UDFs can be found.

Example configuration:

To apply this configuration change, log in as an administrator to the Trifacta node. Then, edit trifacta-conf.json. Some of these settings may not be available through the Admin Settings Page. For more information, see Platform Configuration Methods.

Example Config
...
"feature": {
  "enableUDFTransform": {
    "enabled": true
  }
},
"udf-service": {
  "classpath": "%(topOfTree)s/services/udf-service/build/libs/udf-service.jar:%(topOfTree)s/services/udf-service/build/dependencies/*",
  "additionalJars": [
    "/vagrant/libs/custom-udfs-sdk/build/libs/custom-udfs-example.jar"
  ],
  "udfPackages": [
    "com.trifacta.trifactaudfs"
  ]
},
...

Notes:

  • Set enableUDFTransform.enabled to true, which enables UDFs in general.
  • Under udf-service:

    • specify the full path to the JAR under additionalJars 

    • append the paths of any extra JAR dependencies that your UDFs require under classpath 

      NOTE: Do not include any extra JAR dependencies in the udf-service/build/dependencies directory, as this directory may be purged at build time.

    • specify the fully qualified package names under udfPackages 

      • This list contains the fully qualified names of all packages that contain your UDFs.

      • For example, if your UDF is com.company.ourudfs.MyUDF, then the package name is the following: com.company.ourudfs
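
The udfPackages entry is the fully qualified class name with the class name removed. The following is a minimal sketch of that derivation; packageOf is a hypothetical helper shown only for illustration.

```java
final class UdfPackageSketch {
  /** Returns the package portion of a fully qualified class name. */
  static String packageOf(String fullyQualifiedClassName) {
    int lastDot = fullyQualifiedClassName.lastIndexOf('.');
    // A name without a dot is in the default package, which has an empty name.
    return lastDot < 0 ? "" : fullyQualifiedClassName.substring(0, lastDot);
  }
}
```

For the example above, packageOf("com.company.ourudfs.MyUDF") returns com.company.ourudfs.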

Steps:

After modifying the config, the udf-service needs to be restarted.

    1. If you created a new UDF, restart the Trifacta application:

      service trifacta restart
    2. If you have modified an existing UDF, restart the UDF service:

      NOTE: For an existing UDF, you must rebuild the JAR first. Otherwise, the changes are not recognized during service re-initialization.

      service java-udf-service restart
    3. As part of the restart, any newly added Java UDFs are registered with the application.

Enabling UDF service on HDInsight cluster

By default, the UDF service utilizes compression across the websockets when running on the cluster. HDInsight clusters do not support compression on websockets. 

To make sure the UDF service works on your HDInsight cluster, please do the following. 

Steps:

  1. To apply this configuration change, log in as an administrator to the Trifacta node. Then, edit trifacta-conf.json. Some of these settings may not be available through the Admin Settings Page. For more information, see Platform Configuration Methods.
  2. Locate the udf-service configuration.
  3. Insert the following extra property in the udf-service configuration area:

    "udf-service": {
       ...
       "jvmOptions": ["-Dorg.apache.tomcat.websocket.DISABLE_BUILTIN_EXTENSIONS=true"],
       ...
    }
  4. Save your changes and restart the platform.

Running Your UDF

For more information on executing your UDF in the Transformer page, see User-Defined Functions.

Troubleshooting

"Websocket Receive()" error in Transformer page UI

If you execute a Java UDF, you may see an error similar to the following in the Transformer page:

Please reload page (query execution failed).pp::WebSocket::Receive() error: Unspecified failure.

When you check the udf.log file on the server, the following may be present:

UDFWebsocket closed with status: CloseStatus[code=1009, reason=The decoded text message was too big for the output buffer and the endpoint does not support partial messages]

Solution

The above issue is likely to be caused by the Trifacta Photon running environment sending too much data through the buffer of the UDF's Websocket service. By default, this buffer size is set to 1048576 bytes (1 MB).

The Trifacta Photon running environment processes data through the Websocket service 1024 (1 K) rows at a time for the input and output columns of the UDF. If the data in the input columns to the UDF or the output columns from the UDF exceeds 1 KB (1024 characters) in total size for each row, the default buffer is too small, since each batch then holds more than 1 K characters * 1 K rows = 1048576 bytes. The query then fails.

When setting a new buffer size:

  • Assume that 1024 rows are processed from the buffer each time.
  • Identify the input columns and output columns for the UDF that is failing.
  • Identify the dataset that has the widest columns for both inputs and outputs here. 

    Tip: You can use the LEN function to do string-based computations of column width. See LEN Function.

  • Perform the following estimate on the widest set of input and output columns that you are processing:
    • Estimate the total expected number of characters for the input columns of the UDF. 
    • Add a 20% buffer to the above estimate. 
    • Repeat the above estimate for the widest output columns for the UDF.
    • Set your buffer size to the larger of the two estimates (input columns' width or output columns' width).
  • Example: A UDF takes two inputs and produces one output: 

    • If each input column is 256 characters, then the size of 1K rows of input would be 256 bytes * 2 (input cols) * 1024 rows = 0.5 MB. 

    • If the output of the UDF per row is estimated to be 1024 characters, then the output estimate would be 1024 bytes * 1024 rows = 1MB. 

    • So, set the buffer size to be 1 MB + 20% buffer over the larger estimate between input and output. In this example, the buffer size should be 1.2 MB or 1258291 Bytes.
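
The estimate above can be sketched as a small calculation. This is a minimal sketch under the same assumptions as the example: one byte per character and 1024 rows per batch. BufferSizeSketch and estimate are hypothetical names.

```java
final class BufferSizeSketch {
  /** Rows that Photon sends through the Websocket service at a time. */
  static final long ROWS_PER_BATCH = 1024;

  /**
   * Estimates a buffer size in bytes from the widest expected row,
   * assuming one byte per character.
   *
   * @param inputCharsPerRow  total characters across all input columns of one row
   * @param outputCharsPerRow total characters across all output columns of one row
   */
  static long estimate(long inputCharsPerRow, long outputCharsPerRow) {
    // Size of one batch for the wider of the two sides.
    long largerBatch = Math.max(inputCharsPerRow, outputCharsPerRow) * ROWS_PER_BATCH;
    // Add the 20% buffer; integer division rounds down to whole bytes.
    return largerBatch * 12 / 10;
  }
}
```

For the example above, two 256-character inputs give 512 input characters per row and the output is 1024 characters per row, so estimate(512, 1024) returns 1258291.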

Steps:

  1. You can apply this change through the Admin Settings Page (recommended) or trifacta-conf.json. For more information, see Platform Configuration Methods.
  2. Change the following setting:

    "udf-service.outputBufferSize": 1048576,
  3. Save your changes and restart the platform.

Photon crashes during execution of UDF

During the execution of a UDF, the Photon client can crash. Possible errors include:

Error in changeCurrentEdit Error: Transformation engine has crashed. Please reload your browser (exit code: null; message ID: 161)

Solution:


This crash can be caused by a number of issues. You can try the following:

  1. You can apply this change through the Admin Settings Page (recommended) or trifacta-conf.json. For more information, see Platform Configuration Methods.
  2. Increase the value of the udf-service.udfCommunicationTimeout setting. Raise this value a little at a time to see if that allows the UDF to execute.

    NOTE: Avoid setting this value too high, which can cause the Java heap size to be exceeded and another Photon crash. Maximum value is 2147483646.

  3. Save your changes and restart the platform.
