您现在的位置是:乐刷官网 > 乐刷收付贝
pos机刷卡连接超时, broker 文件刷盘机制
乐刷官网2025-04-25 01:41:40【乐刷收付贝】3人已围观
简介网上有很多关于pos机刷卡连接超时, broker 文件刷盘机制的知识,也有很多人为大家解答关于pos机刷卡连接超时的问题,今天乐刷POS机官网(b06.cn)为大家整理了关于这方面的知
【温馨提示】如果您有办理pos机的需求或者疑问,可以联系官方微信 18127011016

网上有很多关于pos机刷卡连接超时, broker 文件刷盘机制的机机制知识,也有很多人为大家解答关于pos机刷卡连接超时的刷卡刷盘问题,今天乐刷官方代理商(www.zypos.cn)为大家整理了关于这方面的连接知识,让我们一起来看下吧!
本文目录一览:
1、文件pos机刷卡连接超时
pos机刷卡连接超时
RocketMQ的机机制存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存中,刷卡刷盘再根据配置的连接刷盘策略在不同时间刷盘。
RocketMQ使用一个单独的文件线程按照某一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,机机制可选值为ASYNC_FLUSH(异步刷盘)、刷卡刷盘SYNC_FLUSH(同步刷盘),连接默认为异步刷盘。
本节以CommitLog文件刷盘机制为例来剖析RocketMQ的刷盘机制,ConsumeQueue文件、Index文件刷盘的实现原理与CommitLog刷盘机制类似。
RocketMQ处理刷盘的实现方法为Commitlog#handleDiskFlush(),刷盘流程作为消息发送、消息存储的 子流程。值得注意的是,Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文 件就会将上一次的改动写入磁盘。
1. 刷盘策略在理解RocketMQ刷盘实现之前,先理解一下上图展示的刷盘的2种实现的:
直接通过内存映射文件,通过flush刷新到磁盘当异步刷盘且启用了对外内存池的时候,先write到writeBuffer,然后commit到FILEchannel,最后flush到磁盘CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushrequest方法执行刷盘策略:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { ... // 获取最后一个 MappedFile MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); ... try { ... // todo 往mappedFile追加消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback); ... } finally { putMessageLock.unlock(); } ... // todo 消息首先进入pagecache,然后执行刷盘操作, CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); ...}复制代码
刷盘有两种策略:
同步刷盘,表示消息写入到内存之后需要立刻刷到磁盘文件中。同步刷盘会构建GroupCommitRequest组提交请求并设置本次刷盘后的位置偏移量的值(写入位置偏移量+写入数据字节数),然后将请求添加到GroupCommitService中进行刷盘。异步刷盘,表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。CommitLog#submitFlushRequest如下:
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // Synchronization flush 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 获取GroupCommitService final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 是否等待 if (messageExt.isWaitStoreMsgOK()) { // 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盘请求 service.putRequest(request); return request.future(); } else { service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // Asynchronous flush 异步刷盘 这个就是靠os else { // 如果未使用暂存池 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 唤醒刷盘线程进行刷盘 flushCommitLogService.wakeup(); } else { // 如果使用暂存池,使用commitLogService,先将数据写入到FILECHANNEL,然后统一进行刷盘 commitLogService.wakeup(); } // 返回结果 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); }}复制代码2. 同步刷盘
如果使用的是同步刷盘,首先获取了GroupCommitService,然后构建GroupCommitRequest组提交请求,将请求添加到GroupCommitService中,GroupCommitService用于提交刷盘数据。
2.1 GroupCommitRequest提交请求GroupCommitRequest是CommitLog的内部类:
nextOffset:写入位置偏移量+写入数据字节数,也就是本次刷盘成功后应该对应的flush偏移量flushOKFuture:刷盘结果timeoutMillis:刷盘的超时时间,超过超时时间还未刷盘完毕会被认为超时public static class GroupCommitRequest { // 刷盘点偏移量 private final long nextOffset; // 刷盘状态 private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>(); private final long startTimestamp = System.currentTimeMillis(); // 超时时间 private long timeoutMillis = Long.MAX_VALUE; public GroupCommitRequest(long nextOffset, long timeoutMillis) { this.nextOffset = nextOffset; this.timeoutMillis = timeoutMillis; } public void wakeupCustomer(final PutMessageStatus putMessageStatus) { // todo 在这里调用 结束刷盘,设置刷盘状态 this.flushOKFuture.complete(putMessageStatus); }复制代码2.2 GroupCommitService处理刷盘
GroupCommitService是CommitLog的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?
public class CommitLog { /** * GroupCommit Service */ class GroupCommitService extends FlushCommitLogService { // ... // run方法 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 等待刷盘请求的到来 this.waitForRunning(10); // 处理刷盘 this.doCommit(); } catch (exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // ... } }}复制代码2.2.1 刷盘线程的启动
在BrokerController的启动方法中,可以看到调用了messageStore的start方法,前面可知使用的是DefaultMessageStore,进入到DefaultMessageStore的start方法,它又调用了commitLog的start方法,在CommitLog的start方法中,启动了刷盘的线程和监控刷盘的线程:
public class BrokerController { public void start() throws Exception { if (this.messageStore != null) { // 启动 this.messageStore.start(); } // ... }}public class DefaultMessageStore implements MessageStore { /** * @throws Exception */ public void start() throws Exception { // ... this.flushConsumeQueueService.start(); // 调用CommitLog的启动方法 this.commitLog.start(); this.storeStatsService.start(); // ... }}public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盘 private final FlushCommitLogService commitLogService; // commitLogService public void start() { // 启动刷盘的线程 this.flushCommitLogService.start(); flushDiskWatcher.setDaemon(true); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.commitLogService.start(); } }}复制代码2.2.2 刷盘请求的处理
既然知道了线程在何时启动的,接下来详细看一下GroupCommitService是如何处理刷盘提交请求的。
前面知道在GroupCommitService的run方法中,调用了waitForRunning方法等待刷盘请求,waitForRunning在GroupCommitService父类ServiceThread中实现。ServiceThread是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。
waitForRunning方法在进入的时候先判断hasNotified是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。
接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:
// ServiceThreadpublic abstract class ServiceThread implements Runnable { // 是否通知,初始化为false protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); // CountDownLatch用于线程间的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待运行 protected void waitForRunning(long interval) { // 判断hasNotified是否为true,并尝试将其更新为false if (hasNotified.compareAndSet(true, false)) { // 调用onWaitEnd this.onWaitEnd(); return; } // 重置waitPoint的值,也就是值为1 waitPoint.reset(); try { // 会一直等待waitPoint值降为0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知设置为false hasNotified.set(false); this.onWaitEnd(); } }}复制代码1. 添加刷盘请求,唤醒刷盘线程
上面可知需要刷盘的时候调用了GroupCommitService的putRequest方法添加刷盘请求,在putRequest方法中,将刷盘请求GroupCommitRequest添加到了requestsWrite组提交写请求链表中,然后调用wakeup方法唤醒刷盘线程,wakeup方法在它的父类ServiceThread中实现。
在wakeup方法中可以看到,首先将hasNotified更改为了true表示处于已通知状态,然后调用了countDown方法,此时waitPoint值变成0,就会唤醒之前waitForRunning方法中一直在等待的线程。
public class CommitLog { /** * 组提交Service */ class GroupCommitService extends FlushCommitLogService { // 组提交写请求链表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // ... // 添加提交请求 public synchronized void putRequest(final GroupCommitRequest request) { // 加锁 lock.lock(); try { // 加入到写请求链表 this.requestsWrite.add(request); } finally { lock.unlock(); } // 唤醒线程执行提交任务 this.wakeup(); } // ... } }// ServiceThreadpublic abstract class ServiceThread implements Runnable { // CountDownLatch用于线程间的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 唤醒刷盘线程 public void wakeup() { // 更改状态为已通知状态 if (hasNotified.compareAndSet(false, true)) { // waitPoint的值减1,由于大小设置为1,减1之后变为0,会唤醒等待的线程 waitPoint.countDown(); } } // ...}复制代码2. 线程被唤醒,执行刷盘前的操作
waitForRunning方法中的await方法一直在等待countdown的值变为0,当上一步调用了wakeup后,就会唤醒该线程,然后开始往下执行,在finally中可以看到将是否被通知hasNotified又设置为了false。
然后调用了onWaitEnd方法,GroupCommitService方法中重写了该方法,里面又调用了swapRequests方法将读写请求列表的数据进行了交换,putRequest方法中将提交的刷盘请求放在了写链表中,经过交换,数据会被放在读链表中,后续进行刷盘时会从读链表中获取请求进行处理:
// ServiceThreadpublic abstract class ServiceThread implements Runnable { // CountDownLatch protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待运行 protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { // 交换 this.onWaitEnd(); return; } // 重置 waitPoint.reset(); try { // 会一直等待countdown为0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知设置为false hasNotified.set(false); this.onWaitEnd(); } }}public class CommitLog { /** * 组提交Service */ class GroupCommitService extends FlushCommitLogService { // 组提交写请求链表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 组提交读请求链表 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); @Override protected void onWaitEnd() { // 交换读写请求列表的数据请求 this.swapRequests(); } private void swapRequests() { // 加锁 lock.lock(); try { // 将读写请求链表的数据进行交换 LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } } // ... }}复制代码
这里使用读写链表进行交换应该是为了提升性能,如果只使用一个链表,在提交请求的时候需要往链表中添加请求,此时需要加锁,而刷盘线程在处理完请求之后是需要从链表中移除请求的,假设添加请求时加的锁还未释放,刷盘线程就要一直等待,而添加和处理完全可以同时进行,所以使用了两个链表,在添加请求的时候使用写链表,处理请求的时候对读写链表的数据进行交换使用读链表,这样只需在交换数据的时候加锁,以此来提升性能。
3. 执行刷盘waitForRunning执行完毕后,会回到GroupCommitService中的run方法开始继续往后执行代码,从代码中可以看到接下来会调用doCommit方法执行刷盘。
doCommit方法中对读链表中的数据进行了判空,如果不为空,进行遍历处理每一个提交请求,处理逻辑如下:
获取CommitLog映射文件记录的刷盘位置偏移量flushedWhere,判断是否大于请求设定的刷盘位置偏移量nextOffset,正常情况下flush的位置应该小于本次刷入数据后的偏移量,所以如果flush位置大于等于本次请求设置的flush偏移量,本次将不能进行刷盘开启一个循环,调用mappedFileQueue的flush方法执行刷盘(具体的实现在异步刷盘的时候再看),由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行。请求处理之后会清空读链表。class GroupCommitService extends FlushCommitLogService { // 组提交写请求链表 // 同步刷盘任务暂存容器 private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); // 读写分离,避免了任务提交与任务执行的锁冲突 private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); // 提交刷盘 private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { // 遍历刷盘请求 for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush // 获取映射文件的flush位置,判断是否大于请求设定的刷盘位置 boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // 请求1次+重试1次 for (int i = 0; i < 2 && !flushOK; i++) { // todo 刷盘操作 CommitLog.this.mappedFileQueue.flush(0); // 由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置, // 表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // todo 唤醒消息发送线程并通知刷盘结果 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 请求处理完之后清空链表 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // todo 等待10ms 并交换读写容器 this.waitForRunning(10); // todo this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush // 睡眠10毫秒 try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { // 交换 this.swapRequests(); } // 停止之前提交一次 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }复制代码3. 异步刷盘
上面讲解了同步刷盘,接下来去看下异步刷盘,首先会判断是否使用了暂存池,如果未开启调用flushCommitLogService的wakeup唤醒刷盘线程,否则使用commitLogService先将数据写入到FileChannel,然后统一进行刷盘:
public class CommitLog { public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 是否是同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // ... } // 如果是异步刷盘 else { // 如果未使用暂存池 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 唤醒刷盘线程进行刷盘 flushCommitLogService.wakeup(); } else { // 如果使用暂存池,使用commitLogService,先将数据写入到FILECHANNEL,然后统一进行刷盘 commitLogService.wakeup(); } // 返回结果 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }}复制代码
在CommitLog的构造函数中可以看到,commitLogService使用的是CommitRealTimeService进行实例化的,flushCommitLogService需要根据设置决定使用哪种类型进行实例化:
如果是同步刷盘,使用GroupCommitService,由前面的同步刷盘可知,使用的就是GroupCommitService进行刷盘的。如果是异步刷盘,使用FlushRealTimeService。所以接下来需要关注CommitRealTimeService和FlushRealTimeService:
public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盘Service private final FlushCommitLogService commitLogService; public CommitLog(final DefaultMessageStore defaultMessageStore) { // 如果设置的同步刷盘 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 使用GroupCommitService this.flushCommitLogService = new GroupCommitService(); } else { // 使用FlushRealTimeService this.flushCommitLogService = new FlushRealTimeService(); } // commitLogService this.commitLogService = new CommitRealTimeService(); }}复制代码3.1 CommitRealTimeService
在开启暂存池时,会使用CommitRealTimeService,它继承了FlushCommitLogService,所以会实现run方法,处理逻辑如下:
从配置信息中获取提交间隔、每次提交的最少页数和两次提交的最大间隔时间如果当前时间大于上次提交时间+两次提交的最大间隔时间,意味着已经有比较长的一段时间没有进行提交了,需要尽快刷盘,此时将每次提交的最少页数设置为0不限制提交页数调用mappedFileQueue的commit方法进行提交,并返回提交的结果:如果结果为true表示未提交任何数据如果结果为false表示进行了数据提交,需要等待刷盘判断提交返回结果是否返回false,如果是调用flushCommitLogService的wakeup方法唤醒刷盘线程,进行刷盘调用waitForRunning等待下一次提交处理class CommitRealTimeService extends FlushCommitLogService { // 上次提交时间戳 private long lastCommitTimestamp = 0; @Override public String getServiceName() { return CommitRealTimeService.class.getSimpleName(); } @Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // CommitRealTimeService线程间隔时间,默认200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 一次提交任务至少包含的页数,如果待提交数据不足,小于该参数配置的值,将忽略本次提交任务,默认4页 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 两次真实提交的最大间隔时间,默认200ms int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); // 开始时间 long begin = System.currentTimeMillis(); // 如果距上次提交间隔超过commitDataThoroughInterval,则本次提交忽略commitLogLeastPages //参数,也就是如果待提交数据小于指定页数,也执行提交操作 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { // 提交时间 this.lastCommitTimestamp = begin; // 最少提交页数设为0,表示不限制提交页数 commitDataLeastPages = 0; } try { // todo 执行提交操作,将待提交数据提交到物理文件的内存映射内存区 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 提交结束时间 long end = System.currentTimeMillis(); // 如果返回false,并不代表提交失败,而是表示有数据提交成功了,唤醒刷盘线程执行刷盘操作 if (!result) { // 再次更新提交时间戳 this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. // 唤醒flush线程进行刷盘 flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin);} // 该线程每完成一次提交动作,将等待200ms再继续执行下一次提交任务 this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }}复制代码提交
提交的方法在MappedFileQueue的commit方法中实现,处理逻辑如下:
根据记录的CommitLog文件提交位置的偏移量获取映射文件,如果获取不为空,调用MappedFile的commit方法进行提交,然后返回本次提交数据的偏移量记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量判断本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何数据,返回结果置为true,否则表示提交了数据,等待刷盘,返回结果为false更新上一次提交偏移量committedWhere的值为本次的提交偏移量的值public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 public boolean commit(final int commitLeastPages) { boolean result = true; // 根据提交位置的偏移量获取映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { // 调用mappedFile的commit方法进行提交,返回提交数据的偏移量 int offset = mappedFile.commit(commitLeastPages); // 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量 long where = mappedFile.getFileFromOffset() + offset; // 设置返回结果,如果本次提交偏移量等于上一次的提交偏移量为true,表示什么也没干,否则表示提交了数据,等待刷盘 result = where == this.committedWhere; // 更新上一次提交偏移量的值为本次的 this.committedWhere = where; } return result; }}复制代码3.2 MappedFile
MappedFile中记录CommitLog的写入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,调用了isAbleToCommit判断是否可以提交数据,判断的流程如下:
获取提交数据的位置偏移量和写入数据的位置偏移量如果最少提交页数大于0,计算本次写入的页数是否大于或等于最少提交页数本次写入数据的页数计算方法:写入位置/页大小 - flush位置/页大小如果以上条件都满足,判断写入位置是否大于flush位置,如果大于表示有一部数据未flush可以进行提交满足提交条件后,就会调用commit0方法提交数据,将数据写入到fileChannel中:
public class MappedFile extends ReferenceResource { // 数据写入位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 数据提交位置 protected final AtomicInteger committedPosition = new AtomicInteger(0); // 数据flush位置 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 提交数据 public int commit(final int commitLeastPages) { // 如果writeBuffer为空 if (writeBuffer == null) { // 不需要提交任何数据到,返回之前记录的写入位置 return this.wrotePosition.get(); } // 如果可以提交数据 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { // 提交数据 commit0(); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } // 返回提交位置 return this.committedPosition.get(); } // 是否可以提交数据 protected boolean isAbleToCommit(final int commitLeastPages) { // 获取提交数据的位置偏移量 int flush = this.committedPosition.get(); // 获取写入数据的位置偏移量 int write = this.wrotePosition.get(); if (this.isFull()) { return true; } // 如果最少提交页数大于0 if (commitLeastPages > 0) { // 写入位置/页大小 - flush位置/页大小 是否大于至少提交的页数 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } // 判断是否需要flush数据 return write > flush; } protected void commit0() { // 获取写入位置 int writePos = this.wrotePosition.get(); // 获取上次提交的位置 int lastCommittedPosition = this.committedPosition.get(); if (writePos - lastCommittedPosition > 0) { try { // 创建共享缓冲区 ByteBuffer byteBuffer = writeBuffer.slice(); // 设置上一次提交位置 byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); // 数据写入fileChannel this.fileChannel.write(byteBuffer); // 更新写入的位置 this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }}复制代码3.3 FlushRealTimeService
如果未开启暂存池,会直接使用FlushRealTimeService进行刷盘,当然如果开启暂存池,写入一批数据后,同样会使用FlushRealTimeService进行刷盘,FlushRealTimeService同样继承了FlushCommitLogService,是用于执行刷盘的线程,处理逻辑与提交刷盘数据逻辑相似,只不过不是提交数据,而是调用flush方法将提交的数据刷入磁盘:
从配置信息中获取flush间隔、每次flush的最少页数和两次flush的最大间隔时间如果当前时间大于上次flush时间+两次flush的最大间隔时间,意味着已经有比较长的一段时间没有进行flush,此时将每次flush的最少页数设置为0不限制flush页数调用waitForRunning等待被唤醒如果被唤醒,调用mappedFileQueue的flush方法进行刷盘class FlushRealTimeService extends FlushCommitLogService { // 上一次flush的时间 private long lastFlushTimestamp = 0; private long printTimes = 0; public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 默认为false,表示使用await方法等待;如果为true,表示使用Thread.sleep方法等待 boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); // 线程任务运行间隔时间 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // 一次刷盘任务至少包含页数,如 //果待写入数据不足,小于该参数配置的值,将忽略本次刷盘任务,默认4页。 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); // 两次真实刷盘任务的最大间隔时间,默认10s int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); // 如果距上次提交数据的间隔时间超过 //flushPhysicQueueThoroughInterval,则本次刷盘任务将忽略 //flushPhysicQueueLeastPages,也就是如果待写入数据小于指定页 //数,也执行刷盘操作 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { // 更新flush时间 this.lastFlushTimestamp = currentTimeMillis; // flush至少包含的页数置为0 flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { // 执行一次刷盘任务前先等待指定时间间隔 if (flushCommitLogTimed) { Thread.sleep(interval); } else { // 等待flush被唤醒 this.waitForRunning(interval); } if (printFlushProgress) { // 打印刷盘进程 this.printFlushProgress(); } long begin = System.currentTimeMillis(); // todo flush方法 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { // todo 更新checkpoint文件的CommitLog文件更新时间戳 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past);} } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // Normal shutdown, to ensure that all the flush before exit // 如果服务停止,确保数据被刷盘 boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { // 进行刷盘 result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); }复制代码刷盘
刷盘的方法在MappedFileQueue的flush方法中实现,处理逻辑如下:
根据 flush的位置偏移量获取映射文件调用mappedFile的flush方法进行刷盘,并返回刷盘后的位置偏移量计算最新的flush偏移量更新flushedWhere的值为最新的flush偏移量public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 // flush刷盘 public boolean flush(final int flushLeastPages) { boolean result = true; // 获取flush的位置偏移量映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { // 获取时间戳 long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 调用MappedFile的flush方法进行刷盘,返回刷盘后的偏移量 int offset = mappedFile.flush(flushLeastPages); // 计算最新的flush偏移量 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; // 更新flush偏移量 this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } // 返回flush的偏移量 return result; }}复制代码
flush的逻辑也与commit方法的逻辑类似:
调用isAbleToFlush判断是否满足刷盘条件,获取上次flush位置偏移量和当前写入位置偏移量进行如下校验:文件是否已写满,即文件大小是否与写入数据位置相等,如果相等说明文件已经写满需要执行刷盘,满足刷盘条件如果最少flush页数大于0,计算本次flush的页数是否大于或等于最少flush页数,如果满足可以进行刷盘本次flush数据的页数计算方法:写入位置/页大小 - flush位置/页大小如果写入位置偏移量是否大于flush位置偏移量,如果大于表示有数据未进行刷盘,满足刷盘条件调用fileChannel的force或者mappedByteBuffer的force方法进行刷盘记录本次flush的位置,并作为结果返回public class MappedFile extends ReferenceResource { protected final AtomicInteger wrotePosition = new AtomicInteger(0); protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); /** * 进行刷盘并返回flush后的偏移量 */ public int flush(final int flushLeastPages) { // 是否可以刷盘 if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果writeBuffer不为空 if (writeBuffer != null || this.fileChannel.position() != 0) { // 将数据刷到硬盘 this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 记录flush位置 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } // 返回flush位置 return this.getFlushedPosition(); } // 是否可以刷盘 private boolean isAbleToFlush(final int flushLeastPages) { // 获取上次flush位置 int flush = this.flushedPosition.get(); // 写入位置偏移量 int write = getReadPosition(); if (this.isFull()) { return true; } // 如果flush的页数大于0,校验本次flush的页数是否满足条件 if (flushLeastPages > 0) { // 本次flush的页数:写入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } // 写入位置偏移量是否大于flush位置偏移量 return write > flush; } // 文件是否已写满 public boolean isFull() { // 文件大小是否与写入数据位置相等 return this.fileSize == this.wrotePosition.get(); } /** * 返回当前有效数据的位置 */ public int getReadPosition() { // 如果writeBuffer为空使用写入位置,否则使用提交位置 return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); }}复制代码4. 总结
同步刷盘总体流程:
异步刷盘总体流程:
以上就是关于pos机刷卡连接超时, broker 文件刷盘机制的知识,后面我们会继续为大家整理关于pos机刷卡连接超时的知识,希望能够帮助到大家!
关键词:站前区pos机办理需要注意什么
很赞哦!(9928)
相关文章
- 全面解析POS机CE认证流程与费用 - 深圳POS机办理中心
- 十大乐刷收银通POS机详解 - 深圳POS机办理中心
- 乐刷收银通触碰刷KA机,安全、便捷的支付解决方案 - 深圳POS机办理中心
- 正规POS机申请渠道汇总及其申请流程详解 - 深圳POS机办理中心
- 全面解析,乐刷收银通POS机如何查询交易总额 - 深圳POS机办理中心
- 银联商务POS机办理指南,官网办理条件及最低手续费正规渠道 - 深圳POS机办理中心
- 正规pos机费率多少?pos机什么牌子非常好?一篇全面分析解答 - 深圳POS机办理中心
- 乐刷收银通POS机与银联POS机官网申请指南 - 深圳POS机办理中心
- 盛付通正规POS机申请官网详解,助您轻松拥有合法支付工具 - 深圳POS机办理中心
- POS机哪个牌子好?银联正规POS机名单 - 深圳POS机办理中心
热门文章
- 乐刷收银通个人pos机不能用了怎么办?——从多个角度分析解决问题 - 深圳POS机办理中心
- 银联官网与乐刷收银通官网POS机办理申请,个人免费办理POS机的正规流程解析 - 深圳POS机办理中心
- 乐刷收银通电签POS机免费申请及办理指南——官网正规流程详解 - 深圳POS机办理中心
- 乐刷收银通pos机的注意事项? - 深圳POS机办理中心
- 免费送pos机广告录音下载,让你的生意红红火火! - 深圳POS机办理中心
- 官网免费领取个人银联乐刷收银通POS机与盛付通POS机的申请流程详解 - 深圳POS机办理中心
- 乐刷收银通POS机申请的正确方式详解 - 深圳POS机办理中心
- 乐刷收银通POS机官网申请指南与品牌推荐 - 深圳POS机办理中心
- 深圳POS机办理中心,一站式服务与便捷体验 - 深圳POS机办理中心
- 乐刷收银通个人POS机排名及品牌介绍 - 深圳POS机办理中心
热门视频
- https://www.bilibili.com/opus/1029734094487945219
- https://www.bilibili.com/opus/1012551132906520577
- https://space.bilibili.com/1082673316/settings
- https://www.bilibili.com/opus/1011770174698684420
- https://www.bilibili.com/opus/1015905077140914212
- https://space.bilibili.com/628312092/relation/fans
- https://www.bilibili.com/read/cv40540293/
- https://www.bilibili.com/read/cv40492217/
- https://www.bilibili.com/video/BV1NkktYTEkV/
- https://www.bilibili.com/opus/1018608540029288472
站长推荐
手机银行转账POS机能否查出来 - 深圳POS机办理中心
银联商务POS机办理指南,官网办理条件及最低手续费正规渠道 - 深圳POS机办理中心
乐刷收银通POS机个人申请平台详解,申请流程、条件、费用及注意事项 - 深圳POS机办理中心
银联POS机办理全攻略,高效、便捷,一站式解决您的支付需求 - 深圳POS机办理中心
二维码支付POS机品牌盘点 - 深圳POS机办理中心
旅付通乐刷收银通POS机办理攻略,全方位解析办理渠道与流程 - 深圳POS机办理中心
一机一码对刷信用ka卡有影响吗?——从原理、风险和防范措施三个方面进行详细阐述 - 深圳POS机办理中心
个人POS刷KA机品牌及成都推荐 - 深圳POS机办理中心
全国POS机办理网点
最新标签
- 福海县pos机办理需要多少钱
- 南关区pos机办理需要注意什么
- 临桂县pos机办理需要多少钱
- 南关区pos机办理需要注意什么
- 孟村回族自治县pos机正规办理方法
- 隆德县pos机办理需要注意什么
- 蓬安县pos机正规办理方法
- 富顺县pos机正规办理方法
- 长治pos机办理需要多少钱
- 荆州区pos机办理需要多少钱
- 百色pos机代理
- 济源pos机办理需要什么资料
- 东台pos机办理需要注意什么
- 冷水江pos机办理需要什么资料
- 桐庐县pos机正规办理方法
- 南关区pos机办理需要注意什么
- 海曙区pos机办理需要注意什么
- 尼玛县pos机正规办理方法
- 祁门县pos机办理需要多少钱
- 深圳pos机办理需要多少钱
- 宁阳县pos机正规办理方法
- 库车县pos机办理需要注意什么
- 三穗县pos机办理需要注意什么
- 东兴pos机办理需要注意什么
- 玉环县pos机办理需要注意什么
- 泰和县pos机办理需要多少钱
- 库车县pos机办理需要注意什么
- 武夷山pos机正规办理方法
- 八道江区pos机正规办理方法
- 江北区pos机正规办理方法