Skip to content

Examples

Basic tasker setup

Demonstrates the fundamental setup for tasker using a simple CalculatorResource with two base workers.

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/asaidimu/tasker/v2"
)

// CalculatorResource represents a simple resource,
// in this case, just a placeholder.
type CalculatorResource struct{}

// onCreate for CalculatorResource - no actual setup needed
func createCalcResource() (*CalculatorResource, error) {
	fmt.Println("INFO: Creating CalculatorResource")
	return &CalculatorResource{}, nil
}

// onDestroy for CalculatorResource - no actual cleanup needed
func destroyCalcResource(r *CalculatorResource) error {
	fmt.Println("INFO: Destroying CalculatorResource")
	return nil
}

func main() {
	fmt.Println("--- Basic Usage: Simple Calculator ---")

	ctx := context.Background()

	// Configure the tasker for our CalculatorResource
	config := tasker.Config[*CalculatorResource]{
		OnCreate:    createCalcResource,
		OnDestroy:   destroyCalcResource,
		WorkerCount: 2, // Two base workers
		Ctx:         ctx,
		// No specific health check or burst settings for this basic example
	}

	// Create a new task manager
	manager, err := tasker.NewTaskManager[*CalculatorResource, int](config) // Tasks will return an int result
	if err != nil {
		log.Fatalf("Error creating task manager: %v", err)
	}
	defer manager.Stop() // Ensure the manager is stopped gracefully

	fmt.Println("Queuing a simple Multiplication task...")
	task1Start := time.Now()
	// Queue a task to perform addition
	go func() {
		sum, err := manager.QueueTask(func(ctx context.Context, r *CalculatorResource) (int, error) {
			// In a real scenario, 'r' could be a connection to a math service
			time.Sleep(50 * time.Millisecond) // Simulate some work
			a, b := 10, 25
			fmt.Printf("Worker processing: %d * %d\n", a, b)
			return a * b, nil
		})

		if err != nil {
			fmt.Printf("Task 1 failed: %v\n", err)
		} else {
			fmt.Printf("Task 1 (Multiplication) Result: %d (took %s)\n", sum, time.Since(task1Start))
		}
	}()

	fmt.Println("Queuing another addition task...")
	task2Start := time.Now()
	manager.QueueTaskWithCallback(
		func(ctx context.Context, r *CalculatorResource) (int, error) {
			time.Sleep(50 * time.Millisecond) // Simulate some work
			a, b := 10, 25
			fmt.Printf("Worker processing: %d + %d\n", a, b)
			return a + b, nil
		},
		func(sum int, err error) { // do something with the results
			if err != nil {
				fmt.Printf("Task 2 failed: %v\n", err)
			} else {
				fmt.Printf("Task 2 (Addition) Result: %d (took %s)\n", sum, time.Since(task2Start))
			}
		},
	)

	fmt.Println("Queuing another subtraction task...")

	task3Start := time.Now()
	differencech, errch := manager.QueueTaskAsync(func(ctx context.Context, r *CalculatorResource) (int, error) {
		time.Sleep(70 * time.Millisecond) // Simulate some work
		a, b := 100, 40
		fmt.Printf("Worker processing: %d - %d\n", a, b)
		return a - b, nil
	})

	difference := <-differencech
	err = <-errch

	if err != nil {
		fmt.Printf("Task 3 failed: %v\n", err)
	} else {
		fmt.Printf("Task 3 (Subtraction) Result: %d (took %s)\n", difference, time.Since(task3Start))
	}

	// Allow some time for tasks to complete
	time.Sleep(500 * time.Millisecond)

	stats := manager.Stats()
	fmt.Printf("\n--- Current Stats ---\n")
	fmt.Printf("Active Workers: %d\n", stats.ActiveWorkers)
	fmt.Printf("Queued Tasks: %d\n", stats.QueuedTasks)
	fmt.Printf("Available Resources: %d\n", stats.AvailableResources)
	fmt.Println("----------------------")

	fmt.Println("Basic usage example finished.")
}

Output log shows 'Creating CalculatorResource' twice, tasks being processed, and correct calculation results. Final stats show 2 active workers and 0 queued tasks.

Related Methods: NewTaskManager, QueueTask, QueueTaskWithCallback, QueueTaskAsync, Stats, Stop


Custom health check

Demonstrates defining a custom health check function to identify unhealthy worker/resource states and trigger worker replacement and task retries.

