tarantool_store_kv.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. //go:build tarantool
  2. // +build tarantool
  3. package tarantool
  4. import (
  5. "context"
  6. "fmt"
  7. "reflect"
  8. "github.com/seaweedfs/seaweedfs/weed/filer"
  9. "github.com/tarantool/go-tarantool/v2/crud"
  10. "github.com/tarantool/go-tarantool/v2/pool"
  11. )
  // tarantoolKVSpaceName is the name of the Tarantool space that the Kv*
  // methods of TarantoolStore pass to their crud requests.
  const tarantoolKVSpaceName = "key_value"
  15. func (store *TarantoolStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  16. var operations = []crud.Operation{
  17. {
  18. Operator: crud.Insert,
  19. Field: "value",
  20. Value: string(value),
  21. },
  22. }
  23. req := crud.MakeUpsertRequest(tarantoolKVSpaceName).
  24. Tuple([]interface{}{string(key), nil, string(value)}).
  25. Operations(operations)
  26. ret := crud.Result{}
  27. if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil {
  28. return fmt.Errorf("kv put: %w", err)
  29. }
  30. return nil
  31. }
  32. func (store *TarantoolStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  33. getOpts := crud.GetOpts{
  34. Fields: crud.MakeOptTuple([]interface{}{"value"}),
  35. Mode: crud.MakeOptString("read"),
  36. PreferReplica: crud.MakeOptBool(true),
  37. Balance: crud.MakeOptBool(true),
  38. }
  39. req := crud.MakeGetRequest(tarantoolKVSpaceName).
  40. Key(crud.Tuple([]interface{}{string(key)})).
  41. Opts(getOpts)
  42. resp := crud.Result{}
  43. err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp)
  44. if err != nil {
  45. return nil, err
  46. }
  47. results, ok := resp.Rows.([]interface{})
  48. if !ok || len(results) != 1 {
  49. return nil, filer.ErrKvNotFound
  50. }
  51. rows, ok := results[0].([]interface{})
  52. if !ok || len(rows) != 1 {
  53. return nil, filer.ErrKvNotFound
  54. }
  55. row, ok := rows[0].(string)
  56. if !ok {
  57. return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0])
  58. }
  59. return []byte(row), nil
  60. }
  61. func (store *TarantoolStore) KvDelete(ctx context.Context, key []byte) (err error) {
  62. delOpts := crud.DeleteOpts{
  63. Noreturn: crud.MakeOptBool(true),
  64. }
  65. req := crud.MakeDeleteRequest(tarantoolKVSpaceName).
  66. Key(crud.Tuple([]interface{}{string(key)})).
  67. Opts(delOpts)
  68. if _, err := store.pool.Do(req, pool.RW).Get(); err != nil {
  69. return fmt.Errorf("kv delete: %w", err)
  70. }
  71. return nil
  72. }