This section describes how to create and deploy Java-based user-defined functions (UDFs) on the platform.

Creation of UDFs requires development experience and access to an integrated development environment (IDE).

Prerequisites

  1. Access to the platform
  2. An IDE
  3. The Java UDF SDK, which is stored in the platform installation in the following location: libs/custom-udfs-sdk/build/distributions/java-custom-udf-sdk.zip

NOTE: If you are installing custom UDFs and the platform host does not have an Internet connection, download the Java UDF SDK to an Internet-accessible location, build your custom UDF JAR there, and then upload the JAR to the platform.

Overview

Each UDF takes one or more inputs and produces a single output value (map only).

Inputs and outputs must be one of the supported types. (The examples in this section use String and Long.)

Enable Service

You must enable the Java UDF service in the platform configuration.

Steps:

  1. In the platform configuration, enable the following flag:

    "feature.enableUDFTransform.enabled": true,
  2. Save your changes.

Deployment

Steps:

  1. Unzip java-custom-udf-sdk.zip.
  2. Within the unzipped directory, execute the command that generates project files for your IDE. The following example is specific to the Eclipse IDE:
     

    gradlew eclipse
  3. Import the project into your IDE.

Creating a UDF

UDF Requirements

All UDFs must implement the TrifactaUDF interface. This interface defines the four methods that each UDF must override: init, exec, inputSchema, and finish. A sketch of the interface, inferred from the examples in this section, appears after the following list.

  1. init method: Used for setting private variables in the UDF. This method may be a no-op if no variables must be set. See Example - Concatenate strings below.

    Tip: In this method, perform your data validation on the input parameters, including count, data type, and other constraints.

    NOTE: The init method must be specified but can be empty if there are no input parameters.

  2. exec method: Contains the functionality of the UDF. The output of the exec method must be one of the supported types, and it must match the interface's generic type parameter. In the following example, TrifactaUDF<String> returns a String. This method is run on each record.

    Tip: In this method, you should check the number of input columns.

    Keeping state that varies across calls to the exec method can lead to unexpected behavior. One-time initialization, such as compiling a regex, is safe, but do not allow state information to mutate across calls to exec. This is a known issue. See the second sketch after this list for an illustration.

  3. inputSchema method: Describes the schema of the input list on which the exec method acts. The classes in the schema must be among the supported input and output types described earlier.
  4. finish method: The finish method is run at the end of UDF execution. Typically, it is a no-op.

    NOTE: If you are executing your UDF on the Spark running environment, the finish method is not invoked at this point. Instead, it is invoked as part of the shutdown of the Java VM. As a result, the finish method may never be invoked, for example if the JVM crashes.
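
The TrifactaUDF interface itself is not shown in this section. The following is a minimal sketch of it, inferred from the examples below; consult the SDK sources in java-custom-udf-sdk.zip for the actual definition.

package com.trifacta.trifactaudfs;

import java.io.IOException;
import java.util.List;

/**
 * Hypothetical sketch of the TrifactaUDF interface, inferred from the
 * examples in this section. The actual SDK definition may differ.
 */
public interface TrifactaUDF<T> {
  // Called once with any initialization arguments; validate them here.
  void init(List<Object> initArgs) throws IOException;

  // Called on each record; returns the single output value.
  T exec(List<Object> inputs) throws IOException;

  // Declares the classes of the input columns that exec expects.
  @SuppressWarnings("rawtypes")
  Class[] inputSchema();

  // Called when processing completes; typically a no-op.
  void finish() throws IOException;
}

To illustrate the guidance on state above, the following hypothetical UDF counts vowels in a string column. Its compiled regex pattern is initialized once and never mutated, which is safe; a running total that changes across calls to exec, shown commented out, is the kind of state to avoid.

package com.trifacta.trifactaudfs;

import java.io.IOException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical example: counts the vowels in a single string column.
 */
public class VowelCountUDF implements TrifactaUDF<Long> {
  // Safe: one-time, read-only initialization.
  private static final Pattern VOWELS = Pattern.compile("[aeiou]");

  // Unsafe: mutating a field like this across exec calls makes results
  // depend on record order. Do not keep state like this.
  // private long runningTotal = 0;

  @Override
  public void init(List<Object> initArgs) {
  }

  @Override
  public Long exec(List<Object> inputs) throws IOException {
    if (inputs == null || inputs.get(0) == null) {
      return null;
    }
    Matcher m = VOWELS.matcher((String) inputs.get(0));
    long count = 0;
    while (m.find()) {
      count += 1;
    }
    return count;
  }

  @Override
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{String.class};
  }

  @Override
  public void finish() throws IOException {
  }
}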

Example - Concatenate strings

The following code example concatenates two input strings in the List<Object>. This UDF can easily be modified to concatenate more strings by changing the inputSchema method, as shown after the example.

package com.trifacta.trifactaudfs;
import java.io.IOException;
import java.util.List;


/**
 * Example UDF that concatenates two columns
 */
public class ConcatUDF implements TrifactaUDF<String> {
  @Override
  public String exec(List<Object> inputs) throws IOException {
    if (inputs == null) {
      return null;
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < inputSchema().length; i += 1) {
      if (inputs.get(i) == null) {
        return null;
      }
      sb.append(inputs.get(i));
    }
    return sb.toString();
  }
  @Override
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{String.class, String.class};
  }
  @Override
  public void finish() throws IOException {
  }
  @Override
  public void init(List<Object> initArgs) {
  }
}
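
For example, to concatenate three strings, only the inputSchema method needs to change, since the exec loop iterates over inputSchema().length. A hypothetical three-string variant:

  @Override
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{String.class, String.class, String.class};
  }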


Example - Add by constant

In this example, a constant, which is defined through the init method, is added to the input value.

package com.trifacta.trifactaudfs;
import java.io.IOException;
import java.util.List;

/**
 * Example UDF. Adds a constant amount to an Integer column.
 */
public class AdderUDF implements TrifactaUDF<Long> {
  private Long _addAmount;
  @Override
  public void init(List<Object> initArgs) {
    if (initArgs.size() != 1) {
      System.out.println("AdderUDF takes in exactly one init argument");
    }
    Long addAmount = (Long) initArgs.get(0);
    _addAmount = addAmount;
  }
  @Override
  public Long exec(List<Object> input) {
    if (input == null) {
      return null;
    }
    if (input.size() != 1) {
      return null;
    }
    return (Long) input.get(0) + _addAmount;
  }
  @Override
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{Long.class};
  }
  @Override
  public void finish() throws IOException {
  }
}

Error Handling

The UDF must handle any error that occurs while the function is processed. There are two ways of dealing with errors:

  1. If the exec method cannot produce a value for a record, it can return null. The null value appears in the final generated column.
  2. Any error that should stop the UDF in the init or exec methods should cause an IOException to be thrown. This exception signals to the platform that an issue occurred with the UDF.

Tip: You can add messages to the UDF log through a Logger. Annotate your exceptions at the appropriate logging level. A sketch follows.
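
The following hypothetical variant of AdderUDF combines both practices: its init method logs the problem and then throws an IOException to stop the UDF. It assumes the TrifactaUDF interface permits init to throw IOException, as the error-handling behavior described above implies, and it uses the JDK's java.util.logging; your project may use a different logging framework.

package com.trifacta.trifactaudfs;

import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical example: init validation that logs and then fails fast.
 */
public class StrictAdderUDF implements TrifactaUDF<Long> {
  private static final Logger LOG =
      Logger.getLogger(StrictAdderUDF.class.getName());
  private Long _addAmount;

  // Assumes the interface declares init with "throws IOException".
  @Override
  public void init(List<Object> initArgs) throws IOException {
    // Validate the count and type of the init arguments before accepting them.
    if (initArgs == null || initArgs.size() != 1
        || !(initArgs.get(0) instanceof Long)) {
      LOG.log(Level.SEVERE, "StrictAdderUDF takes exactly one Long init argument");
      throw new IOException("StrictAdderUDF takes exactly one Long init argument");
    }
    _addAmount = (Long) initArgs.get(0);
  }

  @Override
  public Long exec(List<Object> input) {
    // Bad or missing data yields a null output value instead of an error.
    if (input == null || input.size() != 1 || input.get(0) == null) {
      return null;
    }
    return (Long) input.get(0) + _addAmount;
  }

  @Override
  @SuppressWarnings("rawtypes")
  public Class[] inputSchema() {
    return new Class[]{Long.class};
  }

  @Override
  public void finish() throws IOException {
  }
}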

Testing the UDF

JUnit can be used to test your UDFs. Below are tests for the two example UDFs.

Example - JUnit test for Concatenate strings:

@Test
public void concatUDFTest() throws IOException {
  ConcatUDF concat = new ConcatUDF();
  ArrayList<Object> input = new ArrayList<Object>();
  input.add("hello");
  input.add("world");
  String result = concat.exec(input);
  String expected = "helloworld";
  assertEquals(expected, result);
}
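
Because ConcatUDF returns null when inputs are missing, that path is worth testing as well. A hypothetical additional test:

@Test
public void concatUDFNullInputTest() throws IOException {
  ConcatUDF concat = new ConcatUDF();

  // A null input list yields a null result rather than an error.
  assertNull(concat.exec(null));

  // A null value in any input column also yields a null result.
  ArrayList<Object> input = new ArrayList<Object>();
  input.add("hello");
  input.add(null);
  assertNull(concat.exec(input));
}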


Example - JUnit test for Add by constant:
 

@Test
public void adderUDFTest() {
  AdderUDF add = new AdderUDF();
  ArrayList<Object> initArgs = new ArrayList<Object>(1);
  initArgs.add(1L);
  add.init(initArgs);
  ArrayList<Object> inputs1 = new ArrayList<Object>();
  inputs1.add(1L);
  long result = add.exec(inputs1);
  long expected = 2L;
  assertEquals(expected, result);
 
  ArrayList<Object> inputs2 = new ArrayList<Object>();
  inputs2.add(9000L);
  result = add.exec(inputs2);
  expected = 9001L;
  assertEquals(expected, result);
}

Compiling the UDF

After you write the UDF, it must be compiled and packaged into a JAR before you register it with the platform. To compile and package the function, run the following command from the root directory:

gradlew build

The UDF code is assembled, and unit tests are executed. If all is well, the JAR file is created in build/libs.

NOTE: Custom UDFs should be compiled to one or more JAR files. Avoid using the example JAR filename, which can be overwritten on upgrade.
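
To give your JAR a distinct name, one option is to set the archive base name in the build.gradle file. The property shown below is an assumption about the SDK's Gradle version; check the documentation for your Gradle version for the equivalent setting:

// build.gradle (hypothetical): produces my-company-custom-udfs.jar
// instead of the example JAR filename.
archivesBaseName = 'my-company-custom-udfs'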

 

JDK version mismatches

To avoid an Unsupported major.minor version error during execution, the JDK version used to compile the UDF JAR file should be less than or equal to the JDK version on the Hadoop cluster.

If this is not possible, then set the sourceCompatibility and targetCompatibility properties in the local build.gradle file to the JDK version on the Hadoop cluster prior to building the JAR file.

Example:

If the Hadoop cluster is on JDK 1.8, then add the following to the build.gradle file:

targetCompatibility = '1.8'
sourceCompatibility = '1.8'

Registering the UDF

After a function is compiled, it must be registered with the platform:

  1. Enable user-defined functions, if you have not already done so.
  2. Add the path to the JAR file that was generated in the previous steps to the additionalJars setting.
  3. Set the udfPackages value to the package name where the UDFs can be found.

Example configuration:

...
"feature": {
  "enableUDFTransform": {
    "enabled": true
  }
},
"udf-service": {
  "classpath": "%(topOfTree)s/services/udf-service/build/libs/udf-service.jar:%(topOfTree)s/services/udf-service/build/dependencies/*",
  "additionalJars": [
    "/vagrant/libs/custom-udfs-sdk/build/libs/custom-udfs-example.jar"
  ],
  "udfPackages": [
    "com.trifacta.trifactaudfs"
  ]
},
...

Steps:

After modifying the configuration, the UDF service must be restarted.

  1. If you created a new UDF, restart the platform:

    service trifacta restart
  2. If you modified an existing UDF, restart the UDF service:

    NOTE: For an existing UDF, you must rebuild the JAR first. Otherwise, the changes are not recognized during service re-initialization.

    service java-udf-service restart

As part of the restart, any newly added Java UDFs are registered with the application.

Enabling UDF service on HDInsight cluster

By default, the UDF service uses compression across the websockets when running on the cluster. HDInsight clusters do not support compression on websockets.

To make sure the UDF service works on your HDInsight cluster, perform the following steps.

Steps:

  1. Locate the udf-service configuration.
  2. Insert the following extra property in the udf-service configuration area:

    "udf-service": {
       ...
       "jvmOptions": ["-Dorg.apache.tomcat.websocket.DISABLE_BUILTIN_EXTENSIONS=true"],
       ...
    }
  3. Save your changes and restart the platform.

Running Your UDF

For more information on executing your UDF in the Transformer page, see User-Defined Functions.

For examples, see Invoke External Function.

Troubleshooting

"Websocket Receive()" error in Transformer page UI

If you execute a Java UDF, you may see an error similar to the following in the Transformer page:

Please reload page (query execution failed).pp::WebSocket::Receive() error: Unspecified failure.

When you check the udf.log file on the server, the following may be present:

UDFWebsocket closed with status: CloseStatus[code=1009, reason=The decoded text message was too big for the output buffer and the endpoint does not support partial messages]

Solution

The above issue is likely caused by the Photon running environment sending more data through the buffer of the UDF's websocket service than it can hold. By default, this buffer size is set to 1048576 bytes (1 MB).

The Photon running environment processes data through the websocket service in batches of 1024 (1 K) rows at a time for the input and output columns of the UDF. If the data in the input columns to the UDF or the output columns from the UDF exceeds 1 KB (1024 characters) in total size for each row, the default buffer is too small: more than 1024 characters per row times 1024 rows per batch exceeds 1048576 bytes. The query then fails.

Steps:

  1. Change the following setting to a value large enough for your data (see the sizing example after these steps):

    "udf-service.outputBufferSize": 1048576,
  2. Save your changes and restart the platform.
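
For example, if the UDF's input and output columns can total up to 4 KB (4096 characters) per row, the buffer must hold at least 4096 x 1024 = 4194304 bytes per 1 K-row batch. A hypothetical sizing:

    "udf-service.outputBufferSize": 4194304,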

Photon crashes during execution of UDF

During the execution of a UDF, the Photon client can crash. Possible errors include:

Error in changeCurrentEdit Error: Transformation engine has crashed. Please reload your browser (exit code: null; message ID: 161)

Solution:

This crash can be caused by a number of issues. You can try the following:

  1. Increase the value of the udf-service.udfCommunicationTimeout setting. Raise this value a little at a time to see if that allows the UDF to execute.

    NOTE: Avoid setting this value too high, which can cause the Java heap size to be exceeded and another Photon crash. The maximum value is 2147483646.

  2. Save your changes and restart the platform.