news 2026/6/15 15:12:12

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务率先运行(一)

假设现在有一种场景,在一个任务接收器中,源源不断且不知道任务发送者何时会将新任务发送过来,每个任务都具备不同的任务优先级,任务无时无刻的进入任务缓冲池,目的是把任务缓冲池中优先级最高的那个任务挑选出来最先运行。

import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.runBlocking import java.util.UUID fun main() { val myThreadPool = newFixedThreadPoolContext(4, "my-thread") val bufferCapacity = 5 val totalTaskSize = 15 val channel = Channel<TaskInfo>() val taskList = mutableListOf<TaskInfo>() runBlocking { //接收任务 async { channel.receiveAsFlow() .buffer(bufferCapacity) .onEach { it -> //生产者 println("onEach $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") taskList.add(it) }.flowOn(myThreadPool) .collect { it -> //消费者 println("collect $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") val newOrderList = taskList.sortedBy { it.priority } newOrderList.forEach { print("${it.priority} ") } val lastTaskInfo = newOrderList.lastOrNull() println("\n最大优先级任务:$lastTaskInfo") taskList.remove(lastTaskInfo) loader(lastTaskInfo!!) } } //源源不断的密集发送加载任务。 async { repeat(totalTaskSize) { it -> enqueue(channel, it) } } } } private suspend fun enqueue(channel: Channel<TaskInfo>, id: Int) { val taskInfo = TaskInfo(id, (Math.random() * 9999).toInt()) println("enqueue $taskInfo") channel.send(taskInfo) } //假设这里是真正的耗时任务执行体 private suspend fun loader(info: TaskInfo) { println("load start $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") delay(500) println("load end $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") } private class TaskInfo { var id = 0 var priority = 0 private val taskId = UUID.randomUUID() constructor(id: Int, priority: Int) { this.id = id this.priority = priority } override fun equals(other: Any?): Boolean { return taskId == (other as TaskInfo).taskId } override fun toString(): String { return "TaskInfo(id=$id, priority=$priority)" } }

输出:

enqueue TaskInfo(id=0, priority=7947)
enqueue TaskInfo(id=1, priority=1045)
enqueue TaskInfo(id=2, priority=4478)
onEach TaskInfo(id=0, priority=7947) at time=1765979341859 my-thread-2
onEach TaskInfo(id=1, priority=1045) at time=1765979341859 my-thread-2
onEach TaskInfo(id=2, priority=4478) at time=1765979341859 my-thread-2
enqueue TaskInfo(id=3, priority=5964)
enqueue TaskInfo(id=4, priority=2658)
onEach TaskInfo(id=3, priority=5964) at time=1765979341859 my-thread-4
onEach TaskInfo(id=4, priority=2658) at time=1765979341859 my-thread-4
enqueue TaskInfo(id=5, priority=3495)
onEach TaskInfo(id=5, priority=3495) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=6, priority=1461)
onEach TaskInfo(id=6, priority=1461) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=7, priority=4860)
onEach TaskInfo(id=7, priority=4860) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=8, priority=7226)
onEach TaskInfo(id=8, priority=7226) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=9, priority=1939)
enqueue TaskInfo(id=10, priority=133)
onEach TaskInfo(id=9, priority=1939) at time=1765979341861 my-thread-3
onEach TaskInfo(id=10, priority=133) at time=1765979341861 my-thread-3
enqueue TaskInfo(id=11, priority=1818)
enqueue TaskInfo(id=12, priority=7695)
onEach TaskInfo(id=11, priority=1818) at time=1765979341861 my-thread-2
onEach TaskInfo(id=12, priority=7695) at time=1765979341861 my-thread-2
enqueue TaskInfo(id=13, priority=4365)
onEach TaskInfo(id=13, priority=4365) at time=1765979341862 my-thread-4
enqueue TaskInfo(id=14, priority=4889)
onEach TaskInfo(id=14, priority=4889) at time=1765979341862 my-thread-2
collect TaskInfo(id=0, priority=7947) at time=1765979341862 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695 7947
最大优先级任务:TaskInfo(id=0, priority=7947)
load start TaskInfo(id=0, priority=7947) @time=1765979341887 main
load end TaskInfo(id=0, priority=7947) @time=1765979342391 main
collect TaskInfo(id=1, priority=1045) at time=1765979342392 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695
最大优先级任务:TaskInfo(id=12, priority=7695)
load start TaskInfo(id=12, priority=7695) @time=1765979342392 main
load end TaskInfo(id=12, priority=7695) @time=1765979342901 main
collect TaskInfo(id=2, priority=4478) at time=1765979342901 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226
最大优先级任务:TaskInfo(id=8, priority=7226)
load start TaskInfo(id=8, priority=7226) @time=1765979342902 main
load end TaskInfo(id=8, priority=7226) @time=1765979343412 main
collect TaskInfo(id=3, priority=5964) at time=1765979343412 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964
最大优先级任务:TaskInfo(id=3, priority=5964)
load start TaskInfo(id=3, priority=5964) @time=1765979343412 main
load end TaskInfo(id=3, priority=5964) @time=1765979343922 main
collect TaskInfo(id=4, priority=2658) at time=1765979343922 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889
最大优先级任务:TaskInfo(id=14, priority=4889)
load start TaskInfo(id=14, priority=4889) @time=1765979343923 main
load end TaskInfo(id=14, priority=4889) @time=1765979344433 main
collect TaskInfo(id=5, priority=3495) at time=1765979344433 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860
最大优先级任务:TaskInfo(id=7, priority=4860)
load start TaskInfo(id=7, priority=4860) @time=1765979344434 main
load end TaskInfo(id=7, priority=4860) @time=1765979344943 main
collect TaskInfo(id=6, priority=1461) at time=1765979344943 main
133 1045 1461 1818 1939 2658 3495 4365 4478
最大优先级任务:TaskInfo(id=2, priority=4478)
load start TaskInfo(id=2, priority=4478) @time=1765979344943 main
load end TaskInfo(id=2, priority=4478) @time=1765979345452 main
collect TaskInfo(id=7, priority=4860) at time=1765979345452 main
133 1045 1461 1818 1939 2658 3495 4365
最大优先级任务:TaskInfo(id=13, priority=4365)
load start TaskInfo(id=13, priority=4365) @time=1765979345452 main
load end TaskInfo(id=13, priority=4365) @time=1765979345960 main
collect TaskInfo(id=8, priority=7226) at time=1765979345960 main
133 1045 1461 1818 1939 2658 3495
最大优先级任务:TaskInfo(id=5, priority=3495)
load start TaskInfo(id=5, priority=3495) @time=1765979345960 main
load end TaskInfo(id=5, priority=3495) @time=1765979346467 main
collect TaskInfo(id=9, priority=1939) at time=1765979346467 main
133 1045 1461 1818 1939 2658
最大优先级任务:TaskInfo(id=4, priority=2658)
load start TaskInfo(id=4, priority=2658) @time=1765979346467 main
load end TaskInfo(id=4, priority=2658) @time=1765979346973 main
collect TaskInfo(id=10, priority=133) at time=1765979346973 main
133 1045 1461 1818 1939
最大优先级任务:TaskInfo(id=9, priority=1939)
load start TaskInfo(id=9, priority=1939) @time=1765979346974 main
load end TaskInfo(id=9, priority=1939) @time=1765979347482 main
collect TaskInfo(id=11, priority=1818) at time=1765979347482 main
133 1045 1461 1818
最大优先级任务:TaskInfo(id=11, priority=1818)
load start TaskInfo(id=11, priority=1818) @time=1765979347483 main
load end TaskInfo(id=11, priority=1818) @time=1765979347986 main
collect TaskInfo(id=12, priority=7695) at time=1765979347986 main
133 1045 1461
最大优先级任务:TaskInfo(id=6, priority=1461)
load start TaskInfo(id=6, priority=1461) @time=1765979347987 main
load end TaskInfo(id=6, priority=1461) @time=1765979348498 main
collect TaskInfo(id=13, priority=4365) at time=1765979348498 main
133 1045
最大优先级任务:TaskInfo(id=1, priority=1045)
load start TaskInfo(id=1, priority=1045) @time=1765979348498 main
load end TaskInfo(id=1, priority=1045) @time=1765979349006 main
collect TaskInfo(id=14, priority=4889) at time=1765979349006 main
133
最大优先级任务:TaskInfo(id=10, priority=133)
load start TaskInfo(id=10, priority=133) @time=1765979349007 main
load end TaskInfo(id=10, priority=133) @time=1765979349513 main