go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/asaidimu/tasker/v2"
)

// ImageProcessor represents a resource for image manipulation.
type ImageProcessor struct {
	ID        int
	IsHealthy bool
}

// onCreate for ImageProcessor: simulates creating a connection to an image processing service.
func createImageProcessor() (*ImageProcessor, error) {
	id := rand.Intn(1000)
	fmt.Printf("INFO: Creating ImageProcessor %d\n", id)
	return &ImageProcessor{ID: id, IsHealthy: true}, nil
}

// onDestroy for ImageProcessor: simulates closing the connection.
func destroyImageProcessor(p *ImageProcessor) error {
	fmt.Printf("INFO: Destroying ImageProcessor %d\n", p.ID)
	return nil
}

// checkImageProcessorHealth: Custom health check.
// If the error is "processor_crash", consider the worker/resource unhealthy.
func checkImageProcessorHealth(err error) bool {
	if err != nil && err.Error() == "processor_crash" {
		fmt.Printf("WARN: Detected unhealthy error: %v. Worker will be replaced.\n", err)
		return false // This error indicates an unhealthy state
	}
	return true // Other errors are just task failures, not worker health issues
}

func main() {
	fmt.Println("\n--- Intermediate Usage: Image Processing ---")

	ctx := context.Background()

	config := tasker.Config[*ImageProcessor]{
		OnCreate:         createImageProcessor,
		OnDestroy:        destroyImageProcessor,
		WorkerCount:      2,
		Ctx:              ctx,
		CheckHealth:      checkImageProcessorHealth, // Use custom health check
		MaxRetries:       1,
		ResourcePoolSize: 1,
	}

	manager, err := tasker.NewTaskManager[*ImageProcessor, string](config)
	if err != nil {
		log.Fatalf("Error creating task manager: %v", err)
	}
	defer manager.Stop()

	fmt.Println("Queueing a task that might crash a worker (unhealthy error)...")
	go func() {
		result, err := manager.QueueTask(func(ctx context.Context, proc *ImageProcessor) (string, error) {
			fmt.Printf("Worker %d processing problematic image (might crash)\n", proc.ID)
			time.Sleep(100 * time.Millisecond)
			if rand.Intn(2) == 0 {
				return "", errors.New("processor_crash") // This triggers CheckHealth to return false
			}
			return "problematic_image_processed.jpg", nil
		})
		if err != nil {
			fmt.Printf("Problematic Image Task Failed: %v\n", err)
		} else {
			fmt.Printf("Problematic Image Task Completed: %s\n", result)
		}
	}()

	time.Sleep(500 * time.Millisecond)
}

When a task returns "processor_crash" error, the log should show 'WARN: Detected unhealthy error: processor_crash. Worker will be replaced.' followed by 'INFO: Destroying ImageProcessor [ID]' and 'INFO: Creating ImageProcessor [New ID]', indicating worker replacement. The task may be retried or fail with 'max retries exceeded'.

Related Methods: QueueTask

Related Errors: processor_crash, max retries exceeded


Graceful shutdown

Demonstrates initiating a graceful shutdown of the TaskManager using Stop(), ensuring all pending tasks are completed before resources are released.

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/asaidimu/tasker/v2"
)

type HeavyComputeResource struct{ ID int }
func createComputeResource() (*HeavyComputeResource, error) { fmt.Printf("INFO: Creating ComputeResource\n"); return &HeavyComputeResource{ID: 1}, nil }
func destroyComputeResource(r *HeavyComputeResource) error { fmt.Printf("INFO: Destroying ComputeResource\n"); return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	config := tasker.Config[*HeavyComputeResource]{
		OnCreate:    createComputeResource,
		OnDestroy:   destroyComputeResource,
		WorkerCount: 2,
		Ctx:         ctx,
	}

	manager, err := tasker.NewTaskManager[*HeavyComputeResource, string](config)
	if err != nil {
		log.Fatalf("Error creating task manager: %v", err)
	}

	// Queue some tasks
	for i := 0; i < 5; i++ {
		taskID := i
		go func() {
			_, _ = manager.QueueTask(func(ctx context.Context, res *HeavyComputeResource) (string, error) {
				fmt.Printf("Worker %d processing Task %d\n", res.ID, taskID)
				time.Sleep(100 * time.Millisecond)
				return fmt.Sprintf("Task %d completed", taskID), nil
			})
		}()
	}

	// Allow some tasks to start, then initiate graceful shutdown
	time.Sleep(200 * time.Millisecond)
	fmt.Println("\nInitiating graceful shutdown...")
	err = manager.Stop()
	if err != nil {
		fmt.Printf("Error during graceful shutdown: %v\n", err)
	} else {
		fmt.Println("Task manager gracefully shut down.")
	}
	fmt.Printf("Final Active Workers: %d\n", manager.Stats().ActiveWorkers)
}

