1. #include "postgres.h"
    2. #include "fmgr.h"
    3. #include "funcapi.h"
    4. #include "access/extprotocol.h"
    5. #include "catalog/pg_proc.h"
    6. #include "utils/array.h"
    7. #include "utils/builtins.h"
    8. #include "utils/memutils.h"
    /*
     * Parsed form of a protocol URI ("<protocol>://<path>").
     * The layout is our own choice and can be changed however needed.
     */
    typedef struct DemoUri
    {
        char *protocol;     /* text that precedes "://" */
        char *path;         /* text that follows "://" */
    } DemoUri;
    15. static DemoUri *ParseDemoUri(const char *uri_str);
    16. static void FreeDemoUri(DemoUri* uri);
    17. /* Do the module magic dance */
    18. PG_MODULE_MAGIC;
    19. PG_FUNCTION_INFO_V1(demoprot_export);
    20. PG_FUNCTION_INFO_V1(demoprot_import);
    21. PG_FUNCTION_INFO_V1(demoprot_validate_urls);
    22. Datum demoprot_export(PG_FUNCTION_ARGS);
    23. Datum demoprot_import(PG_FUNCTION_ARGS);
    24. Datum demoprot_validate_urls(PG_FUNCTION_ARGS);
    /*
     * Per-scan user context that persists across calls.
     * It can be declared in any other way.
     */
    typedef struct
    {
        char *url;          /* copy of the URL the protocol was invoked with */
        char *filename;     /* path portion of the URL */
        FILE *file;         /* stream opened on filename */
    } extprotocol_t;
    32. /*
    33. * The read function - Import data into GPDB.
    34. */
    35. Datum
    36. myprot_import(PG_FUNCTION_ARGS)
    37. {
    38. extprotocol_t *myData;
    39. char *data;
    40. int datlen;
    41. size_t nread = 0;
    42. /* Must be called via the external table format manager */
    43. if (!CALLED_AS_EXTPROTOCOL(fcinfo))
    44. elog(ERROR, "myprot_import: not called by external
    45. protocol manager");
    46. /* Get our internal description of the protocol */
    47. myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    48. if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
    49. {
    50. /* we're done receiving data. close our connection */
    51. if(myData && myData->file)
    52. if(fclose(myData->file))
    53. ereport(ERROR,
    54. (errcode_for_file_access(),
    55. errmsg("could not close file \"%s\": %m",
    56. myData->filename)));
    57. PG_RETURN_INT32(0);
    58. }
    59. if (myData == NULL)
    60. {
    61. /* first call. do any desired init */
    62. const char *p_name = "myprot";
    63. DemoUri *parsed_url;
    64. char *url = EXTPROTOCOL_GET_URL(fcinfo);
    65. myData = palloc(sizeof(extprotocol_t));
    66. myData->url = pstrdup(url);
    67. parsed_url = ParseDemoUri(myData->url);
    68. myData->filename = pstrdup(parsed_url->path);
    69. if(strcasecmp(parsed_url->protocol, p_name) != 0)
    70. elog(ERROR, "internal error: myprot called with a
    71. different protocol (%s)",
    72. parsed_url->protocol);
    73. FreeDemoUri(parsed_url);
    74. /* open the destination file (or connect to remote server in
    75. other cases) */
    76. myData->file = fopen(myData->filename, "r");
    77. if (myData->file == NULL)
    78. ereport(ERROR,
    79. (errcode_for_file_access(),
    80. errmsg("myprot_import: could not open file \"%s\"
    81. for reading: %m",
    82. myData->filename),
    83. errOmitLocation(true)));
    84. EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    85. }
    86. /* ==========================================
    87. * DO THE IMPORT
    88. * ========================================== */
    89. data = EXTPROTOCOL_GET_DATABUF(fcinfo);
    90. datlen = EXTPROTOCOL_GET_DATALEN(fcinfo);
    91. /* read some bytes (with fread in this example, but normally
    92. in some other method over the network) */
    93. if(datlen > 0)
    94. {
    95. nread = fread(data, 1, datlen, myData->file);
    96. if (ferror(myData->file))
    97. ereport(ERROR,
    98. (errcode_for_file_access(),
    99. errmsg("myprot_import: could not write to file
    100. \"%s\": %m",
    101. myData->filename)));
    102. }
    103. PG_RETURN_INT32((int)nread);
    104. }
    105. /*
    106. * Write function - Export data out of GPDB
    107. */
    108. Datum
    109. myprot_export(PG_FUNCTION_ARGS)
    110. {
    111. extprotocol_t *myData;
    112. char *data;
    113. int datlen;
    114. size_t wrote = 0;
    115. /* Must be called via the external table format manager */
    116. if (!CALLED_AS_EXTPROTOCOL(fcinfo))
    117. elog(ERROR, "myprot_export: not called by external
    118. protocol manager");
    119. /* Get our internal description of the protocol */
    120. myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    121. if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
    122. {
    123. /* we're done sending data. close our connection */
    124. if(myData && myData->file)
    125. if(fclose(myData->file))
    126. ereport(ERROR,
    127. (errcode_for_file_access(),
    128. errmsg("could not close file \"%s\": %m",
    129. myData->filename)));
    130. PG_RETURN_INT32(0);
    131. }
    132. if (myData == NULL)
    133. {
    134. /* first call. do any desired init */
    135. const char *p_name = "myprot";
    136. DemoUri *parsed_url;
    137. char *url = EXTPROTOCOL_GET_URL(fcinfo);
    138. myData = palloc(sizeof(extprotocol_t));
    139. myData->url = pstrdup(url);
    140. parsed_url = ParseDemoUri(myData->url);
    141. myData->filename = pstrdup(parsed_url->path);
    142. if(strcasecmp(parsed_url->protocol, p_name) != 0)
    143. elog(ERROR, "internal error: myprot called with a
    144. different protocol (%s)",
    145. parsed_url->protocol);
    146. FreeDemoUri(parsed_url);
    147. /* open the destination file (or connect to remote server in
    148. other cases) */
    149. myData->file = fopen(myData->filename, "a");
    150. if (myData->file == NULL)
    151. ereport(ERROR,
    152. (errcode_for_file_access(),
    153. errmsg("myprot_export: could not open file \"%s\"
    154. for writing: %m",
    155. myData->filename),
    156. errOmitLocation(true)));
    157. EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    158. }
    159. /* ========================================
    160. * DO THE EXPORT
    161. * ======================================== */
    162. data = EXTPROTOCOL_GET_DATABUF(fcinfo);
    163. datlen = EXTPROTOCOL_GET_DATALEN(fcinfo);
    164. if(datlen > 0)
    165. {
    166. wrote = fwrite(data, 1, datlen, myData->file);
    167. if (ferror(myData->file))
    168. ereport(ERROR,
    169. (errcode_for_file_access(),
    170. errmsg("myprot_import: could not read from file
    171. \"%s\": %m",
    172. myData->filename)));
    173. }
    174. PG_RETURN_INT32((int)wrote);
    175. }
    176. Datum
    177. myprot_validate_urls(PG_FUNCTION_ARGS)
    178. {
    179. List *urls;
    180. int nurls;
    181. int i;
    182. ValidatorDirection direction;
    183. /* Must be called via the external table format manager */
    184. if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo))
    185. elog(ERROR, "myprot_validate_urls: not called by external
    186. protocol manager");
    187. nurls = EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo);
    188. urls = EXTPROTOCOL_VALIDATOR_GET_URL_LIST(fcinfo);
    189. direction = EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo);
    190. /*
    191. * Dumb example 1: search each url for a substring
    192. * we don't want to be used in a url. in this example
    193. * it's 'secured_directory'.
    194. */
    195. for (i = 1 ; i <= nurls ; i++)
    196. {
    197. char *url = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, i);
    198. if (strstr(url, "secured_directory") != 0)
    199. {
    200. ereport(ERROR,
    201. (errcode(ERRCODE_PROTOCOL_VIOLATION),
    202. errmsg("using 'secured_directory' in a url
    203. isn't allowed ")));
    204. }
    205. }
    206. /*
    207. * Dumb example 2: set a limit on the number of urls
    208. * used. In this example we limit readable external
    209. * tables that use our protocol to 2 urls max.
    210. */
    211. if(direction == EXT_VALIDATE_READ && nurls > 2)
    212. {
    213. ereport(ERROR,
    214. (errcode(ERRCODE_PROTOCOL_VIOLATION),
    215. errmsg("more than 2 urls aren't allowed in this protocol ")));
    216. }
    217. PG_RETURN_VOID();
    218. }
    219. /* --- utility functions --- */
    220. static
    221. DemoUri *ParseDemoUri(const char *uri_str)
    222. {
    223. DemoUri *uri = (DemoUri *) palloc0(sizeof(DemoUri));
    224. int protocol_len;
    225. uri->path = NULL;
    226. uri->protocol = NULL;
    227. /*
    228. * parse protocol
    229. */
    230. char *post_protocol = strstr(uri_str, "://");
    231. if(!post_protocol)
    232. {
    233. ereport(ERROR,
    234. (errcode(ERRCODE_SYNTAX_ERROR),
    235. errmsg("invalid protocol URI \'%s\'", uri_str),
    236. errOmitLocation(true)));
    237. }
    238. protocol_len = post_protocol - uri_str;
    239. uri->protocol = (char *)palloc0(protocol_len + 1);
    240. strncpy(uri->protocol, uri_str, protocol_len);
    241. /* make sure there is more to the uri string */
    242. if (strlen(uri_str) <= protocol_len)
    243. ereport(ERROR,
    244. (errcode(ERRCODE_SYNTAX_ERROR),
    245. errmsg("invalid myprot URI \'%s\' : missing path",
    246. uri_str),
    247. errOmitLocation(true)));
    248. /* parse path */
    249. uri->path = pstrdup(uri_str + protocol_len + strlen("://"));
    250. return uri;
    251. }
    252. static
    253. void FreeDemoUri(DemoUri *uri)
    254. {
    255. if (uri->path)
    256. pfree(uri->path);
    257. if (uri->protocol)
    258. pfree(uri->protocol);
    259. pfree(uri);
    260. }

    Parent topic: Installing the External Table Protocol