附代码实现 彻底弄懂RocketMQ文件存储( 二 )

< consumerQueueDatas.size(); i++) {ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);//指定写入位置mappedByteBuffer.position(i * 20);mappedByteBuffer.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)mappedByteBuffer.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)mappedByteBuffer.putLong(consumerQueueData.getTagCode());//8byte (tagCode)count++;System.out.println("consumerQueue数据写入完成:" + JSON.toJSONString(consumerQueueData));mappedByteBuffer.force();}System.out.println("ConsumerQueue数据保存完成count:" + count);}public static void createIndexFile() throws IOException {System.out.println("");System.out.println("IndexFile file create!" );//文件场创建时间,在写第一条消息的时候创建FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);ByteBuffer headerByteBuffer = mappedByteBuffer.slice();long firstDataTime = System.currentTimeMillis();fileChannel.close();//开始写hash槽,从头部后写入/*已经填充有index的slot数量(并不是每个slot槽下都挂载有index索引单元,这 里统计的是所有挂载了index索引单元的slot槽的数量,hash冲突)*/int hashSlotCount = 0;/* 已该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和),如果没有hash冲突,hashSlotCount = indexCount*/int indexCount = 0;//假设建立100个槽位(总长度400)int soltNum = 100;for (int i = 0; i < MESSAGE_COUNT; i++) {IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);int keyHash = indexFileItemData.getKeyHash();//取模,计算第几个槽位int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);// slot存放的文件偏移量(字节长度)int absSlotPos = 40 + slotPos * 4;// 存储实际数据的文件偏移量(字节长度)int absIndexPos =40 + soltNum * 4+ indexCount * 20;//将indexCount存到对应的hash槽mappedByteBuffer.putInt(absSlotPos, indexCount);//写入数据(IndecFile的实际数据部分)mappedByteBuffer.putInt(absIndexPos, indexFileItemData.getKeyHash());//8byte msg hashcodemappedByteBuffer.putLong(absIndexPos + 4, indexFileItemData.getPhyOffset());//8byte msg hashcodemappedByteBuffer.putInt(absIndexPos + 4 + 8, Integer.valueOf((System.currentTimeMillis() - firstDataTime) + ""));//8byte (timeDiff)mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, 0);//8byte (preIndex)暂不考虑hash冲突的情况//模拟最后一个文件,写入headerif (i == 0) {//该indexFile中第一条消息的存储时间headerByteBuffer.putLong(0, firstDataTime);//该indexFile种第一条消息在commitlog种的偏移量commitlog offsetmappedByteBuffer.putLong(16, indexFileItemData.getPhyOffset());}//模拟第一个文件,写入headerif (i == MESSAGE_COUNT - 1) {//该indexFile种最后一条消息存储时间headerByteBuffer.putLong(8, System.currentTimeMillis());//该indexFile中最后一条消息在commitlog中的偏移量commitlog offsetheaderByteBuffer.putLong(24, indexFileItemData.getPhyOffset());}//已经填充有index的slot数量headerByteBuffer.putInt(32, hashSlotCount + 1);//该indexFile中包含的索引单元个数headerByteBuffer.putInt(36, indexCount + 1);mappedByteBuffer.force();System.out.println("msgId:" + indexFileItemData.getMessageId() + ",keyHash:" + keyHash + ",保存槽位为" + slotPos + "的数据,absSlotPos=" + absSlotPos + ",值index=" + indexCount + ",绝对位置:" + absIndexPos + ",commit-phyOffset:" + indexFileItemData.getPhyOffset());indexCount++;hashSlotCount++;}}//将变长字符串定长byte[],方便读取private static byte[] getBytes(String s, int length) {int fixLength = length - s.getBytes().length;if (s.getBytes().length < length) {byte[] S_bytes = new byte[length];System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);for (int x = length - fixLength; x < length; x++) {S_bytes[x] = 0x00;}return S_bytes;}return s.getBytes(StandardCharsets.UTF_8);}}运行结果:
commitLog file create!写入消息,第:0次msgTotalLen:338msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05topic:Topic-testmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:874605e6-69d2-4301-a65e-01e63de75a4dcommitLogOffset:0写入消息,第:1次msgTotalLen:338msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681etopic:Topic-testmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3ccommitLogOffset:338写入消息,第:2次msgTotalLen:296msgId:a0c7c833-9811-4f17-800b-847766aef7ddtopic:Topic-testmsgBody:消息内容msgmtransactionId:9a836d21-704f-46ae-926c-b7933efe06a5commitLogOffset:676写入消息,第:3次msgTotalLen:299msgId:050d6330-1f4a-4dff-a650-4f7eaee63356topic:Topic-testmsgBody:消息内容msgmsgmtransactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44commitLogOffset:972写入消息,第:4次msgTotalLen:306msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5topic:Topic-testmsgBody:消息内容msgmsgmsgmsgmstransactionId:09f3b762-159e-4486-8820-0bce0ef7972dcommitLogOffset:1271写入消息,第:5次msgTotalLen:313msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1topic:Topic-testmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgtransactionId:42dce613-6aaf-466b-b185-02a3f7917579commitLogOffset:1577写入消息,第:6次msgTotalLen:321msgId:05be27f8-fb7a-4662-904f-2263e8899086topic:Topic-testmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmstransactionId:6c7db927-911c-4d19-a240-a951fad957bdcommitLogOffset:1890写入消息,第:7次msgTotalLen:318msgId:9a508d90-30f6-4a25-812f-25d750736afetopic:Topic-testmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmstransactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897commitLogOffset:2211写入消息,第:8次msgTotalLen:305msgId:63249e08-bd0c-4a5b-954b-aea83cb442betopic:Topic-testmsgBody:消息内容msgmsgmsgmsgmtransactionId:22cc0dd6-2036-4423-8e6f-d7043b953724commitLogOffset:2529写入消息,第:9次msgTotalLen:329msgId:93c46c53-b097-4dd0-90d7-06d5d877f489topic:Topic-testmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:e9078205-15be-42b1-ad7e-55b9f5e229ebcommitLogOffset:2834commitLog数据保存完成,totalSize:10ConsumerQueue file create!consumerQueue数据写入完成:{"msgLength":338,"offset":0,"tagCode":100}consumerQueue数据写入完成:{"msgLength":338,"offset":338,"tagCode":100}consumerQueue数据写入完成:{"msgLength":296,"offset":676,"tagCode":100}consumerQueue数据写入完成:{"msgLength":299,"offset":972,"tagCode":100}consumerQueue数据写入完成:{"msgLength":306,"offset":1271,"tagCode":100}consumerQueue数据写入完成:{"msgLength":313,"offset":1577,"tagCode":100}consumerQueue数据写入完成:{"msgLength":321,"offset":1890,"tagCode":100}consumerQueue数据写入完成:{"msgLength":318,"offset":2211,"tagCode":100}consumerQueue数据写入完成:{"msgLength":305,"offset":2529,"tagCode":100}consumerQueue数据写入完成:{"msgLength":329,"offset":2834,"tagCode":100}ConsumerQueue数据保存完成count:10IndexFile file create!msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05,keyHash:-358470777,保存槽位为77的数据,absSlotPos=348,值index=0,绝对位置:440,commit-phyOffset:338msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e,keyHash:466366793,保存槽位为93的数据,absSlotPos=412,值index=1,绝对位置:460,commit-phyOffset:676msgId:a0c7c833-9811-4f17-800b-847766aef7dd,keyHash:1237522456,保存槽位为56的数据,absSlotPos=264,值index=2,绝对位置:480,commit-phyOffset:972msgId:050d6330-1f4a-4dff-a650-4f7eaee63356,keyHash:-1115509881,保存槽位为81的数据,absSlotPos=364,值index=3,绝对位置:500,commit-phyOffset:1271msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5,keyHash:1219778974,保存槽位为74的数据,absSlotPos=336,值index=4,绝对位置:520,commit-phyOffset:1577msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1,keyHash:460184183,保存槽位为83的数据,absSlotPos=372,值index=5,绝对位置:540,commit-phyOffset:1890msgId:05be27f8-fb7a-4662-904f-2263e8899086,keyHash:-339624012,保存槽位为12的数据,absSlotPos=88,值index=6,绝对位置:560,commit-phyOffset:2211msgId:9a508d90-30f6-4a25-812f-25d750736afe,keyHash:403329587,保存槽位为87的数据,absSlotPos=388,值index=7,绝对位置:580,commit-phyOffset:2529msgId:63249e08-bd0c-4a5b-954b-aea83cb442be,keyHash:-1569335572,保存槽位为72的数据,absSlotPos=328,值index=8,绝对位置:600,commit-phyOffset:2834msgId:93c46c53-b097-4dd0-90d7-06d5d877f489,keyHash:597856342,保存槽位为42的数据,absSlotPos=208,值index=9,绝对位置:620,commit-phyOffset:3163


推荐阅读