vertx-practice
Table of Contents
vertx
1.介绍
轻量级的异步框架,不过建议只引入作为web层,或者通信的框架,不建议直接抛弃spring
,最近打比赛用到了,还专门写了个模拟spring-mvc
的整合框架
2. 痛点
- jdbc,orm(mybatis-plus) 怎么办? 阻塞式api出现在响应式框架?
- web 层的route需要都写在一个类里?如何像spring-mvc一样开发?
- 如何处理复杂业务?陷入回调地狱?
3.解决思路
- 目前思路是对于需要返回结果的,直接放在
workerVerticle
,然后回调,不阻塞eventloop
线程,但是又存在线程切换的开销,只能平衡了 - 可以将方法返回 Handler这样可以收集Handler和RouteInfo,最后统一挂载,详见代码
- 目前无法解决,只能说异步只适合某些简单的特定场景
Object result = XXXService.selectOne(new LambdaQueryWrapper()...);
if(result == null ) {
// do something
}
认为其大概率是正常工作的,所以对于一些装配上下文的情况,早点查询,不过如果之后的校验出错,岂不是浪费了数据库资源?白白执行了这么多sql,这时候请注意前提:认为通常情况下校验成功,感觉这是个伪命题,还是避免block
ctx.vertx().executingBlocking(Callable call)
注意,有时候我们简单使用mp
的时候,经常不对其结果进行处理,比如deleteById()
,所以异步时,往往想使用一个Runnable
而不是Callable
,但是这些api
都是阻塞,存在返回结果
4. 实现
github 描述核心代码:
public class JacksonFactory implements io.vertx.core.spi.JsonFactory {
public static final JacksonFactory INSTANCE = new JacksonFactory();
public static final JacksonCodec CODEC;
static {
JacksonCodec codec;
try {
codec = new DatabindCodec();
} catch (Throwable ignore) {
// No databind
codec = new JacksonCodec();
}
CODEC = codec;
}
@Override
public JsonCodec codec() {
return CODEC;
}
}
这个是vertx web
的Json实现工厂,存在JacksonCodec和DatabindCodec,这里很坑的一点就是,databind 是option的,所以需要自己导依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
不然 JsonObject.mapTo() 或者 ctx.body().asPojo() 都会fail to decode
给出一些代码的实现
public Router routerHandle() {
// cors config
Router router = httpServerConfig.getRouter();
// config cors
router.route().handler(CorsHandler.create()
.allowedMethods(new HashSet<HttpMethod>() {{
add(HttpMethod.GET);
add(HttpMethod.POST);
add(HttpMethod.OPTIONS);
add(HttpMethod.PUT);
add(HttpMethod.DELETE);
add(HttpMethod.HEAD);
}}));
// request body handler --> json
router.route().handler(BodyHandler.create(false));
// init Interceptor
initInterceptor(router);
// init message Convertor
initVertxMessageConverter();
// init template engine
initVertxTemplateEngine();
// pre inception
registerInterceptorPreHandler(router);
// process route handler
registerRouterHandler(router);
// process Exception
registerRouterExceptionHandler(router);
// after inception
registerInterceptorAfterHandler(router);
return router;
}
这相当于伪代码了,主要的处理逻辑在这里,介绍下 registerRouterHander
private void registerRouterHandler(Router router) {
Set<Object> routeHandlers = beanFactory.getTypesAnnotatedWith(RouteHandler.class);
if(routeHandlers == null || routeHandlers.size() == 0) return;
routeHandlers.forEach(handler->{
// extract current RouteHandler route info
List<RouteInfo> routeInfos = extractRouteInfos(handler.getClass());
routeInfos.stream().sorted(Comparator.comparingInt(RouteInfo::getOrder)).forEach(routeInfo->{
// binding method handler
Handler<RoutingContext> methodHandler = routeInfo.getMethodHandler();
Route route = router.route(routeInfo.getRouteMethod(), routeInfo.getRoutePath());
// set blocking handler ,such as sql execute handler
if(routeInfo.isBlocked()) route.blockingHandler(methodHandler);
else route.handler(methodHandler);
String[] consumes = routeInfo.getConsumes();
if (StringKit.isNotEmpty(List.of(consumes))) {
Arrays.stream(consumes).forEach(route::consumes);
}
String[] produces = routeInfo.getProduces();
if(StringKit.isNotEmpty(List.of(produces))) {
Arrays.stream(produces).forEach(route::produces);
}
});
});
}
private List<RouteInfo> extractRouteInfos(Class routeHandlerClass) {
RouteHandler routeHandlerAnnotation = RouteKit.getRouteHandler(routeHandlerClass);
String serverPrefix = httpServerConfig.getServerBasePath();
String routePrefix = routeHandlerAnnotation.value();
Object o = beanFactory.get(routeHandlerClass);
return Stream.of(routeHandlerClass.getMethods()).filter(method -> method.isAnnotationPresent(RouteMapping.class))
.map(method -> {
RouteMapping routeMapping = RouteKit.getRouteMapping(method);
com.yutak.vertx.core.HttpMethod httpMethod = routeMapping.method();
String routeUrl = RouteKit.buildRouteUrl(serverPrefix, routePrefix, routeMapping.path());
RouteInfo r = new RouteInfo();
r.setRoutePath(routeUrl);
//set route method
r.setRouteMethod(HttpMethod.valueOf(httpMethod.name()));
r.setOrder(routeMapping.order());
r.setConsumes(routeMapping.consumes());
r.setProduces(routeMapping.produces());
RouteKit.resolveRouteMethodHandler(method, o, httpServerConfig, r);
return r;
}).toList();
}
解析 HttpMethod
为什么不直接使用 vertx
的呢?主要是 @interface只支持Enum不支持 final Class,对比vertx3和4就行了
@RouteHandler("/user")
@Component
@RequiredArgsConstructor
public class UserApi {
// private static final Logger log = LoggerFactory.getLogger(UserApi.class);
private final UserService userService;
private final DeviceService deviceService;
@RouteMapping(path = "/device/delete", method = HttpMethod.GET)
public Handler<RoutingContext> delDevice() {
return ctx -> {
LambdaQueryWrapper<Device> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Device::getDeviceId, ctx.request().getParam("deviceId"));
wrapper.eq(Device::getUid, ctx.request().getParam("uid"));
ctx.end("ok");
ctx.vertx().executeBlocking(() -> userService.removeById(wrapper));
};
}
@RouteMapping(path = "/device/list",method = HttpMethod.GET)
public Handler<RoutingContext> deviceList() {
return ctx -> {
LambdaQueryWrapper<Device> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Device::getUid, ctx.request().getParam("uid"));
wrapper.orderByAsc(Device::getLastLogin);
ctx.vertx().executeBlocking(()->deviceService.list(wrapper)).onSuccess(t->{
if( t == null ) {
ctx.end("no device found");
return;
}
JsonObject json = new JsonObject(t.stream().collect(Collectors.toMap(Device::getDeviceName, d -> d)));
ctx.end(json.encode());
});
};
}
}
这个是开发案例,特别注意的是 误认为 return 会结束这次请求
,必须调用 ctx.end() || ctx.reponse.end()
@Component
是为了让spring beanFactory
能收集到,这里使用spring ioc
十分方便,不然自行注入太麻烦了
RouteMapping
可以直接映射为block
源码分析
Buffer
其实在vertx中使用Buffer太简单了,也无需像netty中管理池子,平常使用直接 Buffer.buffer()可以了,这是因为vertx的Buffer的默认创建走的是HeapPool
,还有Buffer实际是个接口,真正实现是BufferImpl
public class BufferImpl implements BufferInternal {
// netty 的 ByteBuf
private ByteBuf buffer;
// 构造方法
public BufferImpl() {
this(0);
}
// 池化
BufferImpl(int initialSizeHint) {
buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(initialSizeHint, Integer.MAX_VALUE);
}
BufferImpl(byte[] bytes) {
buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(bytes.length, Integer.MAX_VALUE).writeBytes(bytes);
}
...
}
Vertx的Buffer
分配对象也是直接用了netty
的
public abstract class VertxByteBufAllocator extends AbstractByteBufAllocator {
/**
* Vert.x pooled allocator.
*/
public static final ByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true);
/**
* Vert.x shared unpooled allocator.
*/
public static final ByteBufAllocator UNPOOLED_ALLOCATOR = new UnpooledByteBufAllocator(false);
private static final VertxByteBufAllocator UNSAFE_IMPL = new VertxByteBufAllocator() {
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return new VertxUnsafeHeapByteBuf(this, initialCapacity, maxCapacity);
}
};
private static final VertxByteBufAllocator IMPL = new VertxByteBufAllocator() {
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return new VertxHeapByteBuf(this, initialCapacity, maxCapacity);
}
};
// 默认实现是这个
public static final VertxByteBufAllocator DEFAULT = PlatformDependent.hasUnsafe() ? UNSAFE_IMPL : IMPL;
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
return UNPOOLED_ALLOCATOR.directBuffer(initialCapacity, maxCapacity);
}
@Override
public boolean isDirectBufferPooled() {
return false;
}
}
后续
之后对于 vertx几乎没怎么使用了,写业务代码真的太痛苦了,1,2个阻塞式api还好
但是面向数据库编程根本写不了,调试也做不了,现在yutak这个项目也废弃了