ydb_store_kv.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. //go:build ydb
  2. // +build ydb
  3. package ydb
  4. import (
  5. "context"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/ydb-platform/ydb-go-sdk/v3/query"
  11. "github.com/ydb-platform/ydb-go-sdk/v3/table"
  12. "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
  13. )
  14. func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  15. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  16. fileMeta := FileMeta{dirHash, name, dirStr, value}
  17. return store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) {
  18. _, err = s.Query(ctx, *withPragma(&store.tablePathPrefix, upsertQuery),
  19. query.WithParameters(fileMeta.queryParameters(0)), rwQC)
  20. if err != nil {
  21. return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  22. }
  23. return nil
  24. }, query.WithIdempotent())
  25. }
  26. func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  27. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  28. valueFound := false
  29. err = store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
  30. res, err := s.Query(ctx, *withPragma(&store.tablePathPrefix, findQuery),
  31. query.WithParameters(table.NewQueryParameters(
  32. table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
  33. table.ValueParam("$directory", types.UTF8Value(dirStr)),
  34. table.ValueParam("$name", types.UTF8Value(name)))), roQC)
  35. if err != nil {
  36. return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  37. }
  38. defer func() { _ = res.Close(ctx) }()
  39. for rs, err := range res.ResultSets(ctx) {
  40. if err != nil {
  41. return err
  42. }
  43. for row, err := range rs.Rows(ctx) {
  44. if err != nil {
  45. return err
  46. }
  47. if err := row.Scan(&value); err != nil {
  48. return fmt.Errorf("scan %s : %v", util.NewFullPath(dirStr, name).Name(), err)
  49. }
  50. valueFound = true
  51. return nil
  52. }
  53. }
  54. return nil
  55. }, query.WithIdempotent())
  56. if !valueFound {
  57. return nil, filer.ErrKvNotFound
  58. }
  59. return value, nil
  60. }
  61. func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) {
  62. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  63. return store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) {
  64. _, err = s.Query(ctx, *withPragma(&store.tablePathPrefix, deleteQuery),
  65. query.WithParameters(table.NewQueryParameters(
  66. table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
  67. table.ValueParam("$directory", types.UTF8Value(dirStr)),
  68. table.ValueParam("$name", types.UTF8Value(name)))), rwQC)
  69. if err != nil {
  70. return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  71. }
  72. return nil
  73. }, query.WithIdempotent())
  74. }