topology_test.go

package topology

import (
    "reflect"
    "testing"

    "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
    "github.com/seaweedfs/seaweedfs/weed/sequence"
    "github.com/seaweedfs/seaweedfs/weed/storage"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
    "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
    "github.com/seaweedfs/seaweedfs/weed/storage/types"
)

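// TestRemoveDataCenter verifies that unlinking data centers (dc2 and dc3, defined in
// the shared topologyLayout fixture elsewhere in this package) reduces the hard-drive
// activeVolumeCount accordingly.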
func TestRemoveDataCenter(t *testing.T) {
    topo := setup(topologyLayout)
    topo.UnlinkChildNode(NodeId("dc2"))
    if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 15 {
        t.Fail()
    }
    topo.UnlinkChildNode(NodeId("dc3"))
    if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 12 {
        t.Fail()
    }
}

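// TestHandlingVolumeServerHeartbeat simulates a volume server registering its volumes,
// shrinking the reported set, applying incremental add/delete updates, and finally
// unregistering, checking the disk usage counters and volume layout after each step.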
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
    topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

    dc := topo.GetOrCreateDataCenter("dc1")
    rack := dc.GetOrCreateRack("rack1")
    maxVolumeCounts := make(map[string]uint32)
    maxVolumeCounts[""] = 25
    maxVolumeCounts["ssd"] = 12
    dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)

    {
        // a full registration: 7 hard-drive volumes plus 7 ssd volumes
        volumeCount := 7
        var volumeMessages []*master_pb.VolumeInformationMessage
        for k := 1; k <= volumeCount; k++ {
            volumeMessage := &master_pb.VolumeInformationMessage{
                Id: uint32(k),
                Size: uint64(25432),
                Collection: "",
                FileCount: uint64(2343),
                DeleteCount: uint64(345),
                DeletedByteCount: 34524,
                ReadOnly: false,
                ReplicaPlacement: uint32(0),
                Version: uint32(needle.GetCurrentVersion()),
                Ttl: 0,
            }
            volumeMessages = append(volumeMessages, volumeMessage)
        }
        for k := 1; k <= volumeCount; k++ {
            volumeMessage := &master_pb.VolumeInformationMessage{
                Id: uint32(volumeCount + k),
                Size: uint64(25432),
                Collection: "",
                FileCount: uint64(2343),
                DeleteCount: uint64(345),
                DeletedByteCount: 34524,
                ReadOnly: false,
                ReplicaPlacement: uint32(0),
                Version: uint32(needle.GetCurrentVersion()),
                Ttl: 0,
                DiskType: "ssd",
            }
            volumeMessages = append(volumeMessages, volumeMessage)
        }

        topo.SyncDataNodeRegistration(volumeMessages, dn)

        usageCounts := topo.diskUsages.usages[types.HardDriveType]
        assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
        assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
        assert(t, "ssdVolumeCount", int(topo.diskUsages.usages[types.SsdType].volumeCount), volumeCount)
    }

    {
        // a second full registration reports only 6 hard-drive volumes
        volumeCount := 7 - 1
        var volumeMessages []*master_pb.VolumeInformationMessage
        for k := 1; k <= volumeCount; k++ {
            volumeMessage := &master_pb.VolumeInformationMessage{
                Id: uint32(k),
                Size: uint64(254320),
                Collection: "",
                FileCount: uint64(2343),
                DeleteCount: uint64(345),
                DeletedByteCount: 345240,
                ReadOnly: false,
                ReplicaPlacement: uint32(0),
                Version: uint32(needle.GetCurrentVersion()),
                Ttl: 0,
            }
            volumeMessages = append(volumeMessages, volumeMessage)
        }
        topo.SyncDataNodeRegistration(volumeMessages, dn)

        //rp, _ := storage.NewReplicaPlacementFromString("000")
        //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
        //assert(t, "writables", len(layout.writables), volumeCount)

        usageCounts := topo.diskUsages.usages[types.HardDriveType]
        assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
        assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
    }

    {
        // incremental updates: re-add volume 3 (already present), delete it, then add it back
        volumeCount := 6
        newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
            Id: uint32(3),
            Collection: "",
            ReplicaPlacement: uint32(0),
            Version: uint32(needle.GetCurrentVersion()),
            Ttl: 0,
        }
        topo.IncrementalSyncDataNodeRegistration(
            []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
            nil,
            dn)
        rp, _ := super_block.NewReplicaPlacementFromString("000")
        layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
        assert(t, "writables after repeated add", len(layout.writables), volumeCount)

        usageCounts := topo.diskUsages.usages[types.HardDriveType]
        assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
        assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)

        topo.IncrementalSyncDataNodeRegistration(
            nil,
            []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
            dn)
        assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
        assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount-1)
        assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount-1)

        topo.IncrementalSyncDataNodeRegistration(
            []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
            nil,
            dn)
        for vid := range layout.vid2location {
            println("after add volume id", vid)
        }
        for _, vid := range layout.writables {
            println("after add writable volume id", vid)
        }
        assert(t, "writables after add back", len(layout.writables), volumeCount)
    }

    topo.UnRegisterDataNode(dn)

    usageCounts := topo.diskUsages.usages[types.HardDriveType]
    assert(t, "activeVolumeCount2", int(usageCounts.activeVolumeCount), 0)
}

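// assert fails the test immediately when actual does not match expected.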
func assert(t *testing.T, message string, actual, expected int) {
    if actual != expected {
        t.Fatalf("unexpected %s: %d, expected: %d", message, actual, expected)
    }
}

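// TestAddRemoveVolume registers a single ssd volume (registering it twice to exercise
// repeated registration) and checks that its collection is created, then removed again
// once the volume layout is unregistered.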
func TestAddRemoveVolume(t *testing.T) {
    topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

    dc := topo.GetOrCreateDataCenter("dc1")
    rack := dc.GetOrCreateRack("rack1")
    maxVolumeCounts := make(map[string]uint32)
    maxVolumeCounts[""] = 25
    maxVolumeCounts["ssd"] = 12
    dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)

    v := storage.VolumeInfo{
        Id: needle.VolumeId(1),
        Size: 100,
        Collection: "xcollection",
        DiskType: "ssd",
        FileCount: 123,
        DeleteCount: 23,
        DeletedByteCount: 45,
        ReadOnly: false,
        Version: needle.GetCurrentVersion(),
        ReplicaPlacement: &super_block.ReplicaPlacement{},
        Ttl: needle.EMPTY_TTL,
    }

    dn.UpdateVolumes([]storage.VolumeInfo{v})

    topo.RegisterVolumeLayout(v, dn)
    topo.RegisterVolumeLayout(v, dn)

    if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
        t.Errorf("collection %v should exist", v.Collection)
    }

    topo.UnRegisterVolumeLayout(v, dn)

    if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
        t.Errorf("collection %v should not exist", v.Collection)
    }
}

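// TestListCollections registers normal volumes and EC shards under different collections
// and verifies that ListCollections filters them by volume type.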
func TestListCollections(t *testing.T) {
    rp, _ := super_block.NewReplicaPlacementFromString("002")

    topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
    dc := topo.GetOrCreateDataCenter("dc1")
    rack := dc.GetOrCreateRack("rack1")
    dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil)

    topo.RegisterVolumeLayout(storage.VolumeInfo{
        Id: needle.VolumeId(1111),
        ReplicaPlacement: rp,
    }, dn)
    topo.RegisterVolumeLayout(storage.VolumeInfo{
        Id: needle.VolumeId(2222),
        ReplicaPlacement: rp,
        Collection: "vol_collection_a",
    }, dn)
    topo.RegisterVolumeLayout(storage.VolumeInfo{
        Id: needle.VolumeId(3333),
        ReplicaPlacement: rp,
        Collection: "vol_collection_b",
    }, dn)
    topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{
        VolumeId: needle.VolumeId(4444),
        Collection: "ec_collection_a",
    }, dn)
    topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{
        VolumeId: needle.VolumeId(5555),
        Collection: "ec_collection_b",
    }, dn)

    testCases := []struct {
        name                 string
        includeNormalVolumes bool
        includeEcVolumes     bool
        want                 []string
    }{
        {
            name:                 "no volume types selected",
            includeNormalVolumes: false,
            includeEcVolumes:     false,
            want:                 nil,
        }, {
            name:                 "normal volumes",
            includeNormalVolumes: true,
            includeEcVolumes:     false,
            want:                 []string{"", "vol_collection_a", "vol_collection_b"},
        }, {
            name:                 "EC volumes",
            includeNormalVolumes: false,
            includeEcVolumes:     true,
            want:                 []string{"ec_collection_a", "ec_collection_b"},
        }, {
            name:                 "normal + EC volumes",
            includeNormalVolumes: true,
            includeEcVolumes:     true,
            want:                 []string{"", "ec_collection_a", "ec_collection_b", "vol_collection_a", "vol_collection_b"},
        },
    }

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            got := topo.ListCollections(tc.includeNormalVolumes, tc.includeEcVolumes)
            if !reflect.DeepEqual(got, tc.want) {
                t.Errorf("got %v, want %v", got, tc.want)
            }
        })
    }
}