admin_server.go 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985
  1. package dash
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "net/http"
  7. "strconv"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
  11. "github.com/seaweedfs/seaweedfs/weed/cluster"
  12. "github.com/seaweedfs/seaweedfs/weed/credential"
  13. "github.com/seaweedfs/seaweedfs/weed/filer"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  19. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  20. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. "github.com/seaweedfs/seaweedfs/weed/util"
  23. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  24. "google.golang.org/grpc"
  25. "github.com/seaweedfs/seaweedfs/weed/s3api"
  26. "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
  27. )
  28. type AdminServer struct {
  29. masterClient *wdclient.MasterClient
  30. templateFS http.FileSystem
  31. dataDir string
  32. grpcDialOption grpc.DialOption
  33. cacheExpiration time.Duration
  34. lastCacheUpdate time.Time
  35. cachedTopology *ClusterTopology
  36. // Filer discovery and caching
  37. cachedFilers []string
  38. lastFilerUpdate time.Time
  39. filerCacheExpiration time.Duration
  40. // Credential management
  41. credentialManager *credential.CredentialManager
  42. // Configuration persistence
  43. configPersistence *ConfigPersistence
  44. // Maintenance system
  45. maintenanceManager *maintenance.MaintenanceManager
  46. // Topic retention purger
  47. topicRetentionPurger *TopicRetentionPurger
  48. // Worker gRPC server
  49. workerGrpcServer *WorkerGrpcServer
  50. }
  51. // Type definitions moved to types.go
  52. func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string) *AdminServer {
  53. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.admin")
  54. // Create master client with multiple master support
  55. masterClient := wdclient.NewMasterClient(
  56. grpcDialOption,
  57. "", // filerGroup - not needed for admin
  58. "admin", // clientType
  59. "", // clientHost - not needed for admin
  60. "", // dataCenter - not needed for admin
  61. "", // rack - not needed for admin
  62. *pb.ServerAddresses(masters).ToServiceDiscovery(),
  63. )
  64. // Start master client connection process (like shell and filer do)
  65. ctx := context.Background()
  66. go masterClient.KeepConnectedToMaster(ctx)
  67. server := &AdminServer{
  68. masterClient: masterClient,
  69. templateFS: templateFS,
  70. dataDir: dataDir,
  71. grpcDialOption: grpcDialOption,
  72. cacheExpiration: 10 * time.Second,
  73. filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
  74. configPersistence: NewConfigPersistence(dataDir),
  75. }
  76. // Initialize topic retention purger
  77. server.topicRetentionPurger = NewTopicRetentionPurger(server)
  78. // Initialize credential manager with defaults
  79. credentialManager, err := credential.NewCredentialManagerWithDefaults("")
  80. if err != nil {
  81. glog.Warningf("Failed to initialize credential manager: %v", err)
  82. // Continue without credential manager - will fall back to legacy approach
  83. } else {
  84. // For stores that need filer client details, set them
  85. if store := credentialManager.GetStore(); store != nil {
  86. if filerClientSetter, ok := store.(interface {
  87. SetFilerClient(string, grpc.DialOption)
  88. }); ok {
  89. // We'll set the filer client later when we discover filers
  90. // For now, just store the credential manager
  91. server.credentialManager = credentialManager
  92. // Set up a goroutine to set filer client once we discover filers
  93. go func() {
  94. for {
  95. filerAddr := server.GetFilerAddress()
  96. if filerAddr != "" {
  97. filerClientSetter.SetFilerClient(filerAddr, server.grpcDialOption)
  98. glog.V(1).Infof("Set filer client for credential manager: %s", filerAddr)
  99. break
  100. }
  101. glog.V(1).Infof("Waiting for filer discovery for credential manager...")
  102. time.Sleep(5 * time.Second) // Retry every 5 seconds
  103. }
  104. }()
  105. } else {
  106. server.credentialManager = credentialManager
  107. }
  108. } else {
  109. server.credentialManager = credentialManager
  110. }
  111. }
  112. // Initialize maintenance system - always initialize even without persistent storage
  113. var maintenanceConfig *maintenance.MaintenanceConfig
  114. if server.configPersistence.IsConfigured() {
  115. var err error
  116. maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig()
  117. if err != nil {
  118. glog.Errorf("Failed to load maintenance configuration: %v", err)
  119. maintenanceConfig = maintenance.DefaultMaintenanceConfig()
  120. }
  121. // Apply new defaults to handle schema changes (like enabling by default)
  122. schema := maintenance.GetMaintenanceConfigSchema()
  123. if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil {
  124. glog.Warningf("Failed to apply schema defaults to loaded config: %v", err)
  125. }
  126. // Force enable maintenance system for new default behavior
  127. // This handles the case where old configs had Enabled=false as default
  128. if !maintenanceConfig.Enabled {
  129. glog.V(1).Infof("Enabling maintenance system (new default behavior)")
  130. maintenanceConfig.Enabled = true
  131. }
  132. glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled)
  133. } else {
  134. maintenanceConfig = maintenance.DefaultMaintenanceConfig()
  135. glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled)
  136. }
  137. // Always initialize maintenance manager
  138. server.InitMaintenanceManager(maintenanceConfig)
  139. // Load saved task configurations from persistence
  140. server.loadTaskConfigurationsFromPersistence()
  141. // Start maintenance manager if enabled
  142. if maintenanceConfig.Enabled {
  143. go func() {
  144. // Give master client a bit of time to connect before starting scans
  145. time.Sleep(2 * time.Second)
  146. if err := server.StartMaintenanceManager(); err != nil {
  147. glog.Errorf("Failed to start maintenance manager: %v", err)
  148. }
  149. }()
  150. }
  151. return server
  152. }
  153. // loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files
  154. func (s *AdminServer) loadTaskConfigurationsFromPersistence() {
  155. if s.configPersistence == nil || !s.configPersistence.IsConfigured() {
  156. glog.V(1).Infof("Config persistence not available, using default task configurations")
  157. return
  158. }
  159. // Load task configurations dynamically using the config update registry
  160. configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry()
  161. configUpdateRegistry.UpdateAllConfigs(s.configPersistence)
  162. }
  163. // GetCredentialManager returns the credential manager
  164. func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
  165. return s.credentialManager
  166. }
  167. // Filer discovery methods moved to client_management.go
  168. // Client management methods moved to client_management.go
  169. // WithFilerClient and WithVolumeServerClient methods moved to client_management.go
  170. // Cluster topology methods moved to cluster_topology.go
  171. // getTopologyViaGRPC method moved to cluster_topology.go
  172. // InvalidateCache method moved to cluster_topology.go
  173. // GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
  174. func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
  175. var buckets []S3Bucket
  176. // Build a map of collection name to collection data
  177. collectionMap := make(map[string]struct {
  178. Size int64
  179. FileCount int64
  180. })
  181. // Collect volume information by collection
  182. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  183. resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  184. if err != nil {
  185. return err
  186. }
  187. if resp.TopologyInfo != nil {
  188. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  189. for _, rack := range dc.RackInfos {
  190. for _, node := range rack.DataNodeInfos {
  191. for _, diskInfo := range node.DiskInfos {
  192. for _, volInfo := range diskInfo.VolumeInfos {
  193. collection := volInfo.Collection
  194. if collection == "" {
  195. collection = "default"
  196. }
  197. if _, exists := collectionMap[collection]; !exists {
  198. collectionMap[collection] = struct {
  199. Size int64
  200. FileCount int64
  201. }{}
  202. }
  203. data := collectionMap[collection]
  204. data.Size += int64(volInfo.Size)
  205. data.FileCount += int64(volInfo.FileCount)
  206. collectionMap[collection] = data
  207. }
  208. }
  209. }
  210. }
  211. }
  212. }
  213. return nil
  214. })
  215. if err != nil {
  216. return nil, fmt.Errorf("failed to get volume information: %w", err)
  217. }
  218. // Get filer configuration to determine FilerGroup
  219. var filerGroup string
  220. err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  221. configResp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  222. if err != nil {
  223. glog.Warningf("Failed to get filer configuration: %v", err)
  224. // Continue without filer group
  225. return nil
  226. }
  227. filerGroup = configResp.FilerGroup
  228. return nil
  229. })
  230. if err != nil {
  231. return nil, fmt.Errorf("failed to get filer configuration: %w", err)
  232. }
  233. // Now list buckets from the filer and match with collection data
  234. err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  235. // List buckets by looking at the /buckets directory
  236. stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  237. Directory: "/buckets",
  238. Prefix: "",
  239. StartFromFileName: "",
  240. InclusiveStartFrom: false,
  241. Limit: 1000,
  242. })
  243. if err != nil {
  244. return err
  245. }
  246. for {
  247. resp, err := stream.Recv()
  248. if err != nil {
  249. if err.Error() == "EOF" {
  250. break
  251. }
  252. return err
  253. }
  254. if resp.Entry.IsDirectory {
  255. bucketName := resp.Entry.Name
  256. // Determine collection name for this bucket
  257. var collectionName string
  258. if filerGroup != "" {
  259. collectionName = fmt.Sprintf("%s_%s", filerGroup, bucketName)
  260. } else {
  261. collectionName = bucketName
  262. }
  263. // Get size and object count from collection data
  264. var size int64
  265. var objectCount int64
  266. if collectionData, exists := collectionMap[collectionName]; exists {
  267. size = collectionData.Size
  268. objectCount = collectionData.FileCount
  269. }
  270. // Get quota information from entry
  271. quota := resp.Entry.Quota
  272. quotaEnabled := quota > 0
  273. if quota < 0 {
  274. // Negative quota means disabled
  275. quota = -quota
  276. quotaEnabled = false
  277. }
  278. // Get versioning and object lock information from extended attributes
  279. versioningEnabled := false
  280. objectLockEnabled := false
  281. objectLockMode := ""
  282. var objectLockDuration int32 = 0
  283. if resp.Entry.Extended != nil {
  284. // Use shared utility to extract versioning information
  285. versioningEnabled = extractVersioningFromEntry(resp.Entry)
  286. // Use shared utility to extract Object Lock information
  287. objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(resp.Entry)
  288. }
  289. bucket := S3Bucket{
  290. Name: bucketName,
  291. CreatedAt: time.Unix(resp.Entry.Attributes.Crtime, 0),
  292. Size: size,
  293. ObjectCount: objectCount,
  294. LastModified: time.Unix(resp.Entry.Attributes.Mtime, 0),
  295. Quota: quota,
  296. QuotaEnabled: quotaEnabled,
  297. VersioningEnabled: versioningEnabled,
  298. ObjectLockEnabled: objectLockEnabled,
  299. ObjectLockMode: objectLockMode,
  300. ObjectLockDuration: objectLockDuration,
  301. }
  302. buckets = append(buckets, bucket)
  303. }
  304. }
  305. return nil
  306. })
  307. if err != nil {
  308. return nil, fmt.Errorf("failed to list Object Store buckets: %w", err)
  309. }
  310. return buckets, nil
  311. }
  312. // GetBucketDetails retrieves detailed information about a specific bucket
  313. func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
  314. bucketPath := fmt.Sprintf("/buckets/%s", bucketName)
  315. details := &BucketDetails{
  316. Bucket: S3Bucket{
  317. Name: bucketName,
  318. },
  319. Objects: []S3Object{},
  320. UpdatedAt: time.Now(),
  321. }
  322. err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  323. // Get bucket info
  324. bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
  325. Directory: "/buckets",
  326. Name: bucketName,
  327. })
  328. if err != nil {
  329. return fmt.Errorf("bucket not found: %w", err)
  330. }
  331. details.Bucket.CreatedAt = time.Unix(bucketResp.Entry.Attributes.Crtime, 0)
  332. details.Bucket.LastModified = time.Unix(bucketResp.Entry.Attributes.Mtime, 0)
  333. // Get quota information from entry
  334. quota := bucketResp.Entry.Quota
  335. quotaEnabled := quota > 0
  336. if quota < 0 {
  337. // Negative quota means disabled
  338. quota = -quota
  339. quotaEnabled = false
  340. }
  341. details.Bucket.Quota = quota
  342. details.Bucket.QuotaEnabled = quotaEnabled
  343. // Get versioning and object lock information from extended attributes
  344. versioningEnabled := false
  345. objectLockEnabled := false
  346. objectLockMode := ""
  347. var objectLockDuration int32 = 0
  348. if bucketResp.Entry.Extended != nil {
  349. // Use shared utility to extract versioning information
  350. versioningEnabled = extractVersioningFromEntry(bucketResp.Entry)
  351. // Use shared utility to extract Object Lock information
  352. objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(bucketResp.Entry)
  353. }
  354. details.Bucket.VersioningEnabled = versioningEnabled
  355. details.Bucket.ObjectLockEnabled = objectLockEnabled
  356. details.Bucket.ObjectLockMode = objectLockMode
  357. details.Bucket.ObjectLockDuration = objectLockDuration
  358. // List objects in bucket (recursively)
  359. return s.listBucketObjects(client, bucketPath, "", details)
  360. })
  361. if err != nil {
  362. return nil, err
  363. }
  364. return details, nil
  365. }
  366. // listBucketObjects recursively lists all objects in a bucket
  367. func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, directory, prefix string, details *BucketDetails) error {
  368. stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  369. Directory: directory,
  370. Prefix: prefix,
  371. StartFromFileName: "",
  372. InclusiveStartFrom: false,
  373. Limit: 1000,
  374. })
  375. if err != nil {
  376. return err
  377. }
  378. for {
  379. resp, err := stream.Recv()
  380. if err != nil {
  381. if err.Error() == "EOF" {
  382. break
  383. }
  384. return err
  385. }
  386. entry := resp.Entry
  387. if entry.IsDirectory {
  388. // Recursively list subdirectories
  389. subDir := fmt.Sprintf("%s/%s", directory, entry.Name)
  390. err := s.listBucketObjects(client, subDir, "", details)
  391. if err != nil {
  392. return err
  393. }
  394. } else {
  395. // Add file object
  396. objectKey := entry.Name
  397. if directory != fmt.Sprintf("/buckets/%s", details.Bucket.Name) {
  398. // Remove bucket prefix to get relative path
  399. relativePath := directory[len(fmt.Sprintf("/buckets/%s", details.Bucket.Name))+1:]
  400. objectKey = fmt.Sprintf("%s/%s", relativePath, entry.Name)
  401. }
  402. obj := S3Object{
  403. Key: objectKey,
  404. Size: int64(entry.Attributes.FileSize),
  405. LastModified: time.Unix(entry.Attributes.Mtime, 0),
  406. ETag: "", // Could be calculated from chunks if needed
  407. StorageClass: "STANDARD",
  408. }
  409. details.Objects = append(details.Objects, obj)
  410. details.TotalSize += obj.Size
  411. details.TotalCount++
  412. }
  413. }
  414. // Update bucket totals
  415. details.Bucket.Size = details.TotalSize
  416. details.Bucket.ObjectCount = details.TotalCount
  417. return nil
  418. }
  419. // CreateS3Bucket creates a new S3 bucket
  420. func (s *AdminServer) CreateS3Bucket(bucketName string) error {
  421. return s.CreateS3BucketWithQuota(bucketName, 0, false)
  422. }
  423. // DeleteS3Bucket deletes an S3 bucket and all its contents
  424. func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
  425. return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  426. // Delete bucket directory recursively
  427. _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
  428. Directory: "/buckets",
  429. Name: bucketName,
  430. IsDeleteData: true,
  431. IsRecursive: true,
  432. IgnoreRecursiveError: false,
  433. })
  434. if err != nil {
  435. return fmt.Errorf("failed to delete bucket: %w", err)
  436. }
  437. return nil
  438. })
  439. }
  440. // GetObjectStoreUsers retrieves object store users from identity.json
  441. func (s *AdminServer) GetObjectStoreUsers() ([]ObjectStoreUser, error) {
  442. s3cfg := &iam_pb.S3ApiConfiguration{}
  443. // Load IAM configuration from filer
  444. err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  445. var buf bytes.Buffer
  446. if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
  447. if err == filer_pb.ErrNotFound {
  448. // If file doesn't exist, return empty configuration
  449. return nil
  450. }
  451. return err
  452. }
  453. if buf.Len() > 0 {
  454. return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
  455. }
  456. return nil
  457. })
  458. if err != nil {
  459. glog.Errorf("Failed to load IAM configuration: %v", err)
  460. return []ObjectStoreUser{}, nil // Return empty list instead of error for UI
  461. }
  462. var users []ObjectStoreUser
  463. // Convert IAM identities to ObjectStoreUser format
  464. for _, identity := range s3cfg.Identities {
  465. // Skip anonymous identity
  466. if identity.Name == "anonymous" {
  467. continue
  468. }
  469. user := ObjectStoreUser{
  470. Username: identity.Name,
  471. Permissions: identity.Actions,
  472. }
  473. // Set email from account if available
  474. if identity.Account != nil {
  475. user.Email = identity.Account.EmailAddress
  476. }
  477. // Get first access key for display
  478. if len(identity.Credentials) > 0 {
  479. user.AccessKey = identity.Credentials[0].AccessKey
  480. user.SecretKey = identity.Credentials[0].SecretKey
  481. }
  482. users = append(users, user)
  483. }
  484. return users, nil
  485. }
  486. // Volume server methods moved to volume_management.go
  487. // Volume methods moved to volume_management.go
  488. // sortVolumes method moved to volume_management.go
  489. // GetClusterCollections method moved to collection_management.go
  490. // GetClusterMasters retrieves cluster masters data
  491. func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
  492. var masters []MasterInfo
  493. var leaderCount int
  494. // First, get master information from topology
  495. topology, err := s.GetClusterTopology()
  496. if err != nil {
  497. return nil, err
  498. }
  499. // Create a map to merge topology and raft data
  500. masterMap := make(map[string]*MasterInfo)
  501. // Add masters from topology
  502. for _, master := range topology.Masters {
  503. masterInfo := &MasterInfo{
  504. Address: master.Address,
  505. IsLeader: master.IsLeader,
  506. Suffrage: "",
  507. }
  508. if master.IsLeader {
  509. leaderCount++
  510. }
  511. masterMap[master.Address] = masterInfo
  512. }
  513. // Then, get additional master information from Raft cluster
  514. err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  515. resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
  516. if err != nil {
  517. return err
  518. }
  519. // Process each raft server
  520. for _, server := range resp.ClusterServers {
  521. address := server.Address
  522. // Update existing master info or create new one
  523. if masterInfo, exists := masterMap[address]; exists {
  524. // Update existing master with raft data
  525. masterInfo.IsLeader = server.IsLeader
  526. masterInfo.Suffrage = server.Suffrage
  527. } else {
  528. // Create new master info from raft data
  529. masterInfo := &MasterInfo{
  530. Address: address,
  531. IsLeader: server.IsLeader,
  532. Suffrage: server.Suffrage,
  533. }
  534. masterMap[address] = masterInfo
  535. }
  536. if server.IsLeader {
  537. // Update leader count based on raft data
  538. leaderCount = 1 // There should only be one leader
  539. }
  540. }
  541. return nil
  542. })
  543. if err != nil {
  544. // If gRPC call fails, log the error but continue with topology data
  545. currentMaster := s.masterClient.GetMaster(context.Background())
  546. glog.Errorf("Failed to get raft cluster servers from master %s: %v", currentMaster, err)
  547. }
  548. // Convert map to slice
  549. for _, masterInfo := range masterMap {
  550. masters = append(masters, *masterInfo)
  551. }
  552. // If no masters found at all, add the current master as fallback
  553. if len(masters) == 0 {
  554. currentMaster := s.masterClient.GetMaster(context.Background())
  555. if currentMaster != "" {
  556. masters = append(masters, MasterInfo{
  557. Address: string(currentMaster),
  558. IsLeader: true,
  559. Suffrage: "Voter",
  560. })
  561. leaderCount = 1
  562. }
  563. }
  564. return &ClusterMastersData{
  565. Masters: masters,
  566. TotalMasters: len(masters),
  567. LeaderCount: leaderCount,
  568. LastUpdated: time.Now(),
  569. }, nil
  570. }
  571. // GetClusterFilers retrieves cluster filers data
  572. func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) {
  573. var filers []FilerInfo
  574. // Get filer information from master using ListClusterNodes
  575. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  576. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  577. ClientType: cluster.FilerType,
  578. })
  579. if err != nil {
  580. return err
  581. }
  582. // Process each filer node
  583. for _, node := range resp.ClusterNodes {
  584. createdAt := time.Unix(0, node.CreatedAtNs)
  585. filerInfo := FilerInfo{
  586. Address: node.Address,
  587. DataCenter: node.DataCenter,
  588. Rack: node.Rack,
  589. Version: node.Version,
  590. CreatedAt: createdAt,
  591. }
  592. filers = append(filers, filerInfo)
  593. }
  594. return nil
  595. })
  596. if err != nil {
  597. return nil, fmt.Errorf("failed to get filer nodes from master: %w", err)
  598. }
  599. return &ClusterFilersData{
  600. Filers: filers,
  601. TotalFilers: len(filers),
  602. LastUpdated: time.Now(),
  603. }, nil
  604. }
  605. // GetClusterBrokers retrieves cluster message brokers data
  606. func (s *AdminServer) GetClusterBrokers() (*ClusterBrokersData, error) {
  607. var brokers []MessageBrokerInfo
  608. // Get broker information from master using ListClusterNodes
  609. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  610. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  611. ClientType: cluster.BrokerType,
  612. })
  613. if err != nil {
  614. return err
  615. }
  616. // Process each broker node
  617. for _, node := range resp.ClusterNodes {
  618. createdAt := time.Unix(0, node.CreatedAtNs)
  619. brokerInfo := MessageBrokerInfo{
  620. Address: node.Address,
  621. DataCenter: node.DataCenter,
  622. Rack: node.Rack,
  623. Version: node.Version,
  624. CreatedAt: createdAt,
  625. }
  626. brokers = append(brokers, brokerInfo)
  627. }
  628. return nil
  629. })
  630. if err != nil {
  631. return nil, fmt.Errorf("failed to get broker nodes from master: %w", err)
  632. }
  633. return &ClusterBrokersData{
  634. Brokers: brokers,
  635. TotalBrokers: len(brokers),
  636. LastUpdated: time.Now(),
  637. }, nil
  638. }
  639. // GetAllFilers method moved to client_management.go
  640. // GetVolumeDetails method moved to volume_management.go
  641. // VacuumVolume method moved to volume_management.go
  642. // ShowMaintenanceQueue displays the maintenance queue page
  643. func (as *AdminServer) ShowMaintenanceQueue(c *gin.Context) {
  644. data, err := as.getMaintenanceQueueData()
  645. if err != nil {
  646. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  647. return
  648. }
  649. // This should not render HTML template, it should use the component approach
  650. c.JSON(http.StatusOK, data)
  651. }
  652. // ShowMaintenanceWorkers displays the maintenance workers page
  653. func (as *AdminServer) ShowMaintenanceWorkers(c *gin.Context) {
  654. workers, err := as.getMaintenanceWorkers()
  655. if err != nil {
  656. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  657. return
  658. }
  659. // Create worker details data
  660. workersData := make([]*WorkerDetailsData, 0, len(workers))
  661. for _, worker := range workers {
  662. details, err := as.getMaintenanceWorkerDetails(worker.ID)
  663. if err != nil {
  664. // Create basic worker details if we can't get full details
  665. details = &WorkerDetailsData{
  666. Worker: worker,
  667. CurrentTasks: []*MaintenanceTask{},
  668. RecentTasks: []*MaintenanceTask{},
  669. Performance: &WorkerPerformance{
  670. TasksCompleted: 0,
  671. TasksFailed: 0,
  672. AverageTaskTime: 0,
  673. Uptime: 0,
  674. SuccessRate: 0,
  675. },
  676. LastUpdated: time.Now(),
  677. }
  678. }
  679. workersData = append(workersData, details)
  680. }
  681. c.JSON(http.StatusOK, gin.H{
  682. "workers": workersData,
  683. "title": "Maintenance Workers",
  684. })
  685. }
  686. // ShowMaintenanceConfig displays the maintenance configuration page
  687. func (as *AdminServer) ShowMaintenanceConfig(c *gin.Context) {
  688. config, err := as.getMaintenanceConfig()
  689. if err != nil {
  690. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  691. return
  692. }
  693. // This should not render HTML template, it should use the component approach
  694. c.JSON(http.StatusOK, config)
  695. }
  696. // UpdateMaintenanceConfig updates maintenance configuration from form
  697. func (as *AdminServer) UpdateMaintenanceConfig(c *gin.Context) {
  698. var config MaintenanceConfig
  699. if err := c.ShouldBind(&config); err != nil {
  700. c.HTML(http.StatusBadRequest, "error.html", gin.H{"error": err.Error()})
  701. return
  702. }
  703. err := as.updateMaintenanceConfig(&config)
  704. if err != nil {
  705. c.HTML(http.StatusInternalServerError, "error.html", gin.H{"error": err.Error()})
  706. return
  707. }
  708. c.Redirect(http.StatusSeeOther, "/maintenance/config")
  709. }
  710. // TriggerMaintenanceScan triggers a maintenance scan
  711. func (as *AdminServer) TriggerMaintenanceScan(c *gin.Context) {
  712. err := as.triggerMaintenanceScan()
  713. if err != nil {
  714. c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
  715. return
  716. }
  717. c.JSON(http.StatusOK, gin.H{"success": true, "message": "Maintenance scan triggered"})
  718. }
  719. // GetMaintenanceTasks returns all maintenance tasks
  720. func (as *AdminServer) GetMaintenanceTasks(c *gin.Context) {
  721. tasks, err := as.getMaintenanceTasks()
  722. if err != nil {
  723. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  724. return
  725. }
  726. c.JSON(http.StatusOK, tasks)
  727. }
  728. // GetMaintenanceTask returns a specific maintenance task
  729. func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
  730. taskID := c.Param("id")
  731. task, err := as.getMaintenanceTask(taskID)
  732. if err != nil {
  733. c.JSON(http.StatusNotFound, gin.H{"error": "Task not found"})
  734. return
  735. }
  736. c.JSON(http.StatusOK, task)
  737. }
  738. // GetMaintenanceTaskDetailAPI returns detailed task information via API
  739. func (as *AdminServer) GetMaintenanceTaskDetailAPI(c *gin.Context) {
  740. taskID := c.Param("id")
  741. taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
  742. if err != nil {
  743. c.JSON(http.StatusNotFound, gin.H{"error": "Task detail not found", "details": err.Error()})
  744. return
  745. }
  746. c.JSON(http.StatusOK, taskDetail)
  747. }
  748. // ShowMaintenanceTaskDetail renders the task detail page
  749. func (as *AdminServer) ShowMaintenanceTaskDetail(c *gin.Context) {
  750. username := c.GetString("username")
  751. if username == "" {
  752. username = "admin" // Default fallback
  753. }
  754. taskID := c.Param("id")
  755. taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
  756. if err != nil {
  757. c.HTML(http.StatusNotFound, "error.html", gin.H{
  758. "error": "Task not found",
  759. "details": err.Error(),
  760. })
  761. return
  762. }
  763. // Prepare data for template
  764. data := gin.H{
  765. "username": username,
  766. "task": taskDetail.Task,
  767. "taskDetail": taskDetail,
  768. "title": fmt.Sprintf("Task Detail - %s", taskID),
  769. }
  770. c.HTML(http.StatusOK, "task_detail.html", data)
  771. }
  772. // CancelMaintenanceTask cancels a pending maintenance task
  773. func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
  774. taskID := c.Param("id")
  775. err := as.cancelMaintenanceTask(taskID)
  776. if err != nil {
  777. c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
  778. return
  779. }
  780. c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"})
  781. }
  782. // cancelMaintenanceTask cancels a pending maintenance task
  783. func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
  784. if as.maintenanceManager == nil {
  785. return fmt.Errorf("maintenance manager not initialized")
  786. }
  787. return as.maintenanceManager.CancelTask(taskID)
  788. }
  789. // GetMaintenanceWorkersAPI returns all maintenance workers
  790. func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) {
  791. workers, err := as.getMaintenanceWorkers()
  792. if err != nil {
  793. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  794. return
  795. }
  796. c.JSON(http.StatusOK, workers)
  797. }
  798. // GetMaintenanceWorker returns a specific maintenance worker
  799. func (as *AdminServer) GetMaintenanceWorker(c *gin.Context) {
  800. workerID := c.Param("id")
  801. worker, err := as.getMaintenanceWorkerDetails(workerID)
  802. if err != nil {
  803. c.JSON(http.StatusNotFound, gin.H{"error": "Worker not found"})
  804. return
  805. }
  806. c.JSON(http.StatusOK, worker)
  807. }
  808. // GetMaintenanceStats returns maintenance statistics
  809. func (as *AdminServer) GetMaintenanceStats(c *gin.Context) {
  810. stats, err := as.getMaintenanceStats()
  811. if err != nil {
  812. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  813. return
  814. }
  815. c.JSON(http.StatusOK, stats)
  816. }
  817. // GetMaintenanceConfigAPI returns maintenance configuration
  818. func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) {
  819. config, err := as.getMaintenanceConfig()
  820. if err != nil {
  821. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  822. return
  823. }
  824. c.JSON(http.StatusOK, config)
  825. }
  826. // UpdateMaintenanceConfigAPI updates maintenance configuration via API
  827. func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) {
  828. // Parse JSON into a generic map first to handle type conversions
  829. var jsonConfig map[string]interface{}
  830. if err := c.ShouldBindJSON(&jsonConfig); err != nil {
  831. c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
  832. return
  833. }
  834. // Convert JSON map to protobuf configuration
  835. config, err := convertJSONToMaintenanceConfig(jsonConfig)
  836. if err != nil {
  837. c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse configuration: " + err.Error()})
  838. return
  839. }
  840. err = as.updateMaintenanceConfig(config)
  841. if err != nil {
  842. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  843. return
  844. }
  845. c.JSON(http.StatusOK, gin.H{"success": true, "message": "Configuration updated"})
  846. }
  847. // GetMaintenanceConfigData returns maintenance configuration data (public wrapper)
  848. func (as *AdminServer) GetMaintenanceConfigData() (*maintenance.MaintenanceConfigData, error) {
  849. return as.getMaintenanceConfig()
  850. }
  851. // UpdateMaintenanceConfigData updates maintenance configuration (public wrapper)
  852. func (as *AdminServer) UpdateMaintenanceConfigData(config *maintenance.MaintenanceConfig) error {
  853. return as.updateMaintenanceConfig(config)
  854. }
  855. // Helper methods for maintenance operations
  856. // getMaintenanceQueueData returns data for the maintenance queue UI
  857. func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) {
  858. tasks, err := as.getMaintenanceTasks()
  859. if err != nil {
  860. return nil, err
  861. }
  862. workers, err := as.getMaintenanceWorkers()
  863. if err != nil {
  864. return nil, err
  865. }
  866. stats, err := as.getMaintenanceQueueStats()
  867. if err != nil {
  868. return nil, err
  869. }
  870. return &maintenance.MaintenanceQueueData{
  871. Tasks: tasks,
  872. Workers: workers,
  873. Stats: stats,
  874. LastUpdated: time.Now(),
  875. }, nil
  876. }
  877. // GetMaintenanceQueueStats returns statistics for the maintenance queue (exported for handlers)
  878. func (as *AdminServer) GetMaintenanceQueueStats() (*maintenance.QueueStats, error) {
  879. return as.getMaintenanceQueueStats()
  880. }
  881. // getMaintenanceQueueStats returns statistics for the maintenance queue
  882. func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) {
  883. if as.maintenanceManager == nil {
  884. return &maintenance.QueueStats{
  885. PendingTasks: 0,
  886. RunningTasks: 0,
  887. CompletedToday: 0,
  888. FailedToday: 0,
  889. TotalTasks: 0,
  890. }, nil
  891. }
  892. // Get real statistics from maintenance manager
  893. stats := as.maintenanceManager.GetStats()
  894. // Convert MaintenanceStats to QueueStats
  895. queueStats := &maintenance.QueueStats{
  896. PendingTasks: stats.TasksByStatus[maintenance.TaskStatusPending],
  897. RunningTasks: stats.TasksByStatus[maintenance.TaskStatusAssigned] + stats.TasksByStatus[maintenance.TaskStatusInProgress],
  898. CompletedToday: stats.CompletedToday,
  899. FailedToday: stats.FailedToday,
  900. TotalTasks: stats.TotalTasks,
  901. }
  902. return queueStats, nil
  903. }
  904. // getMaintenanceTasks returns all maintenance tasks
  905. func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
  906. if as.maintenanceManager == nil {
  907. return []*maintenance.MaintenanceTask{}, nil
  908. }
  909. // Collect all tasks from memory across all statuses
  910. allTasks := []*maintenance.MaintenanceTask{}
  911. statuses := []maintenance.MaintenanceTaskStatus{
  912. maintenance.TaskStatusPending,
  913. maintenance.TaskStatusAssigned,
  914. maintenance.TaskStatusInProgress,
  915. maintenance.TaskStatusCompleted,
  916. maintenance.TaskStatusFailed,
  917. maintenance.TaskStatusCancelled,
  918. }
  919. for _, status := range statuses {
  920. tasks := as.maintenanceManager.GetTasks(status, "", 0)
  921. allTasks = append(allTasks, tasks...)
  922. }
  923. // Also load any persisted tasks that might not be in memory
  924. if as.configPersistence != nil {
  925. persistedTasks, err := as.configPersistence.LoadAllTaskStates()
  926. if err == nil {
  927. // Add any persisted tasks not already in memory
  928. for _, persistedTask := range persistedTasks {
  929. found := false
  930. for _, memoryTask := range allTasks {
  931. if memoryTask.ID == persistedTask.ID {
  932. found = true
  933. break
  934. }
  935. }
  936. if !found {
  937. allTasks = append(allTasks, persistedTask)
  938. }
  939. }
  940. }
  941. }
  942. return allTasks, nil
  943. }
  944. // getMaintenanceTask returns a specific maintenance task
  945. func (as *AdminServer) getMaintenanceTask(taskID string) (*maintenance.MaintenanceTask, error) {
  946. if as.maintenanceManager == nil {
  947. return nil, fmt.Errorf("maintenance manager not initialized")
  948. }
  949. // Search for the task across all statuses since we don't know which status it has
  950. statuses := []maintenance.MaintenanceTaskStatus{
  951. maintenance.TaskStatusPending,
  952. maintenance.TaskStatusAssigned,
  953. maintenance.TaskStatusInProgress,
  954. maintenance.TaskStatusCompleted,
  955. maintenance.TaskStatusFailed,
  956. maintenance.TaskStatusCancelled,
  957. }
  958. // First, search for the task in memory across all statuses
  959. for _, status := range statuses {
  960. tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status
  961. for _, task := range tasks {
  962. if task.ID == taskID {
  963. return task, nil
  964. }
  965. }
  966. }
  967. // If not found in memory, try to load from persistent storage
  968. if as.configPersistence != nil {
  969. task, err := as.configPersistence.LoadTaskState(taskID)
  970. if err == nil {
  971. glog.V(2).Infof("Loaded task %s from persistent storage", taskID)
  972. return task, nil
  973. }
  974. glog.V(2).Infof("Task %s not found in persistent storage: %v", taskID, err)
  975. }
  976. return nil, fmt.Errorf("task %s not found", taskID)
  977. }
  978. // GetMaintenanceTaskDetail returns comprehensive task details including logs and assignment history
  979. func (as *AdminServer) GetMaintenanceTaskDetail(taskID string) (*maintenance.TaskDetailData, error) {
  980. // Get basic task information
  981. task, err := as.getMaintenanceTask(taskID)
  982. if err != nil {
  983. return nil, err
  984. }
  985. // Create task detail structure from the loaded task
  986. taskDetail := &maintenance.TaskDetailData{
  987. Task: task,
  988. AssignmentHistory: task.AssignmentHistory, // Use assignment history from persisted task
  989. ExecutionLogs: []*maintenance.TaskExecutionLog{},
  990. RelatedTasks: []*maintenance.MaintenanceTask{},
  991. LastUpdated: time.Now(),
  992. }
  993. if taskDetail.AssignmentHistory == nil {
  994. taskDetail.AssignmentHistory = []*maintenance.TaskAssignmentRecord{}
  995. }
  996. // Get worker information if task is assigned
  997. if task.WorkerID != "" {
  998. workers := as.maintenanceManager.GetWorkers()
  999. for _, worker := range workers {
  1000. if worker.ID == task.WorkerID {
  1001. taskDetail.WorkerInfo = worker
  1002. break
  1003. }
  1004. }
  1005. }
  1006. // Get execution logs from worker if task is active/completed and worker is connected
  1007. if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted {
  1008. if as.workerGrpcServer != nil && task.WorkerID != "" {
  1009. workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "")
  1010. if err == nil && len(workerLogs) > 0 {
  1011. // Convert worker logs to maintenance logs
  1012. for _, workerLog := range workerLogs {
  1013. maintenanceLog := &maintenance.TaskExecutionLog{
  1014. Timestamp: time.Unix(workerLog.Timestamp, 0),
  1015. Level: workerLog.Level,
  1016. Message: workerLog.Message,
  1017. Source: "worker",
  1018. TaskID: taskID,
  1019. WorkerID: task.WorkerID,
  1020. }
  1021. // carry structured fields if present
  1022. if len(workerLog.Fields) > 0 {
  1023. maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields))
  1024. for k, v := range workerLog.Fields {
  1025. maintenanceLog.Fields[k] = v
  1026. }
  1027. }
  1028. // carry optional progress/status
  1029. if workerLog.Progress != 0 {
  1030. p := float64(workerLog.Progress)
  1031. maintenanceLog.Progress = &p
  1032. }
  1033. if workerLog.Status != "" {
  1034. maintenanceLog.Status = workerLog.Status
  1035. }
  1036. taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog)
  1037. }
  1038. } else if err != nil {
  1039. // Add a diagnostic log entry when worker logs cannot be retrieved
  1040. diagnosticLog := &maintenance.TaskExecutionLog{
  1041. Timestamp: time.Now(),
  1042. Level: "WARNING",
  1043. Message: fmt.Sprintf("Failed to retrieve worker logs: %v", err),
  1044. Source: "admin",
  1045. TaskID: taskID,
  1046. WorkerID: task.WorkerID,
  1047. }
  1048. taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
  1049. glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, err)
  1050. }
  1051. } else {
  1052. // Add diagnostic information when worker is not available
  1053. reason := "worker gRPC server not available"
  1054. if task.WorkerID == "" {
  1055. reason = "no worker assigned to task"
  1056. }
  1057. diagnosticLog := &maintenance.TaskExecutionLog{
  1058. Timestamp: time.Now(),
  1059. Level: "INFO",
  1060. Message: fmt.Sprintf("Worker logs not available: %s", reason),
  1061. Source: "admin",
  1062. TaskID: taskID,
  1063. WorkerID: task.WorkerID,
  1064. }
  1065. taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
  1066. }
  1067. }
  1068. // Get related tasks (other tasks on same volume/server)
  1069. if task.VolumeID != 0 || task.Server != "" {
  1070. allTasks := as.maintenanceManager.GetTasks("", "", 50) // Get recent tasks
  1071. for _, relatedTask := range allTasks {
  1072. if relatedTask.ID != taskID &&
  1073. (relatedTask.VolumeID == task.VolumeID || relatedTask.Server == task.Server) {
  1074. taskDetail.RelatedTasks = append(taskDetail.RelatedTasks, relatedTask)
  1075. }
  1076. }
  1077. }
  1078. // Save updated task detail to disk
  1079. if err := as.configPersistence.SaveTaskDetail(taskID, taskDetail); err != nil {
  1080. glog.V(1).Infof("Failed to save task detail for %s: %v", taskID, err)
  1081. }
  1082. return taskDetail, nil
  1083. }
  1084. // getMaintenanceWorkers returns all maintenance workers
  1085. func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
  1086. if as.maintenanceManager == nil {
  1087. return []*MaintenanceWorker{}, nil
  1088. }
  1089. return as.maintenanceManager.GetWorkers(), nil
  1090. }
  1091. // getMaintenanceWorkerDetails returns detailed information about a worker
  1092. func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDetailsData, error) {
  1093. if as.maintenanceManager == nil {
  1094. return nil, fmt.Errorf("maintenance manager not initialized")
  1095. }
  1096. workers := as.maintenanceManager.GetWorkers()
  1097. var targetWorker *MaintenanceWorker
  1098. for _, worker := range workers {
  1099. if worker.ID == workerID {
  1100. targetWorker = worker
  1101. break
  1102. }
  1103. }
  1104. if targetWorker == nil {
  1105. return nil, fmt.Errorf("worker %s not found", workerID)
  1106. }
  1107. // Get current tasks for this worker
  1108. currentTasks := as.maintenanceManager.GetTasks(TaskStatusInProgress, "", 0)
  1109. var workerCurrentTasks []*MaintenanceTask
  1110. for _, task := range currentTasks {
  1111. if task.WorkerID == workerID {
  1112. workerCurrentTasks = append(workerCurrentTasks, task)
  1113. }
  1114. }
  1115. // Get recent tasks for this worker
  1116. recentTasks := as.maintenanceManager.GetTasks(TaskStatusCompleted, "", 10)
  1117. var workerRecentTasks []*MaintenanceTask
  1118. for _, task := range recentTasks {
  1119. if task.WorkerID == workerID {
  1120. workerRecentTasks = append(workerRecentTasks, task)
  1121. }
  1122. }
  1123. // Calculate performance metrics
  1124. var totalDuration time.Duration
  1125. var completedTasks, failedTasks int
  1126. for _, task := range workerRecentTasks {
  1127. if task.Status == TaskStatusCompleted {
  1128. completedTasks++
  1129. if task.StartedAt != nil && task.CompletedAt != nil {
  1130. totalDuration += task.CompletedAt.Sub(*task.StartedAt)
  1131. }
  1132. } else if task.Status == TaskStatusFailed {
  1133. failedTasks++
  1134. }
  1135. }
  1136. var averageTaskTime time.Duration
  1137. var successRate float64
  1138. if completedTasks+failedTasks > 0 {
  1139. if completedTasks > 0 {
  1140. averageTaskTime = totalDuration / time.Duration(completedTasks)
  1141. }
  1142. successRate = float64(completedTasks) / float64(completedTasks+failedTasks) * 100
  1143. }
  1144. return &WorkerDetailsData{
  1145. Worker: targetWorker,
  1146. CurrentTasks: workerCurrentTasks,
  1147. RecentTasks: workerRecentTasks,
  1148. Performance: &WorkerPerformance{
  1149. TasksCompleted: completedTasks,
  1150. TasksFailed: failedTasks,
  1151. AverageTaskTime: averageTaskTime,
  1152. Uptime: time.Since(targetWorker.LastHeartbeat), // This should be tracked properly
  1153. SuccessRate: successRate,
  1154. },
  1155. LastUpdated: time.Now(),
  1156. }, nil
  1157. }
  1158. // GetWorkerLogs fetches logs from a specific worker for a task
  1159. func (as *AdminServer) GetWorkerLogs(c *gin.Context) {
  1160. workerID := c.Param("id")
  1161. taskID := c.Query("taskId")
  1162. maxEntriesStr := c.DefaultQuery("maxEntries", "100")
  1163. logLevel := c.DefaultQuery("logLevel", "")
  1164. maxEntries := int32(100)
  1165. if maxEntriesStr != "" {
  1166. if parsed, err := strconv.ParseInt(maxEntriesStr, 10, 32); err == nil {
  1167. maxEntries = int32(parsed)
  1168. }
  1169. }
  1170. if as.workerGrpcServer == nil {
  1171. c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Worker gRPC server not available"})
  1172. return
  1173. }
  1174. logs, err := as.workerGrpcServer.RequestTaskLogs(workerID, taskID, maxEntries, logLevel)
  1175. if err != nil {
  1176. c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("Failed to get logs from worker: %v", err)})
  1177. return
  1178. }
  1179. c.JSON(http.StatusOK, gin.H{"worker_id": workerID, "task_id": taskID, "logs": logs, "count": len(logs)})
  1180. }
  1181. // getMaintenanceStats returns maintenance statistics
  1182. func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) {
  1183. if as.maintenanceManager == nil {
  1184. return &MaintenanceStats{
  1185. TotalTasks: 0,
  1186. TasksByStatus: make(map[MaintenanceTaskStatus]int),
  1187. TasksByType: make(map[MaintenanceTaskType]int),
  1188. ActiveWorkers: 0,
  1189. }, nil
  1190. }
  1191. return as.maintenanceManager.GetStats(), nil
  1192. }
  1193. // getMaintenanceConfig returns maintenance configuration
  1194. func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) {
  1195. // Load configuration from persistent storage
  1196. config, err := as.configPersistence.LoadMaintenanceConfig()
  1197. if err != nil {
  1198. // Fallback to default configuration
  1199. config = maintenance.DefaultMaintenanceConfig()
  1200. }
  1201. // Note: Do NOT apply schema defaults to existing config as it overrides saved values
  1202. // Only apply defaults when creating new configs or handling fallback cases
  1203. // The schema defaults should only be used in the UI for new installations
  1204. // Get system stats from maintenance manager if available
  1205. var systemStats *MaintenanceStats
  1206. if as.maintenanceManager != nil {
  1207. systemStats = as.maintenanceManager.GetStats()
  1208. } else {
  1209. // Fallback stats
  1210. systemStats = &MaintenanceStats{
  1211. TotalTasks: 0,
  1212. TasksByStatus: map[MaintenanceTaskStatus]int{
  1213. TaskStatusPending: 0,
  1214. TaskStatusInProgress: 0,
  1215. TaskStatusCompleted: 0,
  1216. TaskStatusFailed: 0,
  1217. },
  1218. TasksByType: make(map[MaintenanceTaskType]int),
  1219. ActiveWorkers: 0,
  1220. CompletedToday: 0,
  1221. FailedToday: 0,
  1222. AverageTaskTime: 0,
  1223. LastScanTime: time.Now().Add(-time.Hour),
  1224. NextScanTime: time.Now().Add(time.Duration(config.ScanIntervalSeconds) * time.Second),
  1225. }
  1226. }
  1227. configData := &MaintenanceConfigData{
  1228. Config: config,
  1229. IsEnabled: config.Enabled,
  1230. LastScanTime: systemStats.LastScanTime,
  1231. NextScanTime: systemStats.NextScanTime,
  1232. SystemStats: systemStats,
  1233. MenuItems: maintenance.BuildMaintenanceMenuItems(),
  1234. }
  1235. return configData, nil
  1236. }
  1237. // updateMaintenanceConfig updates maintenance configuration
  1238. func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error {
  1239. // Use ConfigField validation instead of standalone validation
  1240. if err := maintenance.ValidateMaintenanceConfigWithSchema(config); err != nil {
  1241. return fmt.Errorf("configuration validation failed: %v", err)
  1242. }
  1243. // Save configuration to persistent storage
  1244. if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil {
  1245. return fmt.Errorf("failed to save maintenance configuration: %w", err)
  1246. }
  1247. // Update maintenance manager if available
  1248. if as.maintenanceManager != nil {
  1249. if err := as.maintenanceManager.UpdateConfig(config); err != nil {
  1250. glog.Errorf("Failed to update maintenance manager config: %v", err)
  1251. // Don't return error here, just log it
  1252. }
  1253. }
  1254. glog.V(1).Infof("Updated maintenance configuration (enabled: %v, scan interval: %ds)",
  1255. config.Enabled, config.ScanIntervalSeconds)
  1256. return nil
  1257. }
  1258. // triggerMaintenanceScan triggers a maintenance scan
  1259. func (as *AdminServer) triggerMaintenanceScan() error {
  1260. if as.maintenanceManager == nil {
  1261. return fmt.Errorf("maintenance manager not initialized")
  1262. }
  1263. glog.V(1).Infof("Triggering maintenance scan")
  1264. err := as.maintenanceManager.TriggerScan()
  1265. if err != nil {
  1266. glog.Errorf("Failed to trigger maintenance scan: %v", err)
  1267. return err
  1268. }
  1269. glog.V(1).Infof("Maintenance scan triggered successfully")
  1270. return nil
  1271. }
  1272. // TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
  1273. func (as *AdminServer) TriggerTopicRetentionPurgeAPI(c *gin.Context) {
  1274. err := as.TriggerTopicRetentionPurge()
  1275. if err != nil {
  1276. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  1277. return
  1278. }
  1279. c.JSON(http.StatusOK, gin.H{"message": "Topic retention purge triggered successfully"})
  1280. }
  1281. // GetConfigInfo returns information about the admin configuration
  1282. func (as *AdminServer) GetConfigInfo(c *gin.Context) {
  1283. configInfo := as.configPersistence.GetConfigInfo()
  1284. // Add additional admin server info
  1285. currentMaster := as.masterClient.GetMaster(context.Background())
  1286. configInfo["master_address"] = string(currentMaster)
  1287. configInfo["cache_expiration"] = as.cacheExpiration.String()
  1288. configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String()
  1289. // Add maintenance system info
  1290. if as.maintenanceManager != nil {
  1291. configInfo["maintenance_enabled"] = true
  1292. configInfo["maintenance_running"] = as.maintenanceManager.IsRunning()
  1293. } else {
  1294. configInfo["maintenance_enabled"] = false
  1295. configInfo["maintenance_running"] = false
  1296. }
  1297. c.JSON(http.StatusOK, gin.H{
  1298. "config_info": configInfo,
  1299. "title": "Configuration Information",
  1300. })
  1301. }
  1302. // GetMaintenanceWorkersData returns workers data for the maintenance workers page
  1303. func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, error) {
  1304. workers, err := as.getMaintenanceWorkers()
  1305. if err != nil {
  1306. return nil, err
  1307. }
  1308. // Create worker details data
  1309. workersData := make([]*WorkerDetailsData, 0, len(workers))
  1310. activeWorkers := 0
  1311. busyWorkers := 0
  1312. totalLoad := 0
  1313. for _, worker := range workers {
  1314. details, err := as.getMaintenanceWorkerDetails(worker.ID)
  1315. if err != nil {
  1316. // Create basic worker details if we can't get full details
  1317. details = &WorkerDetailsData{
  1318. Worker: worker,
  1319. CurrentTasks: []*MaintenanceTask{},
  1320. RecentTasks: []*MaintenanceTask{},
  1321. Performance: &WorkerPerformance{
  1322. TasksCompleted: 0,
  1323. TasksFailed: 0,
  1324. AverageTaskTime: 0,
  1325. Uptime: 0,
  1326. SuccessRate: 0,
  1327. },
  1328. LastUpdated: time.Now(),
  1329. }
  1330. }
  1331. workersData = append(workersData, details)
  1332. if worker.Status == "active" {
  1333. activeWorkers++
  1334. } else if worker.Status == "busy" {
  1335. busyWorkers++
  1336. }
  1337. totalLoad += worker.CurrentLoad
  1338. }
  1339. return &MaintenanceWorkersData{
  1340. Workers: workersData,
  1341. ActiveWorkers: activeWorkers,
  1342. BusyWorkers: busyWorkers,
  1343. TotalLoad: totalLoad,
  1344. LastUpdated: time.Now(),
  1345. }, nil
  1346. }
  1347. // StartWorkerGrpcServer starts the worker gRPC server
  1348. func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
  1349. if s.workerGrpcServer != nil {
  1350. return fmt.Errorf("worker gRPC server is already running")
  1351. }
  1352. s.workerGrpcServer = NewWorkerGrpcServer(s)
  1353. return s.workerGrpcServer.StartWithTLS(grpcPort)
  1354. }
  1355. // StopWorkerGrpcServer stops the worker gRPC server
  1356. func (s *AdminServer) StopWorkerGrpcServer() error {
  1357. if s.workerGrpcServer != nil {
  1358. err := s.workerGrpcServer.Stop()
  1359. s.workerGrpcServer = nil
  1360. return err
  1361. }
  1362. return nil
  1363. }
  1364. // GetWorkerGrpcServer returns the worker gRPC server
  1365. func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
  1366. return s.workerGrpcServer
  1367. }
  1368. // Maintenance system integration methods
  1369. // InitMaintenanceManager initializes the maintenance manager
  1370. func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
  1371. s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)
  1372. // Set up task persistence if config persistence is available
  1373. if s.configPersistence != nil {
  1374. queue := s.maintenanceManager.GetQueue()
  1375. if queue != nil {
  1376. queue.SetPersistence(s.configPersistence)
  1377. // Load tasks from persistence on startup
  1378. if err := queue.LoadTasksFromPersistence(); err != nil {
  1379. glog.Errorf("Failed to load tasks from persistence: %v", err)
  1380. }
  1381. }
  1382. }
  1383. glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
  1384. }
  1385. // GetMaintenanceManager returns the maintenance manager
  1386. func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager {
  1387. return s.maintenanceManager
  1388. }
  1389. // StartMaintenanceManager starts the maintenance manager
  1390. func (s *AdminServer) StartMaintenanceManager() error {
  1391. if s.maintenanceManager == nil {
  1392. return fmt.Errorf("maintenance manager not initialized")
  1393. }
  1394. return s.maintenanceManager.Start()
  1395. }
  1396. // StopMaintenanceManager stops the maintenance manager
  1397. func (s *AdminServer) StopMaintenanceManager() {
  1398. if s.maintenanceManager != nil {
  1399. s.maintenanceManager.Stop()
  1400. }
  1401. }
  1402. // TriggerTopicRetentionPurge triggers topic data purging based on retention policies
  1403. func (s *AdminServer) TriggerTopicRetentionPurge() error {
  1404. if s.topicRetentionPurger == nil {
  1405. return fmt.Errorf("topic retention purger not initialized")
  1406. }
  1407. glog.V(0).Infof("Triggering topic retention purge")
  1408. return s.topicRetentionPurger.PurgeExpiredTopicData()
  1409. }
  1410. // GetTopicRetentionPurger returns the topic retention purger
  1411. func (s *AdminServer) GetTopicRetentionPurger() *TopicRetentionPurger {
  1412. return s.topicRetentionPurger
  1413. }
  1414. // CreateTopicWithRetention creates a new topic with optional retention configuration
  1415. func (s *AdminServer) CreateTopicWithRetention(namespace, name string, partitionCount int32, retentionEnabled bool, retentionSeconds int64) error {
  1416. // Find broker leader to create the topic
  1417. brokerLeader, err := s.findBrokerLeader()
  1418. if err != nil {
  1419. return fmt.Errorf("failed to find broker leader: %w", err)
  1420. }
  1421. // Create retention configuration
  1422. var retention *mq_pb.TopicRetention
  1423. if retentionEnabled {
  1424. retention = &mq_pb.TopicRetention{
  1425. Enabled: true,
  1426. RetentionSeconds: retentionSeconds,
  1427. }
  1428. } else {
  1429. retention = &mq_pb.TopicRetention{
  1430. Enabled: false,
  1431. RetentionSeconds: 0,
  1432. }
  1433. }
  1434. // Create the topic via broker
  1435. err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
  1436. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  1437. defer cancel()
  1438. _, err := client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
  1439. Topic: &schema_pb.Topic{
  1440. Namespace: namespace,
  1441. Name: name,
  1442. },
  1443. PartitionCount: partitionCount,
  1444. Retention: retention,
  1445. })
  1446. return err
  1447. })
  1448. if err != nil {
  1449. return fmt.Errorf("failed to create topic: %w", err)
  1450. }
  1451. glog.V(0).Infof("Created topic %s.%s with %d partitions (retention: enabled=%v, seconds=%d)",
  1452. namespace, name, partitionCount, retentionEnabled, retentionSeconds)
  1453. return nil
  1454. }
  1455. // UpdateTopicRetention updates the retention configuration for an existing topic
  1456. func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, retentionSeconds int64) error {
  1457. // Get broker information from master
  1458. var brokerAddress string
  1459. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  1460. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  1461. ClientType: cluster.BrokerType,
  1462. })
  1463. if err != nil {
  1464. return err
  1465. }
  1466. // Find the first available broker
  1467. for _, node := range resp.ClusterNodes {
  1468. brokerAddress = node.Address
  1469. break
  1470. }
  1471. return nil
  1472. })
  1473. if err != nil {
  1474. return fmt.Errorf("failed to get broker nodes from master: %w", err)
  1475. }
  1476. if brokerAddress == "" {
  1477. return fmt.Errorf("no active brokers found")
  1478. }
  1479. // Create gRPC connection
  1480. conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption)
  1481. if err != nil {
  1482. return fmt.Errorf("failed to connect to broker: %w", err)
  1483. }
  1484. defer conn.Close()
  1485. client := mq_pb.NewSeaweedMessagingClient(conn)
  1486. // First, get the current topic configuration to preserve existing settings
  1487. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  1488. defer cancel()
  1489. currentConfig, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
  1490. Topic: &schema_pb.Topic{
  1491. Namespace: namespace,
  1492. Name: name,
  1493. },
  1494. })
  1495. if err != nil {
  1496. return fmt.Errorf("failed to get current topic configuration: %w", err)
  1497. }
  1498. // Create the topic configuration request, preserving all existing settings
  1499. configRequest := &mq_pb.ConfigureTopicRequest{
  1500. Topic: &schema_pb.Topic{
  1501. Namespace: namespace,
  1502. Name: name,
  1503. },
  1504. // Preserve existing partition count - this is critical!
  1505. PartitionCount: currentConfig.PartitionCount,
  1506. // Preserve existing record type if it exists
  1507. RecordType: currentConfig.RecordType,
  1508. }
  1509. // Update only the retention configuration
  1510. if enabled {
  1511. configRequest.Retention = &mq_pb.TopicRetention{
  1512. RetentionSeconds: retentionSeconds,
  1513. Enabled: true,
  1514. }
  1515. } else {
  1516. // Set retention to disabled
  1517. configRequest.Retention = &mq_pb.TopicRetention{
  1518. RetentionSeconds: 0,
  1519. Enabled: false,
  1520. }
  1521. }
  1522. // Send the configuration request with preserved settings
  1523. _, err = client.ConfigureTopic(ctx, configRequest)
  1524. if err != nil {
  1525. return fmt.Errorf("failed to update topic retention: %w", err)
  1526. }
  1527. glog.V(0).Infof("Updated topic %s.%s retention (enabled: %v, seconds: %d) while preserving %d partitions",
  1528. namespace, name, enabled, retentionSeconds, currentConfig.PartitionCount)
  1529. return nil
  1530. }
  1531. // Shutdown gracefully shuts down the admin server
  1532. func (s *AdminServer) Shutdown() {
  1533. glog.V(1).Infof("Shutting down admin server...")
  1534. // Stop maintenance manager
  1535. s.StopMaintenanceManager()
  1536. // Stop worker gRPC server
  1537. if err := s.StopWorkerGrpcServer(); err != nil {
  1538. glog.Errorf("Failed to stop worker gRPC server: %v", err)
  1539. }
  1540. glog.V(1).Infof("Admin server shutdown complete")
  1541. }
  1542. // Function to extract Object Lock information from bucket entry using shared utilities
  1543. func extractObjectLockInfoFromEntry(entry *filer_pb.Entry) (bool, string, int32) {
  1544. // Try to load Object Lock configuration using shared utility
  1545. if config, found := s3api.LoadObjectLockConfigurationFromExtended(entry); found {
  1546. return s3api.ExtractObjectLockInfoFromConfig(config)
  1547. }
  1548. return false, "", 0
  1549. }
  1550. // Function to extract versioning information from bucket entry using shared utilities
  1551. func extractVersioningFromEntry(entry *filer_pb.Entry) bool {
  1552. enabled, _ := s3api.LoadVersioningFromExtended(entry)
  1553. return enabled
  1554. }
  1555. // GetConfigPersistence returns the config persistence manager
  1556. func (as *AdminServer) GetConfigPersistence() *ConfigPersistence {
  1557. return as.configPersistence
  1558. }
  1559. // convertJSONToMaintenanceConfig converts JSON map to protobuf MaintenanceConfig
  1560. func convertJSONToMaintenanceConfig(jsonConfig map[string]interface{}) (*maintenance.MaintenanceConfig, error) {
  1561. config := &maintenance.MaintenanceConfig{}
  1562. // Helper function to get int32 from interface{}
  1563. getInt32 := func(key string) (int32, error) {
  1564. if val, ok := jsonConfig[key]; ok {
  1565. switch v := val.(type) {
  1566. case int:
  1567. return int32(v), nil
  1568. case int32:
  1569. return v, nil
  1570. case int64:
  1571. return int32(v), nil
  1572. case float64:
  1573. return int32(v), nil
  1574. default:
  1575. return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
  1576. }
  1577. }
  1578. return 0, nil
  1579. }
  1580. // Helper function to get bool from interface{}
  1581. getBool := func(key string) bool {
  1582. if val, ok := jsonConfig[key]; ok {
  1583. if b, ok := val.(bool); ok {
  1584. return b
  1585. }
  1586. }
  1587. return false
  1588. }
  1589. var err error
  1590. // Convert basic fields
  1591. config.Enabled = getBool("enabled")
  1592. if config.ScanIntervalSeconds, err = getInt32("scan_interval_seconds"); err != nil {
  1593. return nil, err
  1594. }
  1595. if config.WorkerTimeoutSeconds, err = getInt32("worker_timeout_seconds"); err != nil {
  1596. return nil, err
  1597. }
  1598. if config.TaskTimeoutSeconds, err = getInt32("task_timeout_seconds"); err != nil {
  1599. return nil, err
  1600. }
  1601. if config.RetryDelaySeconds, err = getInt32("retry_delay_seconds"); err != nil {
  1602. return nil, err
  1603. }
  1604. if config.MaxRetries, err = getInt32("max_retries"); err != nil {
  1605. return nil, err
  1606. }
  1607. if config.CleanupIntervalSeconds, err = getInt32("cleanup_interval_seconds"); err != nil {
  1608. return nil, err
  1609. }
  1610. if config.TaskRetentionSeconds, err = getInt32("task_retention_seconds"); err != nil {
  1611. return nil, err
  1612. }
  1613. // Convert policy if present
  1614. if policyData, ok := jsonConfig["policy"]; ok {
  1615. if policyMap, ok := policyData.(map[string]interface{}); ok {
  1616. policy := &maintenance.MaintenancePolicy{}
  1617. if globalMaxConcurrent, err := getInt32FromMap(policyMap, "global_max_concurrent"); err != nil {
  1618. return nil, err
  1619. } else {
  1620. policy.GlobalMaxConcurrent = globalMaxConcurrent
  1621. }
  1622. if defaultRepeatIntervalSeconds, err := getInt32FromMap(policyMap, "default_repeat_interval_seconds"); err != nil {
  1623. return nil, err
  1624. } else {
  1625. policy.DefaultRepeatIntervalSeconds = defaultRepeatIntervalSeconds
  1626. }
  1627. if defaultCheckIntervalSeconds, err := getInt32FromMap(policyMap, "default_check_interval_seconds"); err != nil {
  1628. return nil, err
  1629. } else {
  1630. policy.DefaultCheckIntervalSeconds = defaultCheckIntervalSeconds
  1631. }
  1632. // Convert task policies if present
  1633. if taskPoliciesData, ok := policyMap["task_policies"]; ok {
  1634. if taskPoliciesMap, ok := taskPoliciesData.(map[string]interface{}); ok {
  1635. policy.TaskPolicies = make(map[string]*maintenance.TaskPolicy)
  1636. for taskType, taskPolicyData := range taskPoliciesMap {
  1637. if taskPolicyMap, ok := taskPolicyData.(map[string]interface{}); ok {
  1638. taskPolicy := &maintenance.TaskPolicy{}
  1639. taskPolicy.Enabled = getBoolFromMap(taskPolicyMap, "enabled")
  1640. if maxConcurrent, err := getInt32FromMap(taskPolicyMap, "max_concurrent"); err != nil {
  1641. return nil, err
  1642. } else {
  1643. taskPolicy.MaxConcurrent = maxConcurrent
  1644. }
  1645. if repeatIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "repeat_interval_seconds"); err != nil {
  1646. return nil, err
  1647. } else {
  1648. taskPolicy.RepeatIntervalSeconds = repeatIntervalSeconds
  1649. }
  1650. if checkIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "check_interval_seconds"); err != nil {
  1651. return nil, err
  1652. } else {
  1653. taskPolicy.CheckIntervalSeconds = checkIntervalSeconds
  1654. }
  1655. policy.TaskPolicies[taskType] = taskPolicy
  1656. }
  1657. }
  1658. }
  1659. }
  1660. config.Policy = policy
  1661. }
  1662. }
  1663. return config, nil
  1664. }
  1665. // Helper functions for map conversion
  1666. func getInt32FromMap(m map[string]interface{}, key string) (int32, error) {
  1667. if val, ok := m[key]; ok {
  1668. switch v := val.(type) {
  1669. case int:
  1670. return int32(v), nil
  1671. case int32:
  1672. return v, nil
  1673. case int64:
  1674. return int32(v), nil
  1675. case float64:
  1676. return int32(v), nil
  1677. default:
  1678. return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
  1679. }
  1680. }
  1681. return 0, nil
  1682. }
  1683. func getBoolFromMap(m map[string]interface{}, key string) bool {
  1684. if val, ok := m[key]; ok {
  1685. if b, ok := val.(bool); ok {
  1686. return b
  1687. }
  1688. }
  1689. return false
  1690. }