Commit c9a1a456 authored by luz's avatar luz

SerialQueue: enhanced version with optional accept buffer, which collects data...

SerialQueue: enhanced version with optional accept buffer, which collects data and allows operations to peek into the data and deny it as long as there is not enough data ready
parent 25283d93
......@@ -51,9 +51,9 @@ void SerialOperation::setTransmitter(SerialOperationTransmitter aTransmitter)
// call to deliver received bytes
size_t SerialOperation::acceptBytes(size_t aNumBytes, uint8_t *aBytes)
ssize_t SerialOperation::acceptBytes(size_t aNumBytes, uint8_t *aBytes)
{
return 0;
return 0; // accept none, expect none
}
......@@ -186,7 +186,7 @@ void SerialOperationReceive::clearData()
}
size_t SerialOperationReceive::acceptBytes(size_t aNumBytes, uint8_t *aBytes)
ssize_t SerialOperationReceive::acceptBytes(size_t aNumBytes, uint8_t *aBytes)
{
// append bytes into buffer
if (!initiated)
......@@ -248,7 +248,10 @@ OperationPtr SerialOperationSendAndReceive::finalize(OperationQueue *aQueueP)
// Link into mainloop
SerialOperationQueue::SerialOperationQueue(MainLoop &aMainLoop) :
inherited(aMainLoop)
inherited(aMainLoop),
acceptBufferP(NULL),
acceptBufferSize(0),
bufferedBytes(0)
{
// Set handlers for FdComm
serialComm = SerialCommPtr(new SerialComm(aMainLoop));
......@@ -264,6 +267,7 @@ SerialOperationQueue::SerialOperationQueue(MainLoop &aMainLoop) :
SerialOperationQueue::~SerialOperationQueue()
{
serialComm->closeConnection();
setAcceptBuffer(0);
}
......@@ -306,31 +310,120 @@ void SerialOperationQueue::queueSerialOperation(SerialOperationPtr aOperation)
}
void SerialOperationQueue::setAcceptBuffer(size_t aBufferSize)
{
if (acceptBufferP) {
delete [] acceptBufferP;
acceptBufferP = NULL;
}
bufferedBytes = 0;
acceptBufferSize = 0;
if (aBufferSize>0) {
acceptBufferSize = aBufferSize;
acceptBufferP = new uint8_t[acceptBufferSize];
}
}
// deliver bytes to the most recent waiting operation
size_t SerialOperationQueue::acceptBytes(size_t aNumBytes, uint8_t *aBytes)
{
FOCUSLOG("Start of SerialOperationQueue::acceptBytes: received %d new bytes to accept\n", aNumBytes);
// first check if some operations still need processing
processOperations();
// let operations receive bytes
size_t acceptedBytes = 0;
for (OperationList::iterator pos = operationQueue.begin(); pos!=operationQueue.end(); ++pos) {
SerialOperationPtr sop = boost::dynamic_pointer_cast<SerialOperation>(*pos);
size_t consumed = 0;
if (sop)
consumed = sop->acceptBytes(aNumBytes, aBytes);
aBytes += consumed; // advance pointer
aNumBytes -= consumed; // count
acceptedBytes += consumed;
if (aNumBytes<=0)
break; // all bytes consumed
}
if (aNumBytes>0) {
// Still bytes left to accept
// TODO: possibly create "unexpected receive" operation
}
// check if some operations might be complete now
uint8_t *bytes;
size_t numBytes;
while (aNumBytes>0) {
FOCUSLOG("- %d bytes left to process\n", aNumBytes);
// buffered mode?
if (acceptBufferSize>0) {
// buffered mode - collect in buffer and then let operations process
ssize_t by = acceptBufferSize - bufferedBytes;
if (by>0) {
// still room in the buffer, append to buffer
if (aNumBytes<by) by = aNumBytes; // buffer at most aNumBytes
memcpy(acceptBufferP+bufferedBytes, aBytes, by); // buffer
bufferedBytes += by;
aNumBytes -= by;
aBytes += by;
FOCUSLOG("- %d bytes buffered, %d total buffered, %d remaining\n", by, bufferedBytes, aNumBytes);
}
else {
// buffer full, cannot store more
LOG(LOG_DEBUG, "- %d received bytes could neither be processed nor buffered -> ignored\n", aNumBytes);
break; // no point in iterating
}
// initiate processing on buffered data
bytes = acceptBufferP;
numBytes = bufferedBytes;
}
else {
// unbuffered, directly process incoming data
bytes = aBytes;
numBytes = aNumBytes;
aNumBytes = 0; // all must be consumed or are lost
}
// let operations process bytes now
if (FOCUSLOGENABLED) {
string s;
for (size_t i=0; i<numBytes; i++) {
string_format_append(s, " %02X", bytes[i]);
}
FOCUSLOG("- attempting to process %d bytes: %s\n", numBytes, s.c_str());
}
ssize_t consumed = 0;
for (OperationList::iterator pos = operationQueue.begin(); pos!=operationQueue.end(); ++pos) {
FOCUSLOG("- offering %d bytes to next operation to accept\n", numBytes);
SerialOperationPtr sop = boost::dynamic_pointer_cast<SerialOperation>(*pos);
if (sop) {
consumed = sop->acceptBytes(numBytes, bytes);
FOCUSLOG("- operation accepted %d bytes (-1: not enough to process anything)\n", consumed);
}
if (consumed==NOT_ENOUGH_BYTES) {
FOCUSLOG("- operation will accept bytes, but needs more at a time -> don't process more\n");
break; // this operation would accept bytes, but needs more of them at a time
}
bytes += consumed; // advance pointer
numBytes -= consumed; // count
acceptedBytes += consumed;
if (numBytes<=0)
break; // all bytes consumed
}
if (numBytes>0 && consumed!=NOT_ENOUGH_BYTES) {
// Still bytes left to accept, give chance to process these now
FOCUSLOG("- %d left after all pending operations asked, offer them to acceptExtraBytes()\n", numBytes);
consumed = acceptExtraBytes(numBytes, bytes);
FOCUSLOG("- acceptExtraBytes() accepted %d bytes (-1: not enough to process anything)\n", consumed);
if (consumed!=NOT_ENOUGH_BYTES) {
bytes += consumed; // advance pointer
numBytes -= consumed; // count
acceptedBytes += consumed;
}
}
// buffered mode?
if (acceptBufferSize>0) {
// in buffered mode, remove accepted bytes and keep rest for next run
bufferedBytes = numBytes;
if (numBytes>0) {
// still bytes left unprocessed, move them to the beginning of the buffer
// Note: in buffered mode, numBytes is always less or equal acceptBufferSize, so we know we can move the rest
memmove(acceptBufferP, bytes, numBytes);
}
}
else {
// unbuffered mode - bytes than cannot be processed are lost
if (numBytes>0) {
FOCUSLOG("SerialOperationQueue::acceptBytes - %d unprocessed bytes -> ignored\n", aNumBytes);
}
break;
}
} // while bytes to process
// final check if some operations might be complete now
processOperations();
// return number of accepted bytes
FOCUSLOG("End of SerialOperationQueue::acceptBytes: accepted %d bytes\n", acceptedBytes);
return acceptedBytes;
};
......
......@@ -37,6 +37,11 @@ using namespace std;
namespace p44 {
/// acceptBytes() can return this for a queue with an accept buffer to reject
/// accepting bytes now because more are needed.
#define NOT_ENOUGH_BYTES -1
// Errors
typedef enum {
SQErrorTransmit,
......@@ -83,8 +88,10 @@ namespace p44 {
/// call to deliver received bytes
/// @param aNumBytes number of bytes ready for accepting
/// @param aBytes pointer to bytes buffer
/// @return number of bytes operation could accept, 0 if none
virtual size_t acceptBytes(size_t aNumBytes, uint8_t *aBytes);
/// @return number of bytes operation could accept, 0 if none, NOT_ENOUGH_BYTES if operation would accept bytes,
/// but not enough of them are ready. Note that NOT_ENOUGH_BYTES may only be used when the SerialQueue has a
/// buffer for re-assembling messages (see SerialQueue::setAcceptBuffer())
virtual ssize_t acceptBytes(size_t aNumBytes, uint8_t *aBytes);
virtual OperationPtr finalize(OperationQueue *aQueueP = NULL);
virtual void abortOperation(ErrorPtr aError);
......@@ -133,7 +140,7 @@ namespace p44 {
size_t getDataSize() { return dataIndex; };
void clearData();
virtual size_t acceptBytes(size_t aNumBytes, uint8_t *aBytes);
virtual ssize_t acceptBytes(size_t aNumBytes, uint8_t *aBytes);
virtual bool hasCompleted();
virtual void abortOperation(ErrorPtr aError);
};
......@@ -170,6 +177,10 @@ namespace p44 {
SerialOperationTransmitter transmitter;
SerialOperationReceiver receiver;
size_t acceptBufferSize;
size_t bufferedBytes;
uint8_t *acceptBufferP;
public:
/// the serial communication channel
......@@ -187,12 +198,28 @@ namespace p44 {
/// set receiver
void setReceiver(SerialOperationReceiver aReceiver);
/// set an accept buffer
/// @param aBufferSize size of buffer that will hold received bytes until they can be processed.
/// setting a buffer size allows operations and acceptExtraBytes() to not accept bytes when there are to few bytes ready
void setAcceptBuffer(size_t aBufferSize);
/// queue a new operation
/// @param aOperation the serial IO operation to queue
void queueSerialOperation(SerialOperationPtr aOperation);
/// called to process extra bytes after all pending operations have processed their bytes
/// @param aNumBytes number of bytes ready to accept
/// @param aBytes bytes ready to accept
/// @return number of extra bytes that could be accepted, 0 if none, NOT_ENOUGH_BYTES if extra bytes would be accepted,
/// but not enough of them are ready. Note that NOT_ENOUGH_BYTES may only be used when the SerialQueue has a
/// buffer for re-assembling messages (see setAcceptBuffer())
virtual ssize_t acceptExtraBytes(size_t aNumBytes, uint8_t *aBytes) { return 0; /* base class does not accept extra bytes */ };
private:
/// base class implementation: deliver bytes to the most recent waiting operation
/// base class implementation: deliver bytes to the most recent waiting operation,
/// call acceptExtraBytes if bytes are left after all operations had chance to accept bytes.
virtual size_t acceptBytes(size_t aNumBytes, uint8_t *aBytes);
/// FdComm handler
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment