HTTP 上报

HTTP 上报流程介绍

InLong 通过 DataProxy 节点处理 HTTP 上报消息,上报源定期从 Manager 获取接入点列表,然后根据自身策略从接入点列表里选择可用的 HTTP 上报节点,再采用 HTTP 协议进行数据生产。总的 HTTP 上报流程如下图示:

HTTP 上报 - 图1

  • 心跳上报:DataProxy 定期上报心跳至 Manager,提供该节点已启用接入的 {IP,Port,Protocol,Load} 信息;

  • 在线节点缓存:Manager 缓存 DataProxy 上报的心跳信息,感知集群里可用的接入节点,以及可用的上报接入信息;

  • 接入点获取:HTTP SDK(数据上报源采用 DataProxy-SDK 实现的 HttpProxySender,或者据 HTTP 上报协议自行开发的 HTTP 上报 SDK)定期通过“/inlong/manager/openapi/dataproxy/getIpList/{inlongGroupId}”方法从 Manager 获取当前上报的groupId对应的可用上报接入点列表信息;

  • 接入点选取:HTTP SDK 根据上报节点选取策略,选择待进行消息上报的 DataProxy 节点;

  • 数据上报:HTTP SDK 根据 HTTP 上报协议构造上报消息,向选中的 DataProxy 节点发送请求消息,并在收到响应后根据响应结果做是否重发、异常输出等操作;

  • 数据接纳:DataProxy 检查 HTTP 消息,如果成功接纳则返回成功响应,并将消息转发给 MQ 集群;如果消息格式或者数值不符合规范,或者消息处理失败,则 DataProxy 返回失败响应,响应里携带对应的错误码和详细的错误信息。

建议: 由于 HTTP 上报存在性能低、有效数据占比低、请求消息容易丢失等问题,建议业务尽量用 TCP 方式进行数据上报。

新建实时同步任务

在 Dashboard 或者通过命令行工具创建任务,数据源类型使用 Auto Push (自主推送)。

方式一:调用 URL 上报

  1. curl -X POST -d 'groupId=give_your_group_id&streamId=give_your_stream_id&dt=data_time&body=give_your_data_body&cnt=1' http://dataproxy_url:46802/dataproxy/message
  • 参数说明:
参数含义备注
groupId数据流组 id
streamId数据流 ID
body推送的数据内容
dt推送的数据时间毫秒为单位的时间戳
cnt推送条数
  • 返回值:
返回码含义
1成功
非1失败

方式二:封装 HTTP Client(Java)

需要 httpclientcommons-lang3jackson-databind,代码示例:

  1. public class DataPush {
  2. private static CloseableHttpClient httpClient;
  3. private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  4. private final Random rand = new Random();
  5. private String sendByHttp(List<String> bodies, String groupId, String streamId, long dataTime,
  6. long timeout, TimeUnit timeUnit, List<String> addresses) throws Exception {
  7. if (null == addresses || addresses.isEmpty()) {
  8. throw new RuntimeException("addresses are null");
  9. }
  10. HttpPost httpPost = null;
  11. CloseableHttpResponse response = null;
  12. try {
  13. if (httpClient == null) {
  14. httpClient = constructHttpClient(timeout, timeUnit);
  15. }
  16. int randomNum = rand.nextInt((addresses.size() - 1) + 1);
  17. String url = "http://" + addresses.get(randomNum) + "/dataproxy/message";
  18. httpPost = new HttpPost(url);
  19. httpPost.setHeader(HttpHeaders.CONNECTION, "close");
  20. httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
  21. ArrayList<BasicNameValuePair> contents = getContents(bodies, groupId, streamId, dataTime);
  22. String s = URLEncodedUtils.format(contents, StandardCharsets.UTF_8);
  23. httpPost.setEntity(new StringEntity(s));
  24. response = httpClient.execute(httpPost);
  25. String returnStr = EntityUtils.toString(response.getEntity());
  26. if (StringUtils.isNotBlank(returnStr) && response.getStatusLine().getStatusCode() == 200) {
  27. JsonNode jsonNode = OBJECT_MAPPER.readTree(returnStr);
  28. if (jsonNode.has("code")) {
  29. int code = jsonNode.get("code").asInt();
  30. if (code == 0) {
  31. return "success";
  32. } else {
  33. return "fail";
  34. }
  35. }
  36. } else {
  37. throw new Exception("exception to get response from request " + returnStr + " "
  38. + response.getStatusLine().getStatusCode());
  39. }
  40. } finally {
  41. if (httpPost != null) {
  42. httpPost.releaseConnection();
  43. }
  44. if (response != null) {
  45. response.close();
  46. }
  47. }
  48. return "fail";
  49. }
  50. private static synchronized CloseableHttpClient constructHttpClient(long timeout, TimeUnit timeUnit) {
  51. if (httpClient != null) {
  52. return httpClient;
  53. }
  54. long timeoutInMs = timeUnit.toMillis(timeout);
  55. RequestConfig requestConfig = RequestConfig.custom()
  56. .setConnectTimeout((int) timeoutInMs)
  57. .setSocketTimeout((int) timeoutInMs).build();
  58. HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
  59. httpClientBuilder.setDefaultRequestConfig(requestConfig);
  60. return httpClientBuilder.build();
  61. }
  62. private static ArrayList<BasicNameValuePair> getContents(List<String> bodies,
  63. String groupId, String streamId, long dt) {
  64. ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
  65. params.add(new BasicNameValuePair("groupId", groupId));
  66. params.add(new BasicNameValuePair("streamId", streamId));
  67. params.add(new BasicNameValuePair("dt", String.valueOf(dt)));
  68. params.add(new BasicNameValuePair("body", StringUtils.join(bodies, "\n")));
  69. params.add(new BasicNameValuePair("cnt", String.valueOf(bodies.size())));
  70. return params;
  71. }
  72. }