| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- package topology
- import (
- "encoding/json"
- "fmt"
- "math/rand/v2"
- "reflect"
- "sync"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/server/constants"
- "google.golang.org/grpc"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/storage"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- )
/*
This package addresses these replica placement concerns:
1. a growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
2. how to reduce the replica level when storage is tight
3. optimizing for hot data on faster disks and cold data on cheaper storage
4. volume allocation for each bucket
*/
// VolumeGrowRequest describes one request to create additional volumes:
// how many to create and with which placement options.
type VolumeGrowRequest struct {
	Option *VolumeGrowOption // placement, collection, TTL and disk options for the new volumes
	Count  uint32            // number of logical volumes to create
	Force  bool              // presumably bypasses automatic growth checks — confirm with callers
	Reason string            // human-readable trigger, for logging/diagnostics (not compared by Equals)
}
- func (vg *VolumeGrowRequest) Equals(req *VolumeGrowRequest) bool {
- return reflect.DeepEqual(vg.Option, req.Option) && vg.Count == req.Count && vg.Force == req.Force
- }
// volumeGrowthStrategy configures how many logical volumes to create per
// growth round, keyed by the replica copy count, plus the fullness
// threshold used to decide when growth is needed.
type volumeGrowthStrategy struct {
	Copy1Count     uint32  // volumes to create when replication requires 1 copy
	Copy2Count     uint32  // volumes to create when replication requires 2 copies
	Copy3Count     uint32  // volumes to create when replication requires 3 copies
	CopyOtherCount uint32  // volumes to create for any other copy count
	Threshold      float64 // volume fullness ratio (0-1) that triggers growth — TODO confirm against callers
}
var (
	// VolumeGrowStrategy holds the package-level growth defaults used by
	// findVolumeCount. NOTE(review): this is mutable package-level state;
	// it may be overwritten by configuration elsewhere — confirm before
	// relying on these literal values.
	VolumeGrowStrategy = volumeGrowthStrategy{
		Copy1Count:     7,
		Copy2Count:     6,
		Copy3Count:     3,
		CopyOtherCount: 1,
		Threshold:      0.9,
	}
)
// VolumeGrowOption carries all parameters that influence where and how new
// volumes are created. The zero value of each field means "no preference".
// Field order is preserved as-is because it determines JSON output order.
type VolumeGrowOption struct {
	Collection         string                        `json:"collection,omitempty"`
	ReplicaPlacement   *super_block.ReplicaPlacement `json:"replication,omitempty"`
	Ttl                *needle.TTL                   `json:"ttl,omitempty"`
	DiskType           types.DiskType                `json:"disk,omitempty"`
	Preallocate        int64                         `json:"preallocate,omitempty"`
	DataCenter         string                        `json:"dataCenter,omitempty"` // preferred data center; empty = any
	Rack               string                        `json:"rack,omitempty"`       // preferred rack; empty = any
	DataNode           string                        `json:"dataNode,omitempty"`   // preferred data node; empty = any
	MemoryMapMaxSizeMb uint32                        `json:"memoryMapMaxSizeMb,omitempty"`
	Version            uint32                        `json:"version,omitempty"`
}
// VolumeGrowth serializes volume growth operations. Must not be copied
// once used (contains a mutex); use NewDefaultVolumeGrowth.
type VolumeGrowth struct {
	accessLock sync.Mutex // guards GrowByCountAndType so grow operations do not interleave
}
// VolumeGrowReservation tracks capacity reservations for a volume creation operation.
// servers[i] corresponds to reservationIds[i]; an empty id means no
// reservation was recorded for that server.
type VolumeGrowReservation struct {
	servers        []*DataNode     // servers on which capacity was reserved
	reservationIds []string        // reservation handles, parallel to servers
	diskType       types.DiskType  // disk type the reservations were made against
}
- // releaseAllReservations releases all reservations in this volume grow operation
- func (vgr *VolumeGrowReservation) releaseAllReservations() {
- for i, server := range vgr.servers {
- if i < len(vgr.reservationIds) && vgr.reservationIds[i] != "" {
- server.ReleaseReservedCapacity(vgr.reservationIds[i])
- }
- }
- }
- func (o *VolumeGrowOption) String() string {
- blob, _ := json.Marshal(o)
- return string(blob)
- }
- func NewDefaultVolumeGrowth() *VolumeGrowth {
- return &VolumeGrowth{}
- }
- // one replication type may need rp.GetCopyCount() actual volumes
- // given copyCount, how many logical volumes to create
- func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count uint32) {
- switch copyCount {
- case 1:
- count = VolumeGrowStrategy.Copy1Count
- case 2:
- count = VolumeGrowStrategy.Copy2Count
- case 3:
- count = VolumeGrowStrategy.Copy3Count
- default:
- count = VolumeGrowStrategy.CopyOtherCount
- }
- return
- }
- func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount uint32) (result []*master_pb.VolumeLocation, err error) {
- if targetCount == 0 {
- targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
- }
- result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
- if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
- return result, nil
- }
- return result, err
- }
- func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount uint32, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
- vg.accessLock.Lock()
- defer vg.accessLock.Unlock()
- for i := uint32(0); i < targetCount; i++ {
- if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
- result = append(result, res...)
- } else {
- glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
- return result, e
- }
- }
- return
- }
// findAndGrow selects a replica-placement-compliant set of servers (with
// capacity reserved on each), allocates the next volume id, and creates the
// volume on every selected server. On success it returns one VolumeLocation
// per server, all carrying the same new volume id.
func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
	servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations
	if e != nil {
		return nil, e
	}
	// Ensure reservations are released if anything goes wrong.
	// NOTE: this keys off the named return value err, so every early return
	// below that carries a non-nil error triggers the release.
	defer func() {
		if err != nil && reservation != nil {
			reservation.releaseAllReservations()
		}
	}()
	// After a leadership change, wait until volume servers have had time to
	// re-register (two pulse periods) before assigning new volume ids.
	for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) {
		glog.V(0).Infof("wait for volume servers to join back")
		time.Sleep(constants.VolumePulseSeconds / 2)
	}
	vid, raftErr := topo.NextVolumeId()
	if raftErr != nil {
		// Assigns the named err, so the deferred cleanup releases reservations.
		return nil, raftErr
	}
	if err = vg.grow(grpcDialOption, topo, vid, option, reservation, servers...); err == nil {
		// One location per replica server, all advertising the same new vid.
		for _, server := range servers {
			result = append(result, &master_pb.VolumeLocation{
				Url:        server.Url(),
				PublicUrl:  server.PublicUrl,
				DataCenter: server.GetDataCenterId(),
				GrpcPort:   uint32(server.GrpcPort),
				NewVids:    []uint32{uint32(vid)},
			})
		}
	}
	return
}
// findEmptySlotsForOneVolume picks one data node per replica for a new volume:
// 1. find the main data node
// 1.1 collect all data nodes that have 1 slots
// 1.2 collect all racks that have rp.SameRackCount+1
// 1.3 collect all data centers that have DiffRackCount+rp.SameRackCount+1
// 2. find rest data nodes
// If useReservations is true, reserves capacity on each server and returns reservation info;
// the caller (or the deferred cleanup on error) is responsible for releasing it.
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) {
	//find main datacenter and other data centers
	rp := option.ReplicaPlacement
	// Track tentative reservations to make the process atomic
	var tentativeReservation *VolumeGrowReservation
	// Select appropriate functions based on useReservations flag.
	// The reservation-aware variants account for capacity that is already
	// tentatively reserved, so concurrent grows do not over-commit a node.
	var availableSpaceFunc func(Node, *VolumeGrowOption) int64
	var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error)
	if useReservations {
		// Initialize tentative reservation tracking
		tentativeReservation = &VolumeGrowReservation{
			servers:        make([]*DataNode, 0),
			reservationIds: make([]string, 0),
			diskType:       option.DiskType,
		}
		// For reservations, we make actual reservations during node selection
		availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 {
			return node.AvailableSpaceForReservation(option)
		}
		reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) {
			return node.ReserveOneVolumeForReservation(r, option)
		}
	} else {
		availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 {
			return node.AvailableSpaceFor(option)
		}
		reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) {
			return node.ReserveOneVolume(r, option)
		}
	}
	// Ensure cleanup of partial reservations on error.
	// NOTE: this keys off the named return value err; every error return
	// below therefore releases whatever was tentatively reserved.
	defer func() {
		if err != nil && tentativeReservation != nil {
			tentativeReservation.releaseAllReservations()
		}
	}()
	// Pick the main data center (plus rp.DiffDataCenterCount others), filtering
	// out data centers that cannot possibly host a full replica set.
	mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
			return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
		}
		if len(node.Children()) < rp.DiffRackCount+1 {
			return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
		}
		if availableSpaceFunc(node, option) < int64(rp.DiffRackCount+rp.SameRackCount+1) {
			return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.DiffRackCount+rp.SameRackCount+1)
		}
		// Count racks that have enough data nodes with at least one free slot.
		possibleRacksCount := 0
		for _, rack := range node.Children() {
			possibleDataNodesCount := 0
			for _, n := range rack.Children() {
				if availableSpaceFunc(n, option) >= 1 {
					possibleDataNodesCount++
				}
			}
			if possibleDataNodesCount >= rp.SameRackCount+1 {
				possibleRacksCount++
			}
		}
		if possibleRacksCount < rp.DiffRackCount+1 {
			return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
		}
		return nil
	})
	if dc_err != nil {
		return nil, nil, dc_err
	}
	//find main rack and other racks
	mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, option, func(node Node) error {
		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
		}
		if availableSpaceFunc(node, option) < int64(rp.SameRackCount+1) {
			return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.SameRackCount+1)
		}
		if len(node.Children()) < rp.SameRackCount+1 {
			// a bit faster way to test free racks
			return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
		}
		possibleDataNodesCount := 0
		for _, n := range node.Children() {
			if availableSpaceFunc(n, option) >= 1 {
				possibleDataNodesCount++
			}
		}
		if possibleDataNodesCount < rp.SameRackCount+1 {
			return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
		}
		return nil
	})
	if rackErr != nil {
		return nil, nil, rackErr
	}
	//find main server and other servers
	mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error {
		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
		}
		if useReservations {
			// For reservations, atomically check and reserve capacity
			if node.IsDataNode() {
				reservationId, success := node.TryReserveCapacity(option.DiskType, 1)
				if !success {
					return fmt.Errorf("Cannot reserve capacity on node %s", node.Id())
				}
				// Track the reservation for later cleanup if needed
				tentativeReservation.servers = append(tentativeReservation.servers, node.(*DataNode))
				tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId)
			} else if availableSpaceFunc(node, option) < 1 {
				return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1)
			}
		} else if availableSpaceFunc(node, option) < 1 {
			return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1)
		}
		return nil
	})
	if serverErr != nil {
		return nil, nil, serverErr
	}
	servers = append(servers, mainServer.(*DataNode))
	for _, server := range otherServers {
		servers = append(servers, server.(*DataNode))
	}
	// One replica per additional rack in the main data center. The rack filter
	// above guarantees availableSpaceFunc(rack, option) >= 1 here, so
	// rand.Int64N receives a positive bound.
	for _, rack := range otherRacks {
		r := rand.Int64N(availableSpaceFunc(rack, option))
		if server, e := reserveOneVolumeFunc(rack, r, option); e == nil {
			servers = append(servers, server)
			// If using reservations, also make a reservation on the selected server
			if useReservations {
				reservationId, success := server.TryReserveCapacity(option.DiskType, 1)
				if !success {
					return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other rack", server.Id())
				}
				tentativeReservation.servers = append(tentativeReservation.servers, server)
				tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId)
			}
		} else {
			return servers, nil, e
		}
	}
	// One replica per additional data center, same pattern as above.
	for _, datacenter := range otherDataCenters {
		r := rand.Int64N(availableSpaceFunc(datacenter, option))
		if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil {
			servers = append(servers, server)
			// If using reservations, also make a reservation on the selected server
			if useReservations {
				reservationId, success := server.TryReserveCapacity(option.DiskType, 1)
				if !success {
					return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other datacenter", server.Id())
				}
				tentativeReservation.servers = append(tentativeReservation.servers, server)
				tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId)
			}
		} else {
			return servers, nil, e
		}
	}
	// If reservations were made, return the tentative reservation
	if useReservations && tentativeReservation != nil {
		reservation = tentativeReservation
		glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers))
	}
	return servers, reservation, nil
}
// grow creates volumes on the provided servers, optionally managing capacity reservations.
// It allocates volume vid on every server; allocation stops at the first
// failure (break), so createdVolumes[i] always corresponds to servers[i].
// On success all replicas are registered with the topology and the
// reservations are released (the registered volumes now occupy the space).
// On failure the already-created replicas are deleted; reservations are
// left for the caller to release.
func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, reservation *VolumeGrowReservation, servers ...*DataNode) (growErr error) {
	var createdVolumes []storage.VolumeInfo
	for _, server := range servers {
		if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
			createdVolumes = append(createdVolumes, storage.VolumeInfo{
				Id:               vid,
				Size:             0,
				Collection:       option.Collection,
				ReplicaPlacement: option.ReplicaPlacement,
				Ttl:              option.Ttl,
				Version:          needle.Version(option.Version),
				DiskType:         option.DiskType.String(),
				ModifiedAtSecond: time.Now().Unix(),
			})
			glog.V(0).Infof("Created Volume %d on %s", vid, server.NodeImpl.String())
		} else {
			glog.Warningf("Failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
			growErr = fmt.Errorf("failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
			break
		}
	}
	if growErr == nil {
		// All replicas created: register them with the in-memory topology.
		for i, vi := range createdVolumes {
			server := servers[i]
			server.AddOrUpdateVolume(vi)
			topo.RegisterVolumeLayout(vi, server)
			glog.V(0).Infof("Registered Volume %d on %s", vid, server.NodeImpl.String())
		}
		// Release reservations on success since volumes are now registered
		if reservation != nil {
			reservation.releaseAllReservations()
		}
	} else {
		// cleaning up created volume replicas
		for i, vi := range createdVolumes {
			server := servers[i]
			if err := DeleteVolume(server, grpcDialOption, vi.Id); err != nil {
				glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String())
			}
		}
		// Reservations will be released by the caller in case of failure
	}
	return growErr
}
|