client.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944
  1. package worker
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  12. "google.golang.org/grpc"
  13. )
  14. // GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
  15. type GrpcAdminClient struct {
  16. adminAddress string
  17. workerID string
  18. dialOption grpc.DialOption
  19. conn *grpc.ClientConn
  20. client worker_pb.WorkerServiceClient
  21. stream worker_pb.WorkerService_WorkerStreamClient
  22. streamCtx context.Context
  23. streamCancel context.CancelFunc
  24. connected bool
  25. reconnecting bool
  26. shouldReconnect bool
  27. mutex sync.RWMutex
  28. // Reconnection parameters
  29. maxReconnectAttempts int
  30. reconnectBackoff time.Duration
  31. maxReconnectBackoff time.Duration
  32. reconnectMultiplier float64
  33. // Worker registration info for re-registration after reconnection
  34. lastWorkerInfo *types.WorkerData
  35. // Channels for communication
  36. outgoing chan *worker_pb.WorkerMessage
  37. incoming chan *worker_pb.AdminMessage
  38. responseChans map[string]chan *worker_pb.AdminMessage
  39. responsesMutex sync.RWMutex
  40. // Shutdown channel
  41. shutdownChan chan struct{}
  42. }
  43. // NewGrpcAdminClient creates a new gRPC admin client
  44. func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.DialOption) *GrpcAdminClient {
  45. // Admin uses HTTP port + 10000 as gRPC port
  46. grpcAddress := pb.ServerToGrpcAddress(adminAddress)
  47. return &GrpcAdminClient{
  48. adminAddress: grpcAddress,
  49. workerID: workerID,
  50. dialOption: dialOption,
  51. shouldReconnect: true,
  52. maxReconnectAttempts: 0, // 0 means infinite attempts
  53. reconnectBackoff: 1 * time.Second,
  54. maxReconnectBackoff: 30 * time.Second,
  55. reconnectMultiplier: 1.5,
  56. outgoing: make(chan *worker_pb.WorkerMessage, 100),
  57. incoming: make(chan *worker_pb.AdminMessage, 100),
  58. responseChans: make(map[string]chan *worker_pb.AdminMessage),
  59. shutdownChan: make(chan struct{}),
  60. }
  61. }
  62. // Connect establishes gRPC connection to admin server with TLS detection
  63. func (c *GrpcAdminClient) Connect() error {
  64. c.mutex.Lock()
  65. defer c.mutex.Unlock()
  66. if c.connected {
  67. return fmt.Errorf("already connected")
  68. }
  69. // Always start the reconnection loop, even if initial connection fails
  70. go c.reconnectionLoop()
  71. // Attempt initial connection
  72. err := c.attemptConnection()
  73. if err != nil {
  74. glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
  75. return err
  76. }
  77. return nil
  78. }
  79. // attemptConnection tries to establish the connection without managing the reconnection loop
  80. func (c *GrpcAdminClient) attemptConnection() error {
  81. // Detect TLS support and create appropriate connection
  82. conn, err := c.createConnection()
  83. if err != nil {
  84. return fmt.Errorf("failed to connect to admin server: %w", err)
  85. }
  86. c.conn = conn
  87. c.client = worker_pb.NewWorkerServiceClient(conn)
  88. // Create bidirectional stream
  89. c.streamCtx, c.streamCancel = context.WithCancel(context.Background())
  90. stream, err := c.client.WorkerStream(c.streamCtx)
  91. if err != nil {
  92. c.conn.Close()
  93. return fmt.Errorf("failed to create worker stream: %w", err)
  94. }
  95. c.stream = stream
  96. c.connected = true
  97. // Always check for worker info and send registration immediately as the very first message
  98. c.mutex.RLock()
  99. workerInfo := c.lastWorkerInfo
  100. c.mutex.RUnlock()
  101. if workerInfo != nil {
  102. // Send registration synchronously as the very first message
  103. if err := c.sendRegistrationSync(workerInfo); err != nil {
  104. c.conn.Close()
  105. c.connected = false
  106. return fmt.Errorf("failed to register worker: %w", err)
  107. }
  108. glog.Infof("Worker registered successfully with admin server")
  109. } else {
  110. // No worker info yet - stream will wait for registration
  111. glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
  112. }
  113. // Start stream handlers with synchronization
  114. outgoingReady := make(chan struct{})
  115. incomingReady := make(chan struct{})
  116. go c.handleOutgoingWithReady(outgoingReady)
  117. go c.handleIncomingWithReady(incomingReady)
  118. // Wait for both handlers to be ready
  119. <-outgoingReady
  120. <-incomingReady
  121. glog.Infof("Connected to admin server at %s", c.adminAddress)
  122. return nil
  123. }
  124. // createConnection attempts to connect using the provided dial option
  125. func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
  126. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  127. defer cancel()
  128. conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
  129. if err != nil {
  130. return nil, fmt.Errorf("failed to connect to admin server: %w", err)
  131. }
  132. glog.Infof("Connected to admin server at %s", c.adminAddress)
  133. return conn, nil
  134. }
  135. // Disconnect closes the gRPC connection
  136. func (c *GrpcAdminClient) Disconnect() error {
  137. c.mutex.Lock()
  138. defer c.mutex.Unlock()
  139. if !c.connected {
  140. return nil
  141. }
  142. c.connected = false
  143. c.shouldReconnect = false
  144. // Send shutdown signal to stop reconnection loop
  145. select {
  146. case c.shutdownChan <- struct{}{}:
  147. default:
  148. }
  149. // Send shutdown message
  150. shutdownMsg := &worker_pb.WorkerMessage{
  151. WorkerId: c.workerID,
  152. Timestamp: time.Now().Unix(),
  153. Message: &worker_pb.WorkerMessage_Shutdown{
  154. Shutdown: &worker_pb.WorkerShutdown{
  155. WorkerId: c.workerID,
  156. Reason: "normal shutdown",
  157. },
  158. },
  159. }
  160. select {
  161. case c.outgoing <- shutdownMsg:
  162. case <-time.After(time.Second):
  163. glog.Warningf("Failed to send shutdown message")
  164. }
  165. // Cancel stream context
  166. if c.streamCancel != nil {
  167. c.streamCancel()
  168. }
  169. // Close stream
  170. if c.stream != nil {
  171. c.stream.CloseSend()
  172. }
  173. // Close connection
  174. if c.conn != nil {
  175. c.conn.Close()
  176. }
  177. // Close channels
  178. close(c.outgoing)
  179. close(c.incoming)
  180. glog.Infof("Disconnected from admin server")
  181. return nil
  182. }
  183. // reconnectionLoop handles automatic reconnection with exponential backoff
  184. func (c *GrpcAdminClient) reconnectionLoop() {
  185. backoff := c.reconnectBackoff
  186. attempts := 0
  187. for {
  188. select {
  189. case <-c.shutdownChan:
  190. return
  191. default:
  192. }
  193. c.mutex.RLock()
  194. shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting
  195. c.mutex.RUnlock()
  196. if !shouldReconnect {
  197. time.Sleep(time.Second)
  198. continue
  199. }
  200. c.mutex.Lock()
  201. c.reconnecting = true
  202. c.mutex.Unlock()
  203. glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1)
  204. // Attempt to reconnect
  205. if err := c.reconnect(); err != nil {
  206. attempts++
  207. glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)
  208. // Reset reconnecting flag
  209. c.mutex.Lock()
  210. c.reconnecting = false
  211. c.mutex.Unlock()
  212. // Check if we should give up
  213. if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
  214. glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
  215. c.mutex.Lock()
  216. c.shouldReconnect = false
  217. c.mutex.Unlock()
  218. return
  219. }
  220. // Wait with exponential backoff
  221. glog.Infof("Waiting %v before next reconnection attempt", backoff)
  222. select {
  223. case <-c.shutdownChan:
  224. return
  225. case <-time.After(backoff):
  226. }
  227. // Increase backoff
  228. backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
  229. if backoff > c.maxReconnectBackoff {
  230. backoff = c.maxReconnectBackoff
  231. }
  232. } else {
  233. // Successful reconnection
  234. attempts = 0
  235. backoff = c.reconnectBackoff
  236. glog.Infof("Successfully reconnected to admin server")
  237. c.mutex.Lock()
  238. c.reconnecting = false
  239. c.mutex.Unlock()
  240. }
  241. }
  242. }
  243. // reconnect attempts to re-establish the connection
  244. func (c *GrpcAdminClient) reconnect() error {
  245. // Clean up existing connection completely
  246. c.mutex.Lock()
  247. if c.streamCancel != nil {
  248. c.streamCancel()
  249. }
  250. if c.stream != nil {
  251. c.stream.CloseSend()
  252. }
  253. if c.conn != nil {
  254. c.conn.Close()
  255. }
  256. c.connected = false
  257. c.mutex.Unlock()
  258. // Attempt to re-establish connection using the same logic as initial connection
  259. err := c.attemptConnection()
  260. if err != nil {
  261. return fmt.Errorf("failed to reconnect: %w", err)
  262. }
  263. // Registration is now handled in attemptConnection if worker info is available
  264. return nil
  265. }
  266. // handleOutgoing processes outgoing messages to admin
  267. func (c *GrpcAdminClient) handleOutgoing() {
  268. for msg := range c.outgoing {
  269. c.mutex.RLock()
  270. connected := c.connected
  271. stream := c.stream
  272. c.mutex.RUnlock()
  273. if !connected {
  274. break
  275. }
  276. if err := stream.Send(msg); err != nil {
  277. glog.Errorf("Failed to send message to admin: %v", err)
  278. c.mutex.Lock()
  279. c.connected = false
  280. c.mutex.Unlock()
  281. break
  282. }
  283. }
  284. }
  285. // handleOutgoingWithReady processes outgoing messages and signals when ready
  286. func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
  287. // Signal that this handler is ready to process messages
  288. close(ready)
  289. // Now process messages normally
  290. c.handleOutgoing()
  291. }
  292. // handleIncoming processes incoming messages from admin
  293. func (c *GrpcAdminClient) handleIncoming() {
  294. glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID)
  295. for {
  296. c.mutex.RLock()
  297. connected := c.connected
  298. stream := c.stream
  299. c.mutex.RUnlock()
  300. if !connected {
  301. glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID)
  302. break
  303. }
  304. glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", c.workerID)
  305. msg, err := stream.Recv()
  306. if err != nil {
  307. if err == io.EOF {
  308. glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", c.workerID)
  309. } else {
  310. glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err)
  311. }
  312. c.mutex.Lock()
  313. c.connected = false
  314. c.mutex.Unlock()
  315. break
  316. }
  317. glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message)
  318. // Route message to waiting goroutines or general handler
  319. select {
  320. case c.incoming <- msg:
  321. glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID)
  322. case <-time.After(time.Second):
  323. glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message)
  324. }
  325. }
  326. glog.V(1).Infof("INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID)
  327. }
  328. // handleIncomingWithReady processes incoming messages and signals when ready
  329. func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
  330. // Signal that this handler is ready to process messages
  331. close(ready)
  332. // Now process messages normally
  333. c.handleIncoming()
  334. }
  335. // RegisterWorker registers the worker with the admin server
  336. func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
  337. // Store worker info for re-registration after reconnection
  338. c.mutex.Lock()
  339. c.lastWorkerInfo = worker
  340. c.mutex.Unlock()
  341. // If not connected, registration will happen when connection is established
  342. if !c.connected {
  343. glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
  344. return nil
  345. }
  346. return c.sendRegistration(worker)
  347. }
  348. // sendRegistration sends the registration message and waits for response
  349. func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
  350. capabilities := make([]string, len(worker.Capabilities))
  351. for i, cap := range worker.Capabilities {
  352. capabilities[i] = string(cap)
  353. }
  354. msg := &worker_pb.WorkerMessage{
  355. WorkerId: c.workerID,
  356. Timestamp: time.Now().Unix(),
  357. Message: &worker_pb.WorkerMessage_Registration{
  358. Registration: &worker_pb.WorkerRegistration{
  359. WorkerId: c.workerID,
  360. Address: worker.Address,
  361. Capabilities: capabilities,
  362. MaxConcurrent: int32(worker.MaxConcurrent),
  363. Metadata: make(map[string]string),
  364. },
  365. },
  366. }
  367. select {
  368. case c.outgoing <- msg:
  369. case <-time.After(5 * time.Second):
  370. return fmt.Errorf("failed to send registration message: timeout")
  371. }
  372. // Wait for registration response
  373. timeout := time.NewTimer(10 * time.Second)
  374. defer timeout.Stop()
  375. for {
  376. select {
  377. case response := <-c.incoming:
  378. if regResp := response.GetRegistrationResponse(); regResp != nil {
  379. if regResp.Success {
  380. glog.Infof("Worker registered successfully: %s", regResp.Message)
  381. return nil
  382. }
  383. return fmt.Errorf("registration failed: %s", regResp.Message)
  384. }
  385. case <-timeout.C:
  386. return fmt.Errorf("registration timeout")
  387. }
  388. }
  389. }
  390. // sendRegistrationSync sends the registration message synchronously
  391. func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
  392. capabilities := make([]string, len(worker.Capabilities))
  393. for i, cap := range worker.Capabilities {
  394. capabilities[i] = string(cap)
  395. }
  396. msg := &worker_pb.WorkerMessage{
  397. WorkerId: c.workerID,
  398. Timestamp: time.Now().Unix(),
  399. Message: &worker_pb.WorkerMessage_Registration{
  400. Registration: &worker_pb.WorkerRegistration{
  401. WorkerId: c.workerID,
  402. Address: worker.Address,
  403. Capabilities: capabilities,
  404. MaxConcurrent: int32(worker.MaxConcurrent),
  405. Metadata: make(map[string]string),
  406. },
  407. },
  408. }
  409. // Send directly to stream to ensure it's the first message
  410. if err := c.stream.Send(msg); err != nil {
  411. return fmt.Errorf("failed to send registration message: %w", err)
  412. }
  413. // Create a channel to receive the response
  414. responseChan := make(chan *worker_pb.AdminMessage, 1)
  415. errChan := make(chan error, 1)
  416. // Start a goroutine to listen for the response
  417. go func() {
  418. for {
  419. response, err := c.stream.Recv()
  420. if err != nil {
  421. errChan <- fmt.Errorf("failed to receive registration response: %w", err)
  422. return
  423. }
  424. if regResp := response.GetRegistrationResponse(); regResp != nil {
  425. responseChan <- response
  426. return
  427. }
  428. // Continue waiting if it's not a registration response
  429. }
  430. }()
  431. // Wait for registration response with timeout
  432. timeout := time.NewTimer(10 * time.Second)
  433. defer timeout.Stop()
  434. select {
  435. case response := <-responseChan:
  436. if regResp := response.GetRegistrationResponse(); regResp != nil {
  437. if regResp.Success {
  438. glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
  439. return nil
  440. }
  441. return fmt.Errorf("registration failed: %s", regResp.Message)
  442. }
  443. return fmt.Errorf("unexpected response type")
  444. case err := <-errChan:
  445. return err
  446. case <-timeout.C:
  447. return fmt.Errorf("registration timeout")
  448. }
  449. }
  450. // SendHeartbeat sends heartbeat to admin server
  451. func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
  452. if !c.connected {
  453. // If we're currently reconnecting, don't wait - just skip the heartbeat
  454. c.mutex.RLock()
  455. reconnecting := c.reconnecting
  456. c.mutex.RUnlock()
  457. if reconnecting {
  458. // Don't treat as an error - reconnection is in progress
  459. glog.V(2).Infof("Skipping heartbeat during reconnection")
  460. return nil
  461. }
  462. // Wait for reconnection for a short time
  463. if err := c.waitForConnection(10 * time.Second); err != nil {
  464. return fmt.Errorf("not connected to admin server: %w", err)
  465. }
  466. }
  467. taskIds := make([]string, len(status.CurrentTasks))
  468. for i, task := range status.CurrentTasks {
  469. taskIds[i] = task.ID
  470. }
  471. msg := &worker_pb.WorkerMessage{
  472. WorkerId: c.workerID,
  473. Timestamp: time.Now().Unix(),
  474. Message: &worker_pb.WorkerMessage_Heartbeat{
  475. Heartbeat: &worker_pb.WorkerHeartbeat{
  476. WorkerId: c.workerID,
  477. Status: status.Status,
  478. CurrentLoad: int32(status.CurrentLoad),
  479. MaxConcurrent: int32(status.MaxConcurrent),
  480. CurrentTaskIds: taskIds,
  481. TasksCompleted: int32(status.TasksCompleted),
  482. TasksFailed: int32(status.TasksFailed),
  483. UptimeSeconds: int64(status.Uptime.Seconds()),
  484. },
  485. },
  486. }
  487. select {
  488. case c.outgoing <- msg:
  489. return nil
  490. case <-time.After(time.Second):
  491. return fmt.Errorf("failed to send heartbeat: timeout")
  492. }
  493. }
  494. // RequestTask requests a new task from admin server
  495. func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
  496. if !c.connected {
  497. // If we're currently reconnecting, don't wait - just return no task
  498. c.mutex.RLock()
  499. reconnecting := c.reconnecting
  500. c.mutex.RUnlock()
  501. if reconnecting {
  502. // Don't treat as an error - reconnection is in progress
  503. glog.V(2).Infof("RECONNECTING: Worker %s skipping task request during reconnection", workerID)
  504. return nil, nil
  505. }
  506. // Wait for reconnection for a short time
  507. if err := c.waitForConnection(5 * time.Second); err != nil {
  508. return nil, fmt.Errorf("not connected to admin server: %w", err)
  509. }
  510. }
  511. caps := make([]string, len(capabilities))
  512. for i, cap := range capabilities {
  513. caps[i] = string(cap)
  514. }
  515. glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
  516. workerID, capabilities)
  517. msg := &worker_pb.WorkerMessage{
  518. WorkerId: c.workerID,
  519. Timestamp: time.Now().Unix(),
  520. Message: &worker_pb.WorkerMessage_TaskRequest{
  521. TaskRequest: &worker_pb.TaskRequest{
  522. WorkerId: c.workerID,
  523. Capabilities: caps,
  524. AvailableSlots: 1, // Request one task
  525. },
  526. },
  527. }
  528. select {
  529. case c.outgoing <- msg:
  530. glog.V(3).Infof("TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
  531. case <-time.After(time.Second):
  532. glog.Errorf("TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
  533. return nil, fmt.Errorf("failed to send task request: timeout")
  534. }
  535. // Wait for task assignment
  536. glog.V(3).Infof("WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
  537. timeout := time.NewTimer(5 * time.Second)
  538. defer timeout.Stop()
  539. for {
  540. select {
  541. case response := <-c.incoming:
  542. glog.V(3).Infof("RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
  543. if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
  544. glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
  545. workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
  546. // Convert to our task type
  547. task := &types.TaskInput{
  548. ID: taskAssign.TaskId,
  549. Type: types.TaskType(taskAssign.TaskType),
  550. Status: types.TaskStatusAssigned,
  551. VolumeID: taskAssign.Params.VolumeId,
  552. Server: getServerFromParams(taskAssign.Params),
  553. Collection: taskAssign.Params.Collection,
  554. Priority: types.TaskPriority(taskAssign.Priority),
  555. CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
  556. // Use typed protobuf parameters directly
  557. TypedParams: taskAssign.Params,
  558. }
  559. return task, nil
  560. } else {
  561. glog.V(3).Infof("NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
  562. }
  563. case <-timeout.C:
  564. glog.V(3).Infof("TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
  565. return nil, nil // No task available
  566. }
  567. }
  568. }
  569. // CompleteTask reports task completion to admin server
  570. func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
  571. return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
  572. }
  573. // CompleteTaskWithMetadata reports task completion with additional metadata
  574. func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
  575. if !c.connected {
  576. // If we're currently reconnecting, don't wait - just skip the completion report
  577. c.mutex.RLock()
  578. reconnecting := c.reconnecting
  579. c.mutex.RUnlock()
  580. if reconnecting {
  581. // Don't treat as an error - reconnection is in progress
  582. glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
  583. return nil
  584. }
  585. // Wait for reconnection for a short time
  586. if err := c.waitForConnection(5 * time.Second); err != nil {
  587. return fmt.Errorf("not connected to admin server: %w", err)
  588. }
  589. }
  590. taskComplete := &worker_pb.TaskComplete{
  591. TaskId: taskID,
  592. WorkerId: c.workerID,
  593. Success: success,
  594. ErrorMessage: errorMsg,
  595. CompletionTime: time.Now().Unix(),
  596. }
  597. // Add metadata if provided
  598. if metadata != nil {
  599. taskComplete.ResultMetadata = metadata
  600. }
  601. msg := &worker_pb.WorkerMessage{
  602. WorkerId: c.workerID,
  603. Timestamp: time.Now().Unix(),
  604. Message: &worker_pb.WorkerMessage_TaskComplete{
  605. TaskComplete: taskComplete,
  606. },
  607. }
  608. select {
  609. case c.outgoing <- msg:
  610. return nil
  611. case <-time.After(time.Second):
  612. return fmt.Errorf("failed to send task completion: timeout")
  613. }
  614. }
  615. // UpdateTaskProgress updates task progress to admin server
  616. func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
  617. if !c.connected {
  618. // If we're currently reconnecting, don't wait - just skip the progress update
  619. c.mutex.RLock()
  620. reconnecting := c.reconnecting
  621. c.mutex.RUnlock()
  622. if reconnecting {
  623. // Don't treat as an error - reconnection is in progress
  624. glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
  625. return nil
  626. }
  627. // Wait for reconnection for a short time
  628. if err := c.waitForConnection(5 * time.Second); err != nil {
  629. return fmt.Errorf("not connected to admin server: %w", err)
  630. }
  631. }
  632. msg := &worker_pb.WorkerMessage{
  633. WorkerId: c.workerID,
  634. Timestamp: time.Now().Unix(),
  635. Message: &worker_pb.WorkerMessage_TaskUpdate{
  636. TaskUpdate: &worker_pb.TaskUpdate{
  637. TaskId: taskID,
  638. WorkerId: c.workerID,
  639. Status: "in_progress",
  640. Progress: float32(progress),
  641. },
  642. },
  643. }
  644. select {
  645. case c.outgoing <- msg:
  646. return nil
  647. case <-time.After(time.Second):
  648. return fmt.Errorf("failed to send task progress: timeout")
  649. }
  650. }
  651. // IsConnected returns whether the client is connected
  652. func (c *GrpcAdminClient) IsConnected() bool {
  653. c.mutex.RLock()
  654. defer c.mutex.RUnlock()
  655. return c.connected
  656. }
  657. // IsReconnecting returns whether the client is currently attempting to reconnect
  658. func (c *GrpcAdminClient) IsReconnecting() bool {
  659. c.mutex.RLock()
  660. defer c.mutex.RUnlock()
  661. return c.reconnecting
  662. }
  663. // SetReconnectionSettings allows configuration of reconnection behavior
  664. func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) {
  665. c.mutex.Lock()
  666. defer c.mutex.Unlock()
  667. c.maxReconnectAttempts = maxAttempts
  668. c.reconnectBackoff = initialBackoff
  669. c.maxReconnectBackoff = maxBackoff
  670. c.reconnectMultiplier = multiplier
  671. }
  672. // StopReconnection stops the reconnection loop
  673. func (c *GrpcAdminClient) StopReconnection() {
  674. c.mutex.Lock()
  675. defer c.mutex.Unlock()
  676. c.shouldReconnect = false
  677. }
  678. // StartReconnection starts the reconnection loop
  679. func (c *GrpcAdminClient) StartReconnection() {
  680. c.mutex.Lock()
  681. defer c.mutex.Unlock()
  682. c.shouldReconnect = true
  683. }
  684. // waitForConnection waits for the connection to be established or timeout
  685. func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
  686. deadline := time.Now().Add(timeout)
  687. for time.Now().Before(deadline) {
  688. c.mutex.RLock()
  689. connected := c.connected
  690. shouldReconnect := c.shouldReconnect
  691. c.mutex.RUnlock()
  692. if connected {
  693. return nil
  694. }
  695. if !shouldReconnect {
  696. return fmt.Errorf("reconnection is disabled")
  697. }
  698. time.Sleep(100 * time.Millisecond)
  699. }
  700. return fmt.Errorf("timeout waiting for connection")
  701. }
  702. // GetIncomingChannel returns the incoming message channel for message processing
  703. // This allows the worker to process admin messages directly
  704. func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
  705. return c.incoming
  706. }
  707. // MockAdminClient provides a mock implementation for testing
  708. type MockAdminClient struct {
  709. workerID string
  710. connected bool
  711. tasks []*types.TaskInput
  712. mutex sync.RWMutex
  713. }
  714. // NewMockAdminClient creates a new mock admin client
  715. func NewMockAdminClient() *MockAdminClient {
  716. return &MockAdminClient{
  717. connected: true,
  718. tasks: make([]*types.TaskInput, 0),
  719. }
  720. }
  721. // Connect mock implementation
  722. func (m *MockAdminClient) Connect() error {
  723. m.mutex.Lock()
  724. defer m.mutex.Unlock()
  725. m.connected = true
  726. return nil
  727. }
  728. // Disconnect mock implementation
  729. func (m *MockAdminClient) Disconnect() error {
  730. m.mutex.Lock()
  731. defer m.mutex.Unlock()
  732. m.connected = false
  733. return nil
  734. }
  735. // RegisterWorker mock implementation
  736. func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error {
  737. m.workerID = worker.ID
  738. glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities)
  739. return nil
  740. }
  741. // SendHeartbeat mock implementation
  742. func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
  743. glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d",
  744. workerID, status.Status, status.CurrentLoad, status.MaxConcurrent)
  745. return nil
  746. }
  747. // RequestTask mock implementation
  748. func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
  749. m.mutex.Lock()
  750. defer m.mutex.Unlock()
  751. if len(m.tasks) > 0 {
  752. task := m.tasks[0]
  753. m.tasks = m.tasks[1:]
  754. glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID)
  755. return task, nil
  756. }
  757. // No tasks available
  758. return nil, nil
  759. }
  760. // CompleteTask mock implementation
  761. func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
  762. if success {
  763. glog.Infof("Mock: Task %s completed successfully", taskID)
  764. } else {
  765. glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg)
  766. }
  767. return nil
  768. }
  769. // UpdateTaskProgress mock implementation
  770. func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
  771. glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress)
  772. return nil
  773. }
  774. // CompleteTaskWithMetadata mock implementation
  775. func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
  776. glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata)
  777. return nil
  778. }
  779. // IsConnected mock implementation
  780. func (m *MockAdminClient) IsConnected() bool {
  781. m.mutex.RLock()
  782. defer m.mutex.RUnlock()
  783. return m.connected
  784. }
  785. // AddMockTask adds a mock task for testing
  786. func (m *MockAdminClient) AddMockTask(task *types.TaskInput) {
  787. m.mutex.Lock()
  788. defer m.mutex.Unlock()
  789. m.tasks = append(m.tasks, task)
  790. }
  791. // CreateAdminClient creates an admin client with the provided dial option
  792. func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
  793. return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
  794. }
  795. // getServerFromParams extracts server address from unified sources
  796. func getServerFromParams(params *worker_pb.TaskParams) string {
  797. if len(params.Sources) > 0 {
  798. return params.Sources[0].Node
  799. }
  800. return ""
  801. }