Hive 函数

通过 HiveModule 使用 Hive 内置函数

在 Flink SQL 和 Table API 中,可以通过系统内置的 HiveModule 来使用 Hive 内置函数,

详细信息,请参考 HiveModule

  1. String name = "myhive";
  2. String version = "2.3.4";
  3. tableEnv.loadModue(name, new HiveModule(version));
  1. val name = "myhive"
  2. val version = "2.3.4"
  3. tableEnv.loadModue(name, new HiveModule(version));
  1. modules:
  2. - name: core
  3. type: core
  4. - name: myhive
  5. type: hive
  • 请注意旧版本的部分 Hive 内置函数存在线程安全问题。 我们建议用户及时通过补丁修正 Hive 中的这些问题。

Hive 用户自定义函数(User Defined Functions)

在 Flink 中用户可以使用 Hive 里已经存在的 UDF 函数。

支持的 UDF 类型包括:

  • UDF
  • GenericUDF
  • GenericUDTF
  • UDAF
  • GenericUDAFResolver2

在进行查询规划和执行时,Hive UDF 和 GenericUDF 函数会自动转换成 Flink 中的 ScalarFunction,GenericUDTF 会被自动转换成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数(AggregateFunction).

想要使用 Hive UDF 函数,需要如下几步:

  • 通过 Hive Metastore 将带有 UDF 的 HiveCatalog 设置为当前会话的 catalog 后端。
  • 将带有 UDF 的 jar 包放入 Flink classpath 中,并在代码中引入。
  • 使用 Blink planner。

使用 Hive UDF

假设我们在 Hive Metastore 中已经注册了下面的 UDF 函数:

  1. /**
  2. * 注册为 'myudf' 的简单 UDF 测试类.
  3. */
  4. public class TestHiveSimpleUDF extends UDF {
  5. public IntWritable evaluate(IntWritable i) {
  6. return new IntWritable(i.get());
  7. }
  8. public Text evaluate(Text text) {
  9. return new Text(text.toString());
  10. }
  11. }
  12. /**
  13. * 注册为 'mygenericudf' 的普通 UDF 测试类
  14. */
  15. public class TestHiveGenericUDF extends GenericUDF {
  16. @Override
  17. public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
  18. checkArgument(arguments.length == 2);
  19. checkArgument(arguments[1] instanceof ConstantObjectInspector);
  20. Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
  21. checkArgument(constant instanceof IntWritable);
  22. checkArgument(((IntWritable) constant).get() == 1);
  23. if (arguments[0] instanceof IntObjectInspector ||
  24. arguments[0] instanceof StringObjectInspector) {
  25. return arguments[0];
  26. } else {
  27. throw new RuntimeException("Not support argument: " + arguments[0]);
  28. }
  29. }
  30. @Override
  31. public Object evaluate(DeferredObject[] arguments) throws HiveException {
  32. return arguments[0].get();
  33. }
  34. @Override
  35. public String getDisplayString(String[] children) {
  36. return "TestHiveGenericUDF";
  37. }
  38. }
  39. /**
  40. * 注册为 'mygenericudtf' 的字符串分割 UDF 测试类
  41. */
  42. public class TestHiveUDTF extends GenericUDTF {
  43. @Override
  44. public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
  45. checkArgument(argOIs.length == 2);
  46. // TEST for constant arguments
  47. checkArgument(argOIs[1] instanceof ConstantObjectInspector);
  48. Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
  49. checkArgument(constant instanceof IntWritable);
  50. checkArgument(((IntWritable) constant).get() == 1);
  51. return ObjectInspectorFactory.getStandardStructObjectInspector(
  52. Collections.singletonList("col1"),
  53. Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
  54. }
  55. @Override
  56. public void process(Object[] args) throws HiveException {
  57. String str = (String) args[0];
  58. for (String s : str.split(",")) {
  59. forward(s);
  60. forward(s);
  61. }
  62. }
  63. @Override
  64. public void close() {
  65. }
  66. }

在 Hive CLI 中,可以查询到已经注册的 UDF 函数:

  1. hive> show functions;
  2. OK
  3. ......
  4. mygenericudf
  5. myudf
  6. myudtf

此时,用户如果想使用这些 UDF,在 SQL 中就可以这样写:

  1. Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);