Thrift 实战
服务调用结构图
创建项目文件目录 match_system、game、thrift
和 readme.md
文件
thrift
目录用于存放 远程调用接口文件match_system
:是匹配系统game
:是游戏系统
1、在 thrift
下编写 match.thrift
文件
namespace cpp match_service
struct User {
1: i32 id,
2: string name,
3: i32 score
}
service Match {
i32 add_user(1: User user, 2: string info),
i32 remove_user(1: User user, 2: string info),
}
2、在 match_system/src
下通过 match.thrift
文件生成 C++ 服务端客户端代码,但我们只用服务端
thrift -r --gen cpp ../../thrift/match.thrift
生成的代码目录
acs@a7e3435d46dd:~/homework/lesson_6/thrift_lesson/match_system/src$ tree .
.
`-- gen-cpp
|-- Match.cpp
|-- Match.h
|-- Match_server.skeleton.cpp
|-- match_types.cpp
`-- match_types.h
gen-cpp
文件夹改名为 match_server
作为匹配系统的 server 端
把生成的服务端代码 Match_server.skeleton.cpp
移动到 src
目录下并改名为 main.cpp
3、修改 main.cpp
写自己的代码逻辑,Match Server 1.0
先给两个函数添加 return 0
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace std;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
return 0;
}
};
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
cout << "Start Match Server" << endl;
server.serve();
return 0;
}
编译测试
- 编译:
g+t -c main.cpp match server/*.cpp
,编译所有的.cpp
文件 - 链接:
g++ *.o -o main -lthrift
,将所有.o
文件链接起来,-lthrift
是 thrift 的动态链接库
小技巧:写 thrift 代码先编译跑通环境,再逐步往里添加模块
4、进入 game/src
目录生成 python 服务端客户端代码,只用客户端
thrift -r --gen py ../../thrift/match.thrift
生成的代码目录
.
|-- __init__.py
`-- match
|-- Match-remote # python 服务器端代码
|-- Match.py # python 客户端代码
|-- __init__.py
|-- constants.py
`-- ttypes.py
gen-cpp
文件夹改名为 match_client
作为匹配系统的 client 端
Match-remote
用不到可以删掉
创建客户端逻辑文件 client.py
,从官网 Tutorial 中复制修改。修改成从终端读入操作,客户端搞定!
from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
# 从终端读入内容
from sys import stdin
def operate(op, user_id, username, score):
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = Match.Client(protocol)
# Connect!
transport.open()
user = User(user_id, username, score)
if op == "add":
client.add_user(user, "")
elif op == "remove":
client.remove_user(user, "")
# Close!
transport.close()
def main():
# 从终端中读入操作
for line in stdin:
op, user_id, username, score = line.split(' ')
operate(op, int(user_id), username, int(score))
if __name__ == "__main__":
main()
4、完善 main.cpp
,Match Server 2.0,开一个线程去执行匹配任务,同时主线程去和客户端通信。
需要用到锁、条件变量来进行线程同步
匹配逻辑:每次取匹配池里的前两名玩家进行匹配。
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool
{
public:
void save_result(int a, int b)
{
printf("Match Result: % d %d\n", a, b);
}
void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user)
{
users.push_back(user);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i ++ )
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); // 为消息队列加锁,函数退出会自动解锁
message_queue.q.push({user, "add"});
message_queue.cv.notify_all(); // 唤醒所有被阻塞的线程
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m); // 加锁,两个方法同时只能有一个方法可以获取到消息队列的锁
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all(); // 删除也是一个任务,也需要唤醒所有被阻塞的线程
return 0;
}
};
void consume_task()
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty())
{
message_queue.cv.wait(lck); // 阻塞当前线程,等待唤醒
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 解锁
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
cout << "Start Match Server" << endl;
// 开一个线程执行匹配任务
thread matching_thread(consume_task);
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}
我们只修改了 main.cpp
,所以只需要编译 main.cpp 即可
-
编译:
g++ -c main.cpp
-
链接:
g++ *.o -o main -lthrift -pthread
,-pthread
是线程动态链接库
5、在 match_server 端调用远程保存数据服务,所以需要创建保存数据客户端 save_client
在 thrift
目录下定义 save.thrift
接口
namespace cpp save_service
service Save {
# username: myserver的名称
# password: myserver的密码的md5sum的前8位
# 用户名密码验证成功会返回0,验证失败会返回1
# 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中
i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)
}
获取密码
- 首先
homework 4 getinfo
,获取用户名 ip 地址和密码 - 然后输入
md5sum
命令,输入密码,按Ctrl + d
生成加密后的密码,取前 8 位。
match_server 端
match_client 端
myserver 端
main.cpp
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
// 玩家池
class Pool
{
public:
void save_result(int a, int b)
{
printf("Match Result: %d %d\n", a, b);
// myserver 的 ip 地址
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
// 通过 myserver 用户名和加密后的密码,才能把数据保存到 myserver 的 result.txt 上
int res = client.save_data("acs_4851", "c4cd762a", a, b);
// if (!res) puts("success");
// else puts("failed");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}
void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user)
{
users.push_back(user);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i ++ )
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
// 唤醒
message_queue.cv.notify_all();
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
// 唤醒
message_queue.cv.notify_all();
return 0;
}
};
void consume_task()
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty())
{
// 等待
message_queue.cv.wait(lck);
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
// 解锁
lck.unlock();
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
cout << "Start Match Server" << endl;
// 开线程
thread matching_thread(consume_task);
server.serve();
return 0;
}
6、继续完善 main.cpp
匹配逻辑 Match Server 3.0,改为每 1s 钟匹配一次,发现两名玩家分值差小于等于 50 就匹配成功。
// 玩家池
class Pool
{
public:
...
void match()
{
while (users.size() > 1)
{
sort(users.begin(), users.end(), [](User& a, User& b){
return a.score < b.score;
});
bool flag = true;
for (uint32_t i = 1; i < users.size(); i ++ )
{
auto a = users[i - 1], b = users[i];
if (b.score - a.score <= 50)
{
users.erase(users.begin() + i - 1, users.begin() + i + 1);
save_result(a.id, b.id);
flag = false;
break;
}
}
if (flag) break;
}
}
...
private:
vector<User> users;
}pool;
match_server 端
match_client 端
7、继续完善 main.cpp
匹配逻辑 Match Server 4.0,将原来的单线程匹配升级为多线程。每次调用 consume_task
匹配函数就新建一个线程。
原来使用的是 TSimpleServer
,简单版的 Server,现在改用 TThreadServer
(T - Thrift)
int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());
cout << "Start Match Server" << endl;
// 开线程
thread matching_thread(consume_task);
server.serve();
return 0;
}
还需要添加一个工厂
class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
/*cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};
另外需要替换一些变量名,将 Calculator
替换成 Match
,完整代码
8、终极版,在第 6 步的基础上继续完善 main.cpp
匹配逻辑,原来的匹配条件是如果两个人的分支相差不超过 50 就匹配成功,但这种实现存在问题,比如当前匹配池中有两名玩家,一个 1000 分一个 500 分,在没有其他玩家参与的情况下,这两名玩家将永远不会匹配成功,这是不合理的。
所以我们可以添加等待时间机制,在等待一段时间后,还没能匹配成功的话,就将他两匹配到一起。
具体逻辑:
- 每个玩家添加绑定一个等待时间,每等一秒就将分值扩大 50
- 如果此时有两个玩家的分值都满足可以接收的差值,就匹配成功
main.cpp
匹配逻辑
bool check_match(uint32_t i, uint32_t j)
{
auto a = users[i], b = users[j];
int dt = abs(a.score - b.score);
int a_max_diff = wt[i] * 50; // 每等待 1s 扩大 50 分差
int b_max_diff = wt[i] * 50;
return dt <= a_max_diff && dt <= b_max_diff;
}
void match()
{
for (uint32_t i = 0; i < wt.size(); i ++ )
wt[i] ++ ; // 表示等待秒数 + 1
while (users.size() > 1)
{
bool flag = true;
for (uint32_t i = 0; i < users.size(); i ++ )
{
for (uint32_t j = i + 1; j < users.size(); j ++ )
{
if (check_match(i, j))
{
auto a = users[i], b = users[j];
users.erase(users.begin() + j); // 将两名玩家删除,先删后面的,再删前面的,防止下标变化
users.erase(users.begin() + i);
wt.erase(wt.begin() + j); // 将两名玩家从等待数组中删除
wt.erase(wt.begin() + i);
save_result(a.id, b.id);
flag = false;
break;
}
}
if (!flag) break;
}
if (flag) break;
}
}
再修改 add_user()
和 remove_user()
方法,添加等待数组的添加和删除。
作业代码
仓库地址:https://git.acwing.com/tonngw/thrift_lesson
成功截图
牛!!结构图是啥软件画的?
这个网站:draw.io