gpextprotocal.c

  1. #include "postgres.h"
  2. #include "fmgr.h"
  3. #include "funcapi.h"
  4. #include "access/extprotocol.h"
  5. #include "catalog/pg_proc.h"
  6. #include "utils/array.h"
  7. #include "utils/builtins.h"
  8. #include "utils/memutils.h"
  9. /* Our chosen URI format. We can change it however needed */
  10. typedef struct DemoUri
  11. {
  12. char *protocol;
  13. char *path;
  14. } DemoUri;
  15. static DemoUri *ParseDemoUri(const char *uri_str);
  16. static void FreeDemoUri(DemoUri* uri);
  17. /* Do the module magic dance */
  18. PG_MODULE_MAGIC;
  19. PG_FUNCTION_INFO_V1(demoprot_export);
  20. PG_FUNCTION_INFO_V1(demoprot_import);
  21. PG_FUNCTION_INFO_V1(demoprot_validate_urls);
  22. Datum demoprot_export(PG_FUNCTION_ARGS);
  23. Datum demoprot_import(PG_FUNCTION_ARGS);
  24. Datum demoprot_validate_urls(PG_FUNCTION_ARGS);
  25. /* A user context that persists across calls. Can be
  26. declared in any other way */
  27. typedef struct {
  28. char *url;
  29. char *filename;
  30. FILE *file;
  31. } extprotocol_t;
  32. /*
  33. * The read function - Import data into GPDB.
  34. */
  35. Datum
  36. myprot_import(PG_FUNCTION_ARGS)
  37. {
  38. extprotocol_t *myData;
  39. char *data;
  40. int datlen;
  41. size_t nread = 0;
  42. /* Must be called via the external table format manager */
  43. if (!CALLED_AS_EXTPROTOCOL(fcinfo))
  44. elog(ERROR, "myprot_import: not called by external
  45. protocol manager");
  46. /* Get our internal description of the protocol */
  47. myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
  48. if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
  49. {
  50. /* we're done receiving data. close our connection */
  51. if(myData && myData->file)
  52. if(fclose(myData->file))
  53. ereport(ERROR,
  54. (errcode_for_file_access(),
  55. errmsg("could not close file \"%s\": %m",
  56. myData->filename)));
  57. PG_RETURN_INT32(0);
  58. }
  59. if (myData == NULL)
  60. {
  61. /* first call. do any desired init */
  62. const char *p_name = "myprot";
  63. DemoUri *parsed_url;
  64. char *url = EXTPROTOCOL_GET_URL(fcinfo);
  65. myData = palloc(sizeof(extprotocol_t));
  66. myData->url = pstrdup(url);
  67. parsed_url = ParseDemoUri(myData->url);
  68. myData->filename = pstrdup(parsed_url->path);
  69. if(strcasecmp(parsed_url->protocol, p_name) != 0)
  70. elog(ERROR, "internal error: myprot called with a
  71. different protocol (%s)",
  72. parsed_url->protocol);
  73. FreeDemoUri(parsed_url);
  74. /* open the destination file (or connect to remote server in
  75. other cases) */
  76. myData->file = fopen(myData->filename, "r");
  77. if (myData->file == NULL)
  78. ereport(ERROR,
  79. (errcode_for_file_access(),
  80. errmsg("myprot_import: could not open file \"%s\"
  81. for reading: %m",
  82. myData->filename),
  83. errOmitLocation(true)));
  84. EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
  85. }
  86. /* ==========================================
  87. * DO THE IMPORT
  88. * ========================================== */
  89. data = EXTPROTOCOL_GET_DATABUF(fcinfo);
  90. datlen = EXTPROTOCOL_GET_DATALEN(fcinfo);
  91. /* read some bytes (with fread in this example, but normally
  92. in some other method over the network) */
  93. if(datlen > 0)
  94. {
  95. nread = fread(data, 1, datlen, myData->file);
  96. if (ferror(myData->file))
  97. ereport(ERROR,
  98. (errcode_for_file_access(),
  99. errmsg("myprot_import: could not write to file
  100. \"%s\": %m",
  101. myData->filename)));
  102. }
  103. PG_RETURN_INT32((int)nread);
  104. }
  105. /*
  106. * Write function - Export data out of GPDB
  107. */
  108. Datum
  109. myprot_export(PG_FUNCTION_ARGS)
  110. {
  111. extprotocol_t *myData;
  112. char *data;
  113. int datlen;
  114. size_t wrote = 0;
  115. /* Must be called via the external table format manager */
  116. if (!CALLED_AS_EXTPROTOCOL(fcinfo))
  117. elog(ERROR, "myprot_export: not called by external
  118. protocol manager");
  119. /* Get our internal description of the protocol */
  120. myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
  121. if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
  122. {
  123. /* we're done sending data. close our connection */
  124. if(myData && myData->file)
  125. if(fclose(myData->file))
  126. ereport(ERROR,
  127. (errcode_for_file_access(),
  128. errmsg("could not close file \"%s\": %m",
  129. myData->filename)));
  130. PG_RETURN_INT32(0);
  131. }
  132. if (myData == NULL)
  133. {
  134. /* first call. do any desired init */
  135. const char *p_name = "myprot";
  136. DemoUri *parsed_url;
  137. char *url = EXTPROTOCOL_GET_URL(fcinfo);
  138. myData = palloc(sizeof(extprotocol_t));
  139. myData->url = pstrdup(url);
  140. parsed_url = ParseDemoUri(myData->url);
  141. myData->filename = pstrdup(parsed_url->path);
  142. if(strcasecmp(parsed_url->protocol, p_name) != 0)
  143. elog(ERROR, "internal error: myprot called with a
  144. different protocol (%s)",
  145. parsed_url->protocol);
  146. FreeDemoUri(parsed_url);
  147. /* open the destination file (or connect to remote server in
  148. other cases) */
  149. myData->file = fopen(myData->filename, "a");
  150. if (myData->file == NULL)
  151. ereport(ERROR,
  152. (errcode_for_file_access(),
  153. errmsg("myprot_export: could not open file \"%s\"
  154. for writing: %m",
  155. myData->filename),
  156. errOmitLocation(true)));
  157. EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
  158. }
  159. /* ========================================
  160. * DO THE EXPORT
  161. * ======================================== */
  162. data = EXTPROTOCOL_GET_DATABUF(fcinfo);
  163. datlen = EXTPROTOCOL_GET_DATALEN(fcinfo);
  164. if(datlen > 0)
  165. {
  166. wrote = fwrite(data, 1, datlen, myData->file);
  167. if (ferror(myData->file))
  168. ereport(ERROR,
  169. (errcode_for_file_access(),
  170. errmsg("myprot_import: could not read from file
  171. \"%s\": %m",
  172. myData->filename)));
  173. }
  174. PG_RETURN_INT32((int)wrote);
  175. }
  176. Datum
  177. myprot_validate_urls(PG_FUNCTION_ARGS)
  178. {
  179. List *urls;
  180. int nurls;
  181. int i;
  182. ValidatorDirection direction;
  183. /* Must be called via the external table format manager */
  184. if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo))
  185. elog(ERROR, "myprot_validate_urls: not called by external
  186. protocol manager");
  187. nurls = EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo);
  188. urls = EXTPROTOCOL_VALIDATOR_GET_URL_LIST(fcinfo);
  189. direction = EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo);
  190. /*
  191. * Dumb example 1: search each url for a substring
  192. * we don't want to be used in a url. in this example
  193. * it's 'secured_directory'.
  194. */
  195. for (i = 1 ; i <= nurls ; i++)
  196. {
  197. char *url = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, i);
  198. if (strstr(url, "secured_directory") != 0)
  199. {
  200. ereport(ERROR,
  201. (errcode(ERRCODE_PROTOCOL_VIOLATION),
  202. errmsg("using 'secured_directory' in a url
  203. isn't allowed ")));
  204. }
  205. }
  206. /*
  207. * Dumb example 2: set a limit on the number of urls
  208. * used. In this example we limit readable external
  209. * tables that use our protocol to 2 urls max.
  210. */
  211. if(direction == EXT_VALIDATE_READ && nurls > 2)
  212. {
  213. ereport(ERROR,
  214. (errcode(ERRCODE_PROTOCOL_VIOLATION),
  215. errmsg("more than 2 urls aren't allowed in this protocol ")));
  216. }
  217. PG_RETURN_VOID();
  218. }
  219. /* --- utility functions --- */
  220. static
  221. DemoUri *ParseDemoUri(const char *uri_str)
  222. {
  223. DemoUri *uri = (DemoUri *) palloc0(sizeof(DemoUri));
  224. int protocol_len;
  225. uri->path = NULL;
  226. uri->protocol = NULL;
  227. /*
  228. * parse protocol
  229. */
  230. char *post_protocol = strstr(uri_str, "://");
  231. if(!post_protocol)
  232. {
  233. ereport(ERROR,
  234. (errcode(ERRCODE_SYNTAX_ERROR),
  235. errmsg("invalid protocol URI \'%s\'", uri_str),
  236. errOmitLocation(true)));
  237. }
  238. protocol_len = post_protocol - uri_str;
  239. uri->protocol = (char *)palloc0(protocol_len + 1);
  240. strncpy(uri->protocol, uri_str, protocol_len);
  241. /* make sure there is more to the uri string */
  242. if (strlen(uri_str) <= protocol_len)
  243. ereport(ERROR,
  244. (errcode(ERRCODE_SYNTAX_ERROR),
  245. errmsg("invalid myprot URI \'%s\' : missing path",
  246. uri_str),
  247. errOmitLocation(true)));
  248. /* parse path */
  249. uri->path = pstrdup(uri_str + protocol_len + strlen("://"));
  250. return uri;
  251. }
  252. static
  253. void FreeDemoUri(DemoUri *uri)
  254. {
  255. if (uri->path)
  256. pfree(uri->path);
  257. if (uri->protocol)
  258. pfree(uri->protocol);
  259. pfree(uri);
  260. }

上级主题: 安装外部表协议