对于这种并发查询的超时处理,通常有以下几种处理策略,各有利弊:
1. 全部成功或全部失败策略
// 设置一个合理的超时时间
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
// 如果任何一个查询失败或超时,整个任务失败
for i := 0; i < totalQueries; i++ {
select {
case result := <-resultChan:
if result.err != nil {
return nil, fmt.Errorf("查询失败: %v", result.err)
}
// 处理成功的结果
case <-ctx.Done():
return nil, fmt.Errorf("查询超时")
}
}
- 优点:数据一致性好,不会出现部分数据的情况
- 缺点:一个查询失败就导致整个任务失败
2. 降级策略
type QueryResult struct {
Data interface{}
Err error
}
// 为每个查询设置默认值
defaultResults := map[string]interface{}{
"order": make(map[string]param.OrderStats),
"problem": make(map[string]param.ProblemStats),
// ... 其他默认值
}
results := make(map[string]interface{})
timeout := time.After(10 * time.Minute)
for i := 0; i < totalQueries; i++ {
select {
case result := <-resultChan:
if result.err != nil {
ecomm.Zlog.Error("查询失败,使用默认值",
zap.String("type", result.name),
zap.Error(result.err))
results[result.name] = defaultResults[result.name]
} else {
results[result.name] = result.data
}
case <-timeout:
// 超时后,未完成的查询使用默认值
ecomm.Zlog.Warn("部分查询超时,使用默认值")
for name, defaultValue := range defaultResults {
if _, exists := results[name]; !exists {
results[name] = defaultValue
}
}
goto ProcessResults
}
}
- 优点:系统可以继续运行,不会完全阻塞
- 缺点:数据可能不完整,需要明确标记哪些是默认值
3. 重试机制
func queryWithRetry(ctx context.Context, queryFunc func() (interface{}, error)) (interface{}, error) {
retries := 3
for i := 0; i < retries; i++ {
result, err := queryFunc()
if err == nil {
return result, nil
}
// 指数退避
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
continue
}
}
return nil, fmt.Errorf("达到最大重试次数")
}
// 使用
go func() {
result, err := queryWithRetry(ctx, func() (interface{}, error) {
return dbcpm.QueryOrderStatsRt(startTime, endTime)
})
resultChan <- queryResult{"order", result, err}
}()
- 优点:增加了成功的机会
- 缺点:可能会增加总体执行时间
4. 分批执行策略
// 将查询分组,每组执行完再执行下一组
queryGroups := [][]queryFunc{
{queryOrderStats, queryProblemStats}, // 优先级高的查询
{queryCompensStats, queryAppealStats}, // 次优先级查询
{queryLossStats, queryWyueStats}, // 其他查询
}
for _, group := range queryGroups {
results := make([]interface{}, len(group))
errChan := make(chan error, len(group))
// 执行当前组的所有查询
for i, query := range group {
go func(i int, q queryFunc) {
result, err := q()
if err != nil {
errChan <- err
return
}
results[i] = result
}(i, query)
}
// 等待当前组完成或超时
select {
case err := <-errChan:
// 处理错误
case <-time.After(2 * time.Minute):
// 处理超时
}
}
- 优点:可以控制并发量,优先保证重要数据的查询
- 缺点:总体执行时间可能会更长
5. 混合策略(推荐)
func (s *StDashboardRtTask) AssembleData() (map[string]param.ResDashboard, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
// 定义查询的优先级和重要性
queries := []struct {
name string
critical bool // 是否是关键查询
query func() (interface{}, error)
}{
{"order", true, func() (interface{}, error) {
return queryWithRetry(ctx, func() (interface{}, error) {
return dbcpm.QueryOrderStatsRt(s.StartHour, s.EndHour)
})
}},
{"problem", false, func() (interface{}, error) {
return dbcpm.QueryProblemStatsRt(s.StartHour, s.EndHour)
}},
// ... 其他查询
}
results := make(map[string]interface{})
var criticalError error
for _, q := range queries {
select {
case <-ctx.Done():
if len(results) == 0 {
return nil, fmt.Errorf("所有查询都超时")
}
goto ProcessResults
default:
result, err := q.query()
if err != nil {
if q.critical {
criticalError = err
break
}
// 非关键查询失败,使用默认值
results[q.name] = getDefaultValue(q.name)
continue
}
results[q.name] = result
}
}
if criticalError != nil {
return nil, criticalError
}
ProcessResults:
return buildResult(results), nil
}
这个混合策略的特点是:
- 区分关键查询和非关键查询
- 关键查询失败会导致整个任务失败
- 非关键查询失败会使用默认值
- 为关键查询添加重试机制
- 设置合理的超时时间
- 提供清晰的日志记录和监控点
这样既保证了核心数据的可靠性,又不会因为非核心数据的问题导致整个任务失败。同时,通过日志和监控,我们可以及时发现和处理问题。
你觉得这些方案中哪个更适合你的场景?我们可以根据你的具体需求来调整和优化。