tarantool_store.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. //go:build tarantool
  2. // +build tarantool
  3. package tarantool
  4. import (
  5. "context"
  6. "fmt"
  7. "reflect"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. weed_util "github.com/seaweedfs/seaweedfs/weed/util"
  15. "github.com/tarantool/go-tarantool/v2"
  16. "github.com/tarantool/go-tarantool/v2/crud"
  17. "github.com/tarantool/go-tarantool/v2/pool"
  18. )
  19. const (
  20. tarantoolSpaceName = "filer_metadata"
  21. )
  22. func init() {
  23. filer.Stores = append(filer.Stores, &TarantoolStore{})
  24. }
  25. type TarantoolStore struct {
  26. pool *pool.ConnectionPool
  27. }
  28. func (store *TarantoolStore) GetName() string {
  29. return "tarantool"
  30. }
  31. func (store *TarantoolStore) Initialize(configuration weed_util.Configuration, prefix string) error {
  32. configuration.SetDefault(prefix+"address", "localhost:3301")
  33. configuration.SetDefault(prefix+"user", "guest")
  34. configuration.SetDefault(prefix+"password", "")
  35. configuration.SetDefault(prefix+"timeout", "5s")
  36. configuration.SetDefault(prefix+"maxReconnects", "1000")
  37. address := configuration.GetString(prefix + "address")
  38. user := configuration.GetString(prefix + "user")
  39. password := configuration.GetString(prefix + "password")
  40. timeoutStr := configuration.GetString(prefix + "timeout")
  41. timeout, err := time.ParseDuration(timeoutStr)
  42. if err != nil {
  43. return fmt.Errorf("parse tarantool store timeout: %w", err)
  44. }
  45. maxReconnects := configuration.GetInt(prefix + "maxReconnects")
  46. if maxReconnects < 0 {
  47. return fmt.Errorf("maxReconnects is negative")
  48. }
  49. addresses := strings.Split(address, ",")
  50. return store.initialize(addresses, user, password, timeout, uint(maxReconnects))
  51. }
  52. func (store *TarantoolStore) initialize(addresses []string, user string, password string, timeout time.Duration, maxReconnects uint) error {
  53. opts := tarantool.Opts{
  54. Timeout: timeout,
  55. Reconnect: time.Second,
  56. MaxReconnects: maxReconnects,
  57. }
  58. poolInstances := makePoolInstances(addresses, user, password, opts)
  59. poolOpts := pool.Opts{
  60. CheckTimeout: time.Second,
  61. }
  62. ctx := context.Background()
  63. p, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts)
  64. if err != nil {
  65. return fmt.Errorf("Can't create connection pool: %w", err)
  66. }
  67. _, err = p.Do(tarantool.NewPingRequest(), pool.ANY).Get()
  68. if err != nil {
  69. return err
  70. }
  71. store.pool = p
  72. return nil
  73. }
  74. func makePoolInstances(addresses []string, user string, password string, opts tarantool.Opts) []pool.Instance {
  75. poolInstances := make([]pool.Instance, 0, len(addresses))
  76. for i, address := range addresses {
  77. poolInstances = append(poolInstances, makePoolInstance(address, user, password, opts, i))
  78. }
  79. return poolInstances
  80. }
  81. func makePoolInstance(address string, user string, password string, opts tarantool.Opts, serial int) pool.Instance {
  82. return pool.Instance{
  83. Name: fmt.Sprintf("instance%d", serial),
  84. Dialer: tarantool.NetDialer{
  85. Address: address,
  86. User: user,
  87. Password: password,
  88. },
  89. Opts: opts,
  90. }
  91. }
  92. func (store *TarantoolStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  93. return ctx, nil
  94. }
  95. func (store *TarantoolStore) CommitTransaction(ctx context.Context) error {
  96. return nil
  97. }
  98. func (store *TarantoolStore) RollbackTransaction(ctx context.Context) error {
  99. return nil
  100. }
  101. func (store *TarantoolStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  102. dir, name := entry.FullPath.DirAndName()
  103. meta, err := entry.EncodeAttributesAndChunks()
  104. if err != nil {
  105. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  106. }
  107. if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
  108. meta = util.MaybeGzipData(meta)
  109. }
  110. var ttl int64
  111. if entry.TtlSec > 0 {
  112. ttl = time.Now().Unix() + int64(entry.TtlSec)
  113. } else {
  114. ttl = 0
  115. }
  116. var operations = []crud.Operation{
  117. {
  118. Operator: crud.Insert,
  119. Field: "data",
  120. Value: string(meta),
  121. },
  122. }
  123. req := crud.MakeUpsertRequest(tarantoolSpaceName).
  124. Tuple([]interface{}{dir, nil, name, ttl, string(meta)}).
  125. Operations(operations)
  126. ret := crud.Result{}
  127. if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil {
  128. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  129. }
  130. return nil
  131. }
  132. func (store *TarantoolStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  133. return store.InsertEntry(ctx, entry)
  134. }
  135. func (store *TarantoolStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  136. dir, name := fullpath.DirAndName()
  137. findEntryGetOpts := crud.GetOpts{
  138. Fields: crud.MakeOptTuple([]interface{}{"data"}),
  139. Mode: crud.MakeOptString("read"),
  140. PreferReplica: crud.MakeOptBool(true),
  141. Balance: crud.MakeOptBool(true),
  142. }
  143. req := crud.MakeGetRequest(tarantoolSpaceName).
  144. Key(crud.Tuple([]interface{}{dir, name})).
  145. Opts(findEntryGetOpts)
  146. resp := crud.Result{}
  147. err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp)
  148. if err != nil {
  149. return nil, err
  150. }
  151. results, ok := resp.Rows.([]interface{})
  152. if !ok || len(results) != 1 {
  153. return nil, filer_pb.ErrNotFound
  154. }
  155. rows, ok := results[0].([]interface{})
  156. if !ok || len(rows) != 1 {
  157. return nil, filer_pb.ErrNotFound
  158. }
  159. row, ok := rows[0].(string)
  160. if !ok {
  161. return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0])
  162. }
  163. entry = &filer.Entry{
  164. FullPath: fullpath,
  165. }
  166. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData([]byte(row)))
  167. if err != nil {
  168. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  169. }
  170. return entry, nil
  171. }
  172. func (store *TarantoolStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  173. dir, name := fullpath.DirAndName()
  174. delOpts := crud.DeleteOpts{
  175. Noreturn: crud.MakeOptBool(true),
  176. }
  177. req := crud.MakeDeleteRequest(tarantoolSpaceName).
  178. Key(crud.Tuple([]interface{}{dir, name})).
  179. Opts(delOpts)
  180. if _, err := store.pool.Do(req, pool.RW).Get(); err != nil {
  181. return fmt.Errorf("delete %s : %v", fullpath, err)
  182. }
  183. return nil
  184. }
  185. func (store *TarantoolStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  186. req := tarantool.NewCallRequest("filer_metadata.delete_by_directory_idx").
  187. Args([]interface{}{fullpath})
  188. if _, err := store.pool.Do(req, pool.RW).Get(); err != nil {
  189. return fmt.Errorf("delete %s : %v", fullpath, err)
  190. }
  191. return nil
  192. }
  193. func (store *TarantoolStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  194. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  195. }
  196. func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  197. req := tarantool.NewCallRequest("filer_metadata.find_by_directory_idx_and_name").
  198. Args([]interface{}{string(dirPath), startFileName, includeStartFile, limit})
  199. results, err := store.pool.Do(req, pool.PreferRO).Get()
  200. if err != nil {
  201. return
  202. }
  203. if len(results) < 1 {
  204. glog.ErrorfCtx(ctx, "Can't find results, data is empty")
  205. return
  206. }
  207. rows, ok := results[0].([]interface{})
  208. if !ok {
  209. glog.ErrorfCtx(ctx, "Can't convert results[0] to list")
  210. return
  211. }
  212. for _, result := range rows {
  213. row, ok := result.([]interface{})
  214. if !ok {
  215. glog.ErrorfCtx(ctx, "Can't convert result to list")
  216. return
  217. }
  218. if len(row) < 5 {
  219. glog.ErrorfCtx(ctx, "Length of result is less than needed: %v", len(row))
  220. return
  221. }
  222. nameRaw := row[2]
  223. name, ok := nameRaw.(string)
  224. if !ok {
  225. glog.ErrorfCtx(ctx, "Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw)
  226. return
  227. }
  228. dataRaw := row[4]
  229. data, ok := dataRaw.(string)
  230. if !ok {
  231. glog.ErrorfCtx(ctx, "Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw)
  232. return
  233. }
  234. entry := &filer.Entry{
  235. FullPath: util.NewFullPath(string(dirPath), name),
  236. }
  237. lastFileName = name
  238. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))); decodeErr != nil {
  239. err = decodeErr
  240. glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
  241. break
  242. }
  243. if !eachEntryFunc(entry) {
  244. break
  245. }
  246. }
  247. return lastFileName, err
  248. }
  249. func (store *TarantoolStore) Shutdown() {
  250. store.pool.Close()
  251. }