All 5 tasks submitted are eventually reported as 'completed'. Logs indicate workers are destroyed only after tasks finish. Final 'Active Workers' should be 0. Task manager gracefully shut down. should be printed.

Related Methods: Stop

Related Errors: task manager already stopping or killed


Immediate shutdown

Demonstrates initiating an immediate shutdown of the TaskManager using Kill(), which cancels active tasks and drops queued ones without waiting.

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/asaidimu/tasker/v2"
)

type HeavyComputeResource struct{ ID int }
func createComputeResource() (*HeavyComputeResource, error) { fmt.Printf("INFO: Creating ComputeResource\n"); return &HeavyComputeResource{ID: 1}, nil }
func destroyComputeResource(r *HeavyComputeResource) error { fmt.Printf("INFO: Destroying ComputeResource\n"); return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	config := tasker.Config[*HeavyComputeResource]{
		OnCreate:    createComputeResource,
		OnDestroy:   destroyComputeResource,
		WorkerCount: 2,
		Ctx:         ctx,
	}

	manager, err := tasker.NewTaskManager[*HeavyComputeResource, string](config)
	if err != nil {
		log.Fatalf("Error creating task manager: %v", err)
	}

	// Queue a long-running task
	go func() {
		_, _ = manager.QueueTask(func(ctx context.Context, res *HeavyComputeResource) (string, error) {
			fmt.Printf("Worker %d processing long Task...\n", res.ID)
			select {
			case <-ctx.Done():
				fmt.Printf("Task on Worker %d cancelled due to shutdown.\n", res.ID)
				return "", ctx.Err()
			case <-time.After(5 * time.Second): // Simulate long work
				return "long task completed", nil
			}
		})
	}()

	// Immediately kill the manager
	time.Sleep(50 * time.Millisecond)
	fmt.Println("\nInitiating immediate shutdown (Kill)...")
	err = manager.Kill()
	if err != nil {
		fmt.Printf("Error during immediate shutdown: %v\n", err)
	} else {
		fmt.Println("Task manager immediately shut down.")
	}
	fmt.Printf("Final Active Workers: %d\n", manager.Stats().ActiveWorkers)
}

The long-running task should be reported as 'cancelled'. Logs should show workers being destroyed immediately. Final 'Active Workers' should be 0. Task manager immediately shut down. should be printed.

Related Methods: Kill

Related Errors: task manager already killed


Get live stats

Demonstrates how to retrieve real-time operational statistics of the TaskManager.

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/asaidimu/tasker/v2"
)

type CalcResource struct{}
func createCalcResource() (*CalcResource, error) { return &CalcResource{}, nil }
func destroyCalcResource(r *CalcResource) error { return nil }

func main() {
	ctx := context.Background()
	config := tasker.Config[*CalcResource]{
		OnCreate:    createCalcResource,
		OnDestroy:   destroyCalcResource,
		WorkerCount: 1,
		Ctx:         ctx,
	}
	manager, err := tasker.NewTaskManager[*CalcResource, int](config)
	if err != nil {
		log.Fatalf("Error creating manager: %v", err)
	}
	defer manager.Stop()

	// Queue a task to keep a worker busy
	go func() {
		_, _ = manager.QueueTask(func(ctx context.Context, r *CalcResource) (int, error) {
			time.Sleep(200 * time.Millisecond)
			return 0, nil
		})
	}()

	time.Sleep(50 * time.Millisecond) // Allow worker to pick up task
	stats := manager.Stats()
	fmt.Printf("Current Stats: Active Workers: %d, Queued Tasks: %d, Available Resources: %d\n",
		stats.ActiveWorkers, stats.QueuedTasks, stats.AvailableResources)

	time.Sleep(200 * time.Millisecond) // Wait for task to complete
	stats = manager.Stats()
	fmt.Printf("Stats after task completion: Active Workers: %d, Queued Tasks: %d, Available Resources: %d\n",
		stats.ActiveWorkers, stats.QueuedTasks, stats.AvailableResources)
}