相关:

https://blog.csdn.net/zhangphil/article/details/154843029

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/15 12:55:58

小白从零开始勇闯人工智能:爬虫初级篇(Selenium库)

引言在人工智能时代&#xff0c;获取数据是第一步。爬虫技术能帮我们打开网页背后的信息。对于初学者来说&#xff0c;Selenium是一个非常友好的入门选择——它不仅能获取网页数据&#xff0c;还能模拟真实用户的操作&#xff0c;比如点击按钮、输入文字、上传文件等。我们之前…

作者头像 李华
网站建设 2026/6/15 12:39:06

23、Linux 控制台操作与菜单创建全解析

Linux 控制台操作与菜单创建全解析 1. 控制台键盘模式 控制台键盘有三种主要模式: - Keycode(或 MEDIUMRAW)模式 :脚本读取代表控制台驱动对按键解释的数字代码。通常会返回两个键码,一个是按键按下时的,另一个是按键释放时的。不同的按键有不同的键码,例如,按下左…

作者头像 李华
网站建设 2026/6/15 14:17:01

Wan2.1视频生成模型:从入门到精通的完整指南

Wan2.1视频生成模型&#xff1a;从入门到精通的完整指南 【免费下载链接】Wan2.1-I2V-14B-480P-StepDistill-CfgDistill-Lightx2v 项目地址: https://ai.gitcode.com/hf_mirrors/lightx2v/Wan2.1-I2V-14B-480P-StepDistill-CfgDistill-Lightx2v Wan2.1-I2V-14B-480P-St…

作者头像 李华
网站建设 2026/6/14 23:09:16

微信小程序开发云函数锁定状态解决

微信小程序开发&#xff0c;云函数重新更新时报错说状态仍在更新中&#xff0c;不能重新进行安装配置&#xff0c;但已经确保前一次配置失败&#xff0c;解决方法&#xff1a; 1. 静置30min&#xff0c;等待自动恢复。一般而言&#xff0c;云函数通常会在15-30分钟后自动释放锁…

作者头像 李华
网站建设 2026/6/15 13:53:51

线程组之间的JMeter传递变量

下面&#xff0c;我们将看看如何在线程组之间共享和传递变量。 在开发高级JMeter脚本时&#xff0c;很可能您将拥有多个线程组。每个线程组将执行不同的请求。 一个很好的例子是我们需要使用Bearer Tokens对用户进行身份验证。一个线程组执行身份验证并保存令牌。另一个线程组…

作者头像 李华