switch to Boost for network connectivity

This commit is contained in:
2025-09-26 20:03:28 -05:00
parent 5f57f4631b
commit 80690945fe
3 changed files with 98 additions and 75 deletions

View File

@@ -6,39 +6,55 @@
#include <unistd.h> #include <unistd.h>
#include "transfer.h" #include "transfer.h"
#include "logging.h" #include "logging.h"
#include <chrono>
#include <boost/asio.hpp>
using namespace std; using namespace std;
using boost::asio::ip::tcp;
int main() { int main() {
const int FRAME_DELAY_MS = 1000 / 30;
// create video capture // create video capture
cv::VideoCapture cap = cv::VideoCapture(0); cv::VideoCapture cap = cv::VideoCapture(0);
try {
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
// create socket tcp::resolver::results_type endpoints =
int clientSocket = socket(AF_INET, SOCK_STREAM, 0); resolver.resolve("127.0.0.1", "8080");
// specifying address cv::Mat image = cv::Mat::zeros(cv::Size(640, 480), CV_8UC3);
sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_port = htons(8080);
serverAddress.sin_addr.s_addr = INADDR_ANY;
cv::Mat image = cv::Mat::zeros(cv::Size(640, 480), CV_8UC3); info("Ready to connect.");
// sending connection request
tcp::socket socket(io_context);
boost::asio::connect(socket, endpoints);
info("Ready to connect."); info("Connected.");
// sending connection request // create buffer for serialization
connect(clientSocket, reinterpret_cast<sockaddr *>(&serverAddress), vector<uchar> imgbuf;
sizeof(serverAddress));
info("Connected."); while (true) {
// create buffer for serialization auto start_time = std::chrono::high_resolution_clock::now();
vector<uchar> imgbuf; cap.read(image);
trace("Sending image");
sendImage(socket, image, imgbuf);
while (true) { auto end_time = std::chrono::high_resolution_clock::now();
cap.read(image); auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
trace("Sending image");
sendImage(clientSocket, image, imgbuf); // Calculate the remaining time to sleep
auto sleep_duration = std::chrono::milliseconds(FRAME_DELAY_MS) - elapsed_time;
// Sleep for the remaining duration if positive
if (sleep_duration.count() > 0) {
std::this_thread::sleep_for(sleep_duration);
}
}
}
catch (std::exception& e) {
error(e.what());
} }
close(clientSocket);
return 0; return 0;
} }

View File

@@ -1,5 +1,6 @@
#include <opencv2/highgui.hpp> #include <opencv2/highgui.hpp>
#include <opencv2/core/mat.hpp> #include <opencv2/core/mat.hpp>
#include <opencv2/imgproc.hpp>
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>
#include <netinet/in.h> #include <netinet/in.h>
@@ -7,50 +8,49 @@
#include <unistd.h> #include <unistd.h>
#include "transfer.h" #include "transfer.h"
#include "logging.h" #include "logging.h"
#include <boost/asio.hpp>
using namespace std; using namespace std;
using boost::asio::ip::tcp;
int main() { int main() {
// TODO: read image data from socket instead of VideoCapture try {
// creating socket boost::asio::io_context io_context;
int serverSocket = socket(AF_INET, SOCK_STREAM, 0); // creating socket
// specifying the address tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8080));
sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_port = htons(8080);
serverAddress.sin_addr.s_addr = INADDR_ANY;
// binding socket. tcp::socket socket(io_context);
bind(serverSocket, reinterpret_cast<sockaddr *>(&serverAddress),
sizeof(serverAddress));
info("Ready to accept connections."); info("Ready to accept connections.");
// listening to the assigned socket
listen(serverSocket, 5);
// accepting connection request // accepting connection request
int clientSocket = accept(serverSocket, nullptr, nullptr); acceptor.accept(socket);
info("Client connected."); info("Client connected.");
// TODO: handle multiple images // TODO: handle multiple images
cv::Mat image = cv::Mat::zeros(cv::Size(640, 480), CV_8UC3); cv::Mat image = cv::Mat::zeros(cv::Size(640, 480), CV_8UC3);
bool running = true; bool running = true;
// TODO: make this asynchronous. probably do that in tandem with setting up networking // TODO: make this asynchronous. probably do that in tandem with setting up networking
while (running) { while (running) {
// receive data // receive data
vector<uchar> buffer; vector<uchar> buffer;
trace("Receiving image"); trace("Receiving image");
recvImage(clientSocket, buffer); int latency = recvImage(socket, buffer);
trace("Applying new data to image"); trace("Applying new data to image");
applyImage(image, &buffer); applyImage(image, &buffer);
trace("Displaying image"); cv::putText(image, std::format("Latency: {}ms", latency), cv::Point(0, 480), cv::FONT_HERSHEY_PLAIN, 1, cv::Scalar(255, 0, 0), 1, cv::LINE_AA);
imshow("image", image);
running = cv::waitKey(30) != 27; trace("Displaying image");
imshow("image", image);
running = cv::waitKey(30) != 27;
}
}
catch (std::exception& e) {
error(e.what());
} }
close(serverSocket);
return 0; return 0;
} }

