Pegasus Python Client

Project URL

https://github.com/XiaoMi/pegasus-python-client

Version requirements

Python 2.*

Installation

pip install pypegasus

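Usage

The Pegasus Python client is built on Twisted, so code written against it follows Twisted's asynchronous style.

Example

For complete examples, refer to the project's sample. A simple example follows: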

    #!/usr/bin/env python
    # coding:utf-8

    from pypegasus.pgclient import Pegasus

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks


    @inlineCallbacks
    def basic_test():
        # init
        c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602'], 'temp')

        suc = yield c.init()
        if not suc:
            reactor.stop()
            print('ERROR: connect pegasus server failed')
            return

        # set
        try:
            ret = yield c.set('hkey1', 'skey1', 'value', 0, 500)
            print('set ret: ', ret)
        except Exception as e:
            print(e)

        # get
        ret = yield c.get('hkey1', 'skey1')
        print('get ret: ', ret)

        reactor.stop()


    if __name__ == "__main__":
        reactor.callWhenRunning(basic_test)
        reactor.run()

Logging configuration file

The Pegasus Python client uses the standard logging package. The default configuration is as follows:

    [loggers]
    keys=root

    [logger_root]
    level=INFO
    handlers=hand01
    propagate=0

    [handlers]
    keys=hand01

    [handler_hand01]
    class=handlers.RotatingFileHandler
    formatter=form01
    args=('pegasus.log', 'a', 100*1024*1024, 10)

    [formatters]
    keys=form01

    [formatter_form01]
    format=%(asctime)s [%(thread)d] [%(levelname)s] %(filename)s:%(lineno)d %(message)s
    datefmt=%Y-%m-%d %H:%M:%S

To customize logging, add a configuration file named logger.conf to your own code directory.

API reference

Initialization

To initialize, first construct a Pegasus object, then call its init function to complete the initialization:

    class Pegasus(object):
        """
        Pegasus client class.
        """

        def __init__(self, meta_addrs=None, table_name='',
                     timeout=DEFAULT_TIMEOUT):
            """
            :param meta_addrs: (list) pegasus meta servers list.
                               example: ['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603']
            :param table_name: (str) table name/app name used in pegasus.
            :param timeout: (int) default timeout in milliseconds when communicate with meta server and replica server.
            """

        def init(self):
            """
            Initialize the client before you can use it.
            :return: (DeferredList) True when initialized succeed, others when failed.
            """

ttl

Get the remaining TTL (time to live) of a key.

    def ttl(self, hash_key, sort_key, timeout=0):
        """
        Get ttl(time to live) of the data.
        :param hash_key: (str) which hash key used for this API.
        :param sort_key: (str) which sort key used for this API.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, int>) (code, ttl)
                 code: error_types.ERR_OK.value when data exist, error_types.ERR_OBJECT_NOT_FOUND.value when data not found.
                 ttl: in seconds, -1 means forever.
        """
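Two conventions in this docstring recur throughout the API: a per-call timeout of 0 falls back to the timeout given when the instance was created, and a ttl of -1 means the data never expires. The sketch below restates both rules as plain functions. The helper names are hypothetical and not part of pypegasus, and `default_timeout_ms` stands for whatever timeout was passed to the Pegasus constructor.

```python
import time


def effective_timeout_ms(call_timeout_ms, default_timeout_ms):
    """Return the timeout that applies to one call, per the rule above."""
    return call_timeout_ms if call_timeout_ms > 0 else default_timeout_ms


def expiry_timestamp(ttl_seconds, now=None):
    """Turn a ttl result into an absolute Unix time, or None for 'forever'."""
    if ttl_seconds == -1:
        return None
    if now is None:
        now = time.time()
    return now + ttl_seconds


print(effective_timeout_ms(0, 2000))     # falls back to the instance timeout
print(effective_timeout_ms(500, 2000))   # the per-call value wins
print(expiry_timestamp(-1))              # no expiry
print(expiry_timestamp(60, now=1000.0))  # 60 seconds after `now`
```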

exist

Check whether a key exists.

    def exist(self, hash_key, sort_key, timeout=0):
        """
        Check value exist.
        :param hash_key: (str) which hash key used for this API.
        :param sort_key: (str) which sort key used for this API.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, None>) (code, ign)
                 code: error_types.ERR_OK.value when data exist, error_types.ERR_OBJECT_NOT_FOUND.value when data not found.
                 ign: useless, should be ignored.
        """

set

Insert a record (an existing record is overwritten).

    def set(self, hash_key, sort_key, value, ttl=0, timeout=0):
        """
        Set value to be stored in <hash_key, sort_key>.
        :param hash_key: (str) which hash key used for this API.
        :param sort_key: (str) which sort key used for this API.
        :param value: (str) value to be stored under <hash_key, sort_key>.
        :param ttl: (int) ttl(time to live) in seconds of this data.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, None>) (code, ign)
                 code: error_types.ERR_OK.value when data stored succeed.
                 ign: useless, should be ignored.
        """

multi_set

Write multiple sort key records under one hash key in a single call.

    def multi_set(self, hash_key, sortkey_value_dict, ttl=0, timeout=0):
        """
        Set multiple sort_keys-values under hash_key to be stored.
        :param hash_key: (str) which hash key used for this API.
        :param sortkey_value_dict: (dict) <sort_key, value> pairs in dict.
        :param ttl: (int) ttl(time to live) in seconds of these data.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, _>) (code, ign)
                 code: error_types.ERR_OK.value when data stored succeed.
                 ign: useless, should be ignored.
        """

get

Get a single record.

    def get(self, hash_key, sort_key, timeout=0):
        """
        Get value stored in <hash_key, sort_key>.
        :param hash_key: (str) which hash key used for this API.
        :param sort_key: (str) which sort key used for this API.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, str>) (code, value).
                 code: error_types.ERR_OK.value when data got succeed, error_types.ERR_OBJECT_NOT_FOUND.value when data not found.
                 value: data stored in this <hash_key, sort_key>
        """
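Because get reports its outcome as a (code, value) tuple, callers usually branch on the code before touching the value. The following is a minimal sketch of that check, not part of pypegasus: `value_or_default` is a hypothetical helper, the error codes are passed in as arguments, and the demonstration uses placeholder codes where real code would pass `error_types.ERR_OK.value` and `error_types.ERR_OBJECT_NOT_FOUND.value`. What the value slot holds when the data is not found is not stated above, so the demo tuple uses None as an assumption.

```python
def value_or_default(result, ok_code, not_found_code, default=None):
    """Unpack a (code, value) tuple in the shape returned by get."""
    code, value = result
    if code == ok_code:
        return value
    if code == not_found_code:
        return default
    raise RuntimeError('pegasus get failed with error code %r' % (code,))


# Placeholder codes for the demonstration only.
DEMO_OK, DEMO_NOT_FOUND = 'ok', 'not-found'

print(value_or_default((DEMO_OK, 'value'), DEMO_OK, DEMO_NOT_FOUND))
print(value_or_default((DEMO_NOT_FOUND, None), DEMO_OK, DEMO_NOT_FOUND,
                       default='fallback'))
```

Inside an inlineCallbacks function the tuple would come from `ret = yield c.get('hkey1', 'skey1')`, as in the example at the top of this page.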

multi_get

Read multiple sort key records under one hash key in a single call.

    def multi_get(self, hash_key,
                  sortkey_set,
                  max_kv_count=100,
                  max_kv_size=1000000,
                  no_value=False,
                  timeout=0):
        """
        Get multiple values stored in <hash_key, sortkey> pairs.
        :param hash_key: (str) which hash key used for this API.
        :param sortkey_set: (set) sort keys in set.
        :param max_kv_count: (int) max count of k-v pairs to be fetched. max_kv_count <= 0 means no limit.
        :param max_kv_size: (int) max total data size of k-v pairs to be fetched. max_kv_size <= 0 means no limit.
        :param no_value: (bool) whether to fetch value of these keys.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, dict>) (code, kvs)
                 code: error_types.ERR_OK.value when data got succeed.
                 kvs: <sort_key, value> pairs in dict.
        """

multi_get_opt

Read multiple sort key records under one hash key in a single call. Which records are returned is determined by the modes specified in the multi_get_options parameter.

    def multi_get_opt(self, hash_key,
                      start_sort_key, stop_sort_key,
                      multi_get_options,
                      max_kv_count=100,
                      max_kv_size=1000000,
                      timeout=0):
        """
        Get multiple values stored in hash_key, and sort key range in [start_sort_key, stop_sort_key) as default.
        :param hash_key: (str) which hash key used for this API.
        :param start_sort_key: (str) returned k-v pairs is start from start_sort_key.
        :param stop_sort_key: (str) returned k-v pairs is stop at stop_sort_key.
        :param multi_get_options: (MultiGetOptions) configurable multi_get options.
        :param max_kv_count: (int) max count of k-v pairs to be fetched. max_kv_count <= 0 means no limit.
        :param max_kv_size: (int) max total data size of k-v pairs to be fetched. max_kv_size <= 0 means no limit.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, dict>) (code, kvs)
                 code: error_types.ERR_OK.value when data got succeed.
                 kvs: <sort_key, value> pairs in dict.
        """

MultiGetOptions can specify the sort key range, whether the boundaries are inclusive, substring matching, whether values are returned, whether results are in reverse order, and so on. It is defined as follows:

    class MultiGetOptions(object):
        """
        configurable options for multi_get.
        """

        def __init__(self):
            self.start_inclusive = True
            self.stop_inclusive = False
            self.sortkey_filter_type = filter_type.FT_NO_FILTER
            self.sortkey_filter_pattern = ""
            self.no_value = False
            self.reverse = False


    class filter_type:
        FT_NO_FILTER = 0
        FT_MATCH_ANYWHERE = 1
        FT_MATCH_PREFIX = 2
        FT_MATCH_POSTFIX = 3
remove

Delete a single record.

    def remove(self, hash_key, sort_key, timeout=0):
        """
        Remove the entire <hash_key, sort_key>-value in pegasus.
        :param hash_key: (str) which hash key used for this API.
        :param sort_key: (str) which sort key used for this API.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, None>) (code, ign)
                 code: error_types.ERR_OK.value when data is removed successfully.
                 ign: useless, should be ignored.
        """

multi_del

Delete multiple sort key records under one hash key in a batch.

    def multi_del(self, hash_key, sortkey_set, timeout=0):
        """
        Remove multiple entire <hash_key, sort_key>-values in pegasus.
        :param hash_key: (str) which hash key used for this API.
        :param sortkey_set: (set) sort keys in set.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, int>) (code, count).
                 code: error_types.ERR_OK.value when data is removed successfully.
                 count: count of deleted k-v pairs.
        """

sort_key_count

Get the number of sort keys under a hash key.

    def sort_key_count(self, hash_key, timeout=0):
        """
        Get the total sort key count under the hash_key.
        :param hash_key: (str) which hash key used for this API.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, count>) (code, count)
                 code: error_types.ERR_OK.value when data got succeed, error_types.ERR_OBJECT_NOT_FOUND.value when data not found.
                 count: total sort key count under the hash_key.
        """

get_sort_keys

Get the sort keys under a hash key.

    def get_sort_keys(self, hash_key,
                      max_kv_count=100,
                      max_kv_size=1000000,
                      timeout=0):
        """
        Get multiple sort keys under hash_key.
        :param hash_key: (str) which hash key used for this API.
        :param max_kv_count: (int) max count of k-v pairs to be fetched. max_kv_count <= 0 means no limit.
        :param max_kv_size: (int) max total data size of k-v pairs to be fetched. max_kv_size <= 0 means no limit.
        :param timeout: (int) how long will the operation timeout in milliseconds.
                        if timeout > 0, it is a timeout value for current operation,
                        else the timeout value specified to create the instance will be used.
        :return: (tuple<error_types.code.value, set>) (code, ks)
                 code: error_types.ERR_OK.value when data got succeed.
                 ks: <sort_key, ign> pairs in dict, ign will always be empty str.
        """

get_scanner

Get a scanner object for scanning data in a specified range. The scan mode can be specified through the scan_options parameter.

    def get_scanner(self, hash_key,
                    start_sort_key, stop_sort_key,
                    scan_options):
        """
        Get scanner for hash_key, start from start_sort_key, and stop at stop_sort_key.
        Whether the scanner include the start_sort_key and stop_sort_key is configurable by scan_options
        :param hash_key: (str) which hash key used for this API.
        :param start_sort_key: (str) returned scanner is start from start_sort_key.
        :param stop_sort_key: (str) returned scanner is stop at stop_sort_key.
        :param scan_options: (ScanOptions) configurable scan options.
        :return: (PegasusScanner) scanner, instance of PegasusScanner.
        """

ScanOptions can specify whether the boundaries are inclusive, the timeout, the number of sort key-value pairs fetched from the replica server per batch, and so on. It is defined as follows:

    class ScanOptions(object):
        """
        configurable options for scan.
        """

        def __init__(self):
            self.timeout_millis = 5000
            self.batch_size = 1000
            self.start_inclusive = True
            self.stop_inclusive = False
            self.snapshot = None  # for future use

get_unordered_scanners

Get multiple scanners at once for scanning the data of an entire table. The scan mode can be specified through the scan_options parameter.

    def get_unordered_scanners(self, max_split_count, scan_options):
        """
        Get scanners for the whole pegasus table.
        :param max_split_count: (int) max count of scanners will be returned.
        :param scan_options: (ScanOptions) configurable scan options.
        :return: (list) instance of PegasusScanner list.
                 each scanner in this list can scan separate part of the whole pegasus table.
        """

Scanner object

The object used for data scanning, returned by get_scanner and get_unordered_scanners. Use its get_next function to perform the scan.

    class PegasusScanner(object):
        """
        Pegasus scanner class, used for scanning data in pegasus table.
        """

get_next

Get the scanned data. Call it in a loop until it returns None, which ends the scan.

    def get_next(self):
        """
        scan the next k-v pair for the scanner.
        :return: (tuple<tuple<hash_key, sort_key>, value> or None)
                 all the sort_keys returned by this API are in ascending order.
        """
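The loop described above can be sketched as a generator meant to be wrapped with Twisted's inlineCallbacks. This is a sketch under assumptions, not code from the client: it assumes get_next yields its result asynchronously like the other calls on this page, and `drain_scanner`, `FakeScanner`, and `run_synchronously` are hypothetical names. The fake scanner and the synchronous driver exist only so the sketch runs without Twisted or a Pegasus server.

```python
def drain_scanner(scanner, on_item):
    """Generator body; wrap it with twisted's inlineCallbacks before real use.

    Calls on_item(hash_key, sort_key, value) for each pair and stops at the
    first None returned by get_next.
    """
    while True:
        item = yield scanner.get_next()
        if item is None:
            break
        (hash_key, sort_key), value = item
        on_item(hash_key, sort_key, value)


class FakeScanner(object):
    """Stand-in for PegasusScanner so the sketch runs without a server."""

    def __init__(self, items):
        self._items = list(items)

    def get_next(self):
        # The real call is assumed to be asynchronous; this one answers at once.
        return self._items.pop(0) if self._items else None


def run_synchronously(gen):
    """Drive the generator by sending each yielded value straight back in."""
    try:
        value = next(gen)
        while True:
            value = gen.send(value)
    except StopIteration:
        pass


rows = []
run_synchronously(drain_scanner(
    FakeScanner([(('h', 'a'), '1'), (('h', 'b'), '2')]),
    lambda h, s, v: rows.append((h, s, v))))
print(rows)
```

With a real scanner, the wrapped form would be `inlineCallbacks(drain_scanner)(scanner, handler)`, called from code running under the reactor. Each scanner returned by get_unordered_scanners can be drained the same way.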