Flink 系列第10篇:Flink 分布式缓存详解

张开发
2026/4/15 19:15:24 15 分钟阅读

分享文章

Flink 系列第10篇:Flink 分布式缓存详解
一、分布式缓存概述Flink 提供的分布式缓存核心作用是让用户在并行函数中便捷读取本地或远程文件并将文件同步至所有 TaskManager 节点的本地文件系统避免 Task 重复拉取文件提升作业执行效率降低网络开销。二、分布式缓存工作机制Flink 分布式缓存的工作流程可分为以下4个步骤确保文件高效同步且仅执行一次用户注册文件/目录可注册本地文件或远程文件系统如 HDFS、S3中的文件/目录通过执行环境注册借助 ExecutionEnvironment 注册缓存文件并为其指定一个唯一名称后续用于查找文件自动同步至 TaskManager程序执行时Flink 会自动将注册的文件/目录复制到所有 TaskManager 节点的本地文件系统该同步操作仅执行一次本地访问文件用户在并行函数中通过注册时指定的名称查找文件/目录从 TaskManager 本地文件系统直接访问无需重复拉取。三、代码示例3.1 注册缓存文件通过 ExecutionEnvironment 注册缓存文件支持本地文件或远程文件如 HDFS示例如下// 1. 获取Flink批处理运行环境分布式缓存主要用于批处理流处理需结合特定场景ExecutionEnvironmentenvExecutionEnvironment.getExecutionEnvironment();// 2. 注册缓存文件第一个参数为文件路径本地路径/远程路径第二个参数为缓存名称唯一标识// 示例注册本地文件缓存名称为a.txtenv.registerCachedFile(/Users/wangzhiwu/WorkSpace/quickstart/text,a.txt);3.2 在并行函数中访问缓存文件需通过继承 RichFunction如 RichMapFunction借助 RuntimeContext 读取缓存文件。原因是 RichFunction 提供了 RuntimeContext 实例可用于获取分布式缓存资源。// 3. 在RichMapFunction中访问缓存文件DataSetStringresultdata.map(newRichMapFunctionString,String(){// 用于存储缓存文件中的数据供后续业务逻辑使用privateArrayListStringdataListnewArrayListString();// open方法在Task启动时执行一次适合初始化操作如读取缓存Overridepublicvoidopen(Configurationparameters)throwsException{super.open(parameters);// 4. 通过缓存名称a.txt获取本地文件FilemyFilegetRuntimeContext().getDistributedCache().getFile(a.txt);// 5. 读取文件内容需导入org.apache.commons.io.FileUtilsListStringlinesFileUtils.readLines(myFile);// 6. 将文件内容存入dataList供map方法使用for(Stringline:lines){this.dataList.add(line);System.err.println(分布式缓存内容line);}}// map方法并行处理每条数据可直接使用缓存中的dataListOverridepublicStringmap(Stringvalue)throwsException{// 打印缓存数据和当前处理的value便于调试System.err.println(使用缓存数据dataList------------value);// 业务逻辑将缓存数据与当前value拼接返回returndataListvalue;}});// 打印结果printToErr()用于区分标准输出和错误输出便于查看缓存相关日志result.printToErr();3.3 完整代码含注释以下为完整可运行代码包含环境初始化、缓存注册、缓存访问及结果输出注释详细可直接复用importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.DataSource;importorg.apache.flink.configuration.Configuration;importorg.apache.commons.io.FileUtils;importjava.io.File;importjava.util.ArrayList;importjava.util.List;publicclassDisCacheTest{publicstaticvoidmain(String[]args)throwsException{// 1. 获取Flink批处理运行环境ExecutionEnvironmentenvExecutionEnvironment.getExecutionEnvironment();// 2. 注册缓存文件本地文件路径可替换为HDFS路径如hdfs://xxx/text// 备注缓存文件text中包含4个单词hello flink hello FLINKenv.registerCachedFile(/Users/wangzhiwu/WorkSpace/quickstart/text,a.txt);// 3. 构造测试数据源4条数据a、b、c、dDataSourceStringdataenv.fromElements(a,b,c,d);// 4. 利用RichMapFunction访问缓存并处理数据DataSetStringresultdata.map(newRichMapFunctionString,String(){// 存储缓存文件内容的集合privateArrayListStringdataListnewArrayListString();// Task启动时执行仅执行一次用于读取缓存文件Overridepublicvoidopen(Configurationparameters)throwsException{super.open(parameters);// 通过缓存名称a.txt获取TaskManager本地的缓存文件FilemyFilegetRuntimeContext().getDistributedCache().getFile(a.txt);// 读取文件所有行依赖commons-io包需引入相关依赖ListStringlinesFileUtils.readLines(myFile);// 将文件内容存入dataListfor(Stringline:lines){this.dataList.add(line);System.err.println(分布式缓存内容line);}}// 并行处理每条输入数据可直接使用缓存中的dataListOverridepublicStringmap(Stringvalue)throwsException{// 打印缓存数据和当前处理的value用于调试System.err.println(使用缓存数据dataList------------value);// 业务逻辑将缓存数据与当前value拼接作为结果返回returndataListvalue;}});// 5. 输出结果使用printToErr()避免与缓存日志混淆result.printToErr();}}四、输出结果运行上述代码后输出结果如下包含缓存读取日志和最终处理结果分布式缓存内容hello 分布式缓存内容flink 分布式缓存内容hello 分布式缓存内容FLINK 使用缓存数据[hello, flink, hello, FLINK]------------a [hello, flink, hello, FLINK]a 使用缓存数据[hello, flink, hello, FLINK]------------b [hello, flink, hello, FLINK]b 使用缓存数据[hello, flink, hello, FLINK]------------c [hello, flink, hello, FLINK]c 使用缓存数据[hello, flink, hello, FLINK]------------d [hello, flink, hello, FLINK]d

更多文章