vid_map.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package wdclient
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "math/rand"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. )
  14. const (
  15. maxCursorIndex = 4096
  16. )
  17. type HasLookupFileIdFunction interface {
  18. GetLookupFileIdFunction() LookupFileIdFunctionType
  19. }
  20. type LookupFileIdFunctionType func(ctx context.Context, fileId string) (targetUrls []string, err error)
  21. type Location struct {
  22. Url string `json:"url,omitempty"`
  23. PublicUrl string `json:"publicUrl,omitempty"`
  24. DataCenter string `json:"dataCenter,omitempty"`
  25. GrpcPort int `json:"grpcPort,omitempty"`
  26. }
  27. func (l Location) ServerAddress() pb.ServerAddress {
  28. return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort)
  29. }
  30. type vidMap struct {
  31. sync.RWMutex
  32. vid2Locations map[uint32][]Location
  33. ecVid2Locations map[uint32][]Location
  34. DataCenter string
  35. cursor int32
  36. cache *vidMap
  37. }
  38. func newVidMap(dataCenter string) *vidMap {
  39. return &vidMap{
  40. vid2Locations: make(map[uint32][]Location),
  41. ecVid2Locations: make(map[uint32][]Location),
  42. DataCenter: dataCenter,
  43. cursor: -1,
  44. }
  45. }
  46. func (vc *vidMap) getLocationIndex(length int) (int, error) {
  47. if length <= 0 {
  48. return 0, fmt.Errorf("invalid length: %d", length)
  49. }
  50. if atomic.LoadInt32(&vc.cursor) == maxCursorIndex {
  51. atomic.CompareAndSwapInt32(&vc.cursor, maxCursorIndex, -1)
  52. }
  53. return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil
  54. }
  55. func (vc *vidMap) isSameDataCenter(loc *Location) bool {
  56. if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
  57. return false
  58. }
  59. return true
  60. }
  61. func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
  62. id, err := strconv.Atoi(vid)
  63. if err != nil {
  64. glog.V(1).Infof("Unknown volume id %s", vid)
  65. return nil, err
  66. }
  67. locations, found := vc.GetLocations(uint32(id))
  68. if !found {
  69. return nil, fmt.Errorf("volume %d not found", id)
  70. }
  71. var sameDcServers, otherDcServers []string
  72. for _, loc := range locations {
  73. if vc.isSameDataCenter(&loc) {
  74. sameDcServers = append(sameDcServers, loc.Url)
  75. } else {
  76. otherDcServers = append(otherDcServers, loc.Url)
  77. }
  78. }
  79. rand.Shuffle(len(sameDcServers), func(i, j int) {
  80. sameDcServers[i], sameDcServers[j] = sameDcServers[j], sameDcServers[i]
  81. })
  82. rand.Shuffle(len(otherDcServers), func(i, j int) {
  83. otherDcServers[i], otherDcServers[j] = otherDcServers[j], otherDcServers[i]
  84. })
  85. // Prefer same data center
  86. serverUrls = append(sameDcServers, otherDcServers...)
  87. return
  88. }
  89. func (vc *vidMap) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
  90. parts := strings.Split(fileId, ",")
  91. if len(parts) != 2 {
  92. return nil, errors.New("Invalid fileId " + fileId)
  93. }
  94. serverUrls, lookupError := vc.LookupVolumeServerUrl(parts[0])
  95. if lookupError != nil {
  96. return nil, lookupError
  97. }
  98. for _, serverUrl := range serverUrls {
  99. fullUrls = append(fullUrls, "http://"+serverUrl+"/"+fileId)
  100. }
  101. return
  102. }
  103. func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) {
  104. id, err := strconv.Atoi(vid)
  105. if err != nil {
  106. glog.V(1).Infof("Unknown volume id %s", vid)
  107. return nil, fmt.Errorf("Unknown volume id %s", vid)
  108. }
  109. foundLocations, found := vc.GetLocations(uint32(id))
  110. if found {
  111. return foundLocations, nil
  112. }
  113. return nil, fmt.Errorf("volume id %s not found", vid)
  114. }
  115. func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
  116. // glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
  117. locations, found = vc.getLocations(vid)
  118. if found && len(locations) > 0 {
  119. return locations, found
  120. }
  121. if vc.cache != nil {
  122. return vc.cache.GetLocations(vid)
  123. }
  124. return nil, false
  125. }
  126. func (vc *vidMap) GetLocationsClone(vid uint32) (locations []Location, found bool) {
  127. locations, found = vc.GetLocations(vid)
  128. if found {
  129. // clone the locations in case the volume locations are changed below
  130. existingLocations := make([]Location, len(locations))
  131. copy(existingLocations, locations)
  132. return existingLocations, found
  133. }
  134. return nil, false
  135. }
  136. func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) {
  137. vc.RLock()
  138. defer vc.RUnlock()
  139. locations, found = vc.vid2Locations[vid]
  140. if found && len(locations) > 0 {
  141. return
  142. }
  143. locations, found = vc.ecVid2Locations[vid]
  144. return
  145. }
  146. func (vc *vidMap) addLocation(vid uint32, location Location) {
  147. vc.Lock()
  148. defer vc.Unlock()
  149. glog.V(4).Infof("+ volume id %d: %+v", vid, location)
  150. locations, found := vc.vid2Locations[vid]
  151. if !found {
  152. vc.vid2Locations[vid] = []Location{location}
  153. return
  154. }
  155. for _, loc := range locations {
  156. if loc.Url == location.Url {
  157. return
  158. }
  159. }
  160. vc.vid2Locations[vid] = append(locations, location)
  161. }
  162. func (vc *vidMap) addEcLocation(vid uint32, location Location) {
  163. vc.Lock()
  164. defer vc.Unlock()
  165. glog.V(4).Infof("+ ec volume id %d: %+v", vid, location)
  166. locations, found := vc.ecVid2Locations[vid]
  167. if !found {
  168. vc.ecVid2Locations[vid] = []Location{location}
  169. return
  170. }
  171. for _, loc := range locations {
  172. if loc.Url == location.Url {
  173. return
  174. }
  175. }
  176. vc.ecVid2Locations[vid] = append(locations, location)
  177. }
  178. func (vc *vidMap) deleteLocation(vid uint32, location Location) {
  179. if vc.cache != nil {
  180. vc.cache.deleteLocation(vid, location)
  181. }
  182. vc.Lock()
  183. defer vc.Unlock()
  184. glog.V(4).Infof("- volume id %d: %+v", vid, location)
  185. locations, found := vc.vid2Locations[vid]
  186. if !found {
  187. return
  188. }
  189. for i, loc := range locations {
  190. if loc.Url == location.Url {
  191. vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
  192. break
  193. }
  194. }
  195. }
  196. func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
  197. if vc.cache != nil {
  198. vc.cache.deleteLocation(vid, location)
  199. }
  200. vc.Lock()
  201. defer vc.Unlock()
  202. glog.V(4).Infof("- ec volume id %d: %+v", vid, location)
  203. locations, found := vc.ecVid2Locations[vid]
  204. if !found {
  205. return
  206. }
  207. for i, loc := range locations {
  208. if loc.Url == location.Url {
  209. vc.ecVid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
  210. break
  211. }
  212. }
  213. }