Output shows 'Active Workers: 1' and 'Queued Tasks: 0' initially (if task immediately picked up), and then 'Active Workers: 1' and 'Queued Tasks: 0' after task completion. The 'Available Resources' should reflect pool state.

Related Methods: Stats


Get performance metrics

Demonstrates how to retrieve comprehensive performance metrics from the TaskManager.

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/asaidimu/tasker/v2"
)

type CalcResource struct{}
func createCalcResource() (*CalcResource, error) { return &CalcResource{}, nil }
func destroyCalcResource(r *CalcResource) error { return nil }

func main() {
	ctx := context.Background()
	config := tasker.Config[*CalcResource]{
		OnCreate:    createCalcResource,
		OnDestroy:   destroyCalcResource,
		WorkerCount: 1,
		Ctx:         ctx,
	}
	manager, err := tasker.NewTaskManager[*CalcResource, int](config)
	if err != nil {
		log.Fatalf("Error creating manager: %v", err)
	}
	defer manager.Stop()

	// Queue multiple tasks to generate metrics data
	for i := 0; i < 10; i++ {
		go func() {
			_, _ = manager.QueueTask(func(ctx context.Context, r *CalcResource) (int, error) {
				time.Sleep(time.Duration(10 + i*5) * time.Millisecond) // Varying work
				return 0, nil
			})
		}()
	}

	time.Sleep(500 * time.Millisecond) // Allow tasks to process

	metrics := manager.Metrics()
	fmt.Printf("\n--- Performance Metrics ---\n")
	fmt.Printf("Total Tasks Completed: %d\n", metrics.TotalTasksCompleted)
	fmt.Printf("Average Execution Time: %v\n", metrics.AverageExecutionTime)
	fmt.Printf("P95 Execution Time: %v\n", metrics.P95ExecutionTime)
	fmt.Printf("Task Completion Rate: %.2f/sec\n", metrics.TaskCompletionRate)
	fmt.Printf("Success Rate: %.2f\n", metrics.SuccessRate)
	fmt.Println("-----------------------------")
}

Output shows various metrics like Total Tasks Completed, Average Execution Time, P95 Execution Time, Task Completion Rate, and Success Rate. Values should be non-zero after tasks have completed.

Related Methods: Metrics


Queue and wait

Template for queuing a task to the main queue and blocking the caller until its result is available.

go
result, err := manager.QueueTask(func(ctx context.Context, res *MyResource) (string, error) {
    // Task logic here
    return "task_done", nil
})
if err != nil {
    fmt.Printf("Task failed: %v\n", err)
} else {
    fmt.Printf("Task completed with result: %s\n", result)
}

The result variable holds the correct task output, and err is nil if the task was successful. The fmt.Printf will execute after the task is truly finished.

Related Methods: QueueTask


Queue with callback

Template for queuing a task to the main queue and receiving its result asynchronously via a callback function.

go
manager.QueueTaskWithCallback(func(ctx context.Context, res *MyResource) (string, error) {
    // Task logic here
    return "callback_task_done", nil
}, func(result string, err error) {
    if err != nil {
        fmt.Printf("Callback task failed: %v\n", err)
    } else {
        fmt.Printf("Callback task completed with result: %s\n", result)
    }
})
fmt.Println("Submitted callback task, not blocking.")

The line 'Submitted callback task, not blocking.' prints immediately. The callback function prints its result message later, once the task is processed.

Related Methods: QueueTaskWithCallback


Queue asynchronously with channels

Template for queuing a task to the main queue and receiving its result and error via Go channels, allowing non-blocking submission and flexible result consumption.

go
resultChan, errChan := manager.QueueTaskAsync(func(ctx context.Context, res *MyResource) (string, error) {
    // Task logic here
    return "channel_task_done", nil
})
fmt.Println("Submitted async channel task, not blocking.")

go func() {
    result := <-resultChan
    err := <-errChan
    if err != nil {
        fmt.Printf("Async channel task failed: %v\n", err)
    } else {
        fmt.Printf("Async channel task completed with result: %s\n", result)
    }
}()

The line 'Submitted async channel task, not blocking.' prints immediately. The result/error message from the goroutine receiving from channels prints later, once the task is processed.

Related Methods: QueueTaskAsync


Released under the MIT License.