View File

@@ -5,10 +5,12 @@
#include <opencv2/core/mat.hpp> #include <opencv2/core/mat.hpp>
#include <opencv2/imgcodecs.hpp> #include <opencv2/imgcodecs.hpp>
#include <sys/socket.h> #include <sys/socket.h>
#include <logging.h> #include "logging.h"
#include <vector> #include <vector>
#include <boost/asio.hpp>
using namespace std; using namespace std;
using boost::asio::ip::tcp;
struct imageHeader { struct imageHeader {
size_t size; size_t size;
@@ -29,7 +31,9 @@ inline void serializeImage(const cv::Mat& image, std::vector<uchar>& buffer) {
cv::imencode(".jpg", image, buffer); cv::imencode(".jpg", image, buffer);
} }
int sendImage(int socket, const cv::Mat& image, std::vector<uchar>& buffer) { void sendImage(tcp::socket& socket, const cv::Mat& image, std::vector<uchar>& buffer) {
boost::system::error_code e;
serializeImage(image, buffer); serializeImage(image, buffer);
size_t totalSent = 0; size_t totalSent = 0;
@@ -40,30 +44,33 @@ int sendImage(int socket, const cv::Mat& image, std::vector<uchar>& buffer) {
header.size = size; header.size = size;
header.timestamp = timestamp; header.timestamp = timestamp;
trace("Buffer size: ", size); trace("Buffer size: ", size);
if (const ssize_t sent = send(socket, &header, sizeof(header), 0); sent == -1) { if (const ssize_t sent = boost::asio::write(socket, boost::asio::buffer(&header, sizeof(header)), e); sent == -1) {
error("Error sending data header"); throw boost::system::system_error(e);
return -1;
} }
// then start sending the serialized image // then start sending the serialized image
while (totalSent < size) { while (totalSent < size) {
const ssize_t sent = send(socket, buffer.data() + totalSent, size - totalSent, 0); const ssize_t sent = boost::asio::write(socket, boost::asio::buffer(buffer), e);
if (sent == -1) { if (sent == -1) {
error("Error sending data"); throw boost::system::system_error(e);
return -1;
} }
totalSent += sent; totalSent += sent;
debug("Packet sent (", sent, " bytes, total ", totalSent, " bytes)"); debug("Packet sent (", sent, " bytes, total ", totalSent, " / ", size, " bytes)");
} }
return 0;
} }
int recvImage(int socket, std::vector<uchar>& buffer) { int recvImage(tcp::socket& socket, std::vector<uchar>& buffer) {
boost::system::error_code e;
// first receive the size of the image // first receive the size of the image
imageHeader header; imageHeader header;
if (ssize_t recvd = recv(socket, &header, sizeof(imageHeader), 0); recvd <= 0) {
error("Error receiving data header"); size_t len = 0;
return -1; while (len < sizeof(imageHeader)) {
len += socket.read_some(boost::asio::buffer(&header, sizeof(header)), e);
if (e.failed()) {
throw boost::system::system_error(e);
}
} }
size_t dataSize = header.size; size_t dataSize = header.size;
@@ -77,20 +84,20 @@ int recvImage(int socket, std::vector<uchar>& buffer) {
// start receiving the image until the whole thing arrives // start receiving the image until the whole thing arrives
size_t totalReceived = 0; size_t totalReceived = 0;
while (totalReceived < dataSize) { while (totalReceived < dataSize) {
ssize_t bytesReceived = recv(socket, buffer.data() + totalReceived, dataSize, 0); size_t bytesReceived = boost::asio::read(socket, boost::asio::buffer(buffer), e);
if (bytesReceived <= 0) { if (e == boost::asio::error::eof) // Connection closed cleanly by peer.
error("Error receiving data"); throw boost::system::system_error(e);
buffer.clear(); else if (e)
return -1; throw boost::system::system_error(e);
}
totalReceived += bytesReceived; totalReceived += bytesReceived;
debug("Packet received (", bytesReceived, " bytes, total ", totalReceived, " bytes)"); debug("Packet received (", bytesReceived, " bytes, total ", totalReceived, " / ", dataSize, " bytes)");
} }
chrono::milliseconds currentTime = getMillis(); chrono::milliseconds currentTime = getMillis();
chrono::milliseconds diff = currentTime - sentTime; chrono::milliseconds diff = currentTime - sentTime;
debug("Packet latency: ", diff.count(), "ms"); debug("Packet latency: ", diff.count(), "ms");
return 0;
return diff.count();
} }
bool applyImage(cv::Mat& image, std::vector<uchar> *src) { bool applyImage(cv::Mat& image, std::vector<uchar> *src) {