【JUC】线程池

线程池

连接池是创建管理连接 Connection 的缓冲池的技术,这些连接准备好被任何需要它们的线程使用

为什么要使用线程池?在高并发场景下,如果有 1000 万个请求同时访问服务器,则服务器需要新建 1000 万个线程,这样无疑是非常浪费资源的。需要一种技术,能够控制和管理线程的数量,只能同时工作一定数量的线程,使得后来的请求阻塞等待。线程池可以解决这个问题

线程池(ThreadPool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。其特点:

  • 线程复用
  • 管理线程
  • 控制最大并发数

线程池不仅能够保证内核的充分利用,还能防止过分调度。优势:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java 中的线程池是通过 Executor框架实现的,该框架中用到了 ExecutorExecutorsExecutorServiceThreadPoolExecutor这几个类( Executors为工具类,用于创建几种基础线程池):

img

阅读全文

【JVM】JVM 方法区

栈、堆、方法区的交互关系

内存区域划分:

image-20200708094507624

对象的访问定位:

image-20200708094747667

  • Person类信息(类型信息、方法信息、属性信息等)存放在元空间,也可以说方法区
  • person:类的实例对象名存放在Java栈的局部变量表中
  • new Person():类的实例对象数据存放在Java堆中
阅读全文

【RabbitMQ】RabbitMQ 常见问题

RabbitMQ 常见问题

RabbitMQ 是什么?

RabbitMQ 是实现了 高级消息队列协议(AMQP) 的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

PS: 也可能直接问什么是消息队列?消息队列就是一个使用队列来通信的组件。

RabbitMQ 特点?

  • 可靠性:RabbitMQ 使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。
  • 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • 多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP, MQTT 等多种消息中间件协议。
  • 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
  • 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等。
  • 插件机制:RabbitMQ 提供了许多插件, 以实现从多方面进行扩展,当然也可以编写自己的插件。

AMQP 是什么?

RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 )。 AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

AMQP 协议 3 层?

  • Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
  • Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
  • Transport Layer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 模型的几大组件?

  • 交换器(Exchange):消息代理服务器中用于把消息路由到队列的组件。
  • 队列(Queue):用来存储消息的数据结构,位于硬盘或内存中。
  • 绑定(Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
阅读全文

【RabbitMQ】Spring Boot 整合 RabbitMQ

RabbitMQ 配置

  1. 导入 Maven 依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

该场景启动器将导入 RabbitMQ 的自动配置类 RabbitAutoConfiguration

image-20220115204414635

image-20220115210448861

其中,RabbitProperties 类绑定了配置文件中的 spring.rabbitmq 前缀:

image-20220115210648640
  1. 在配置文件中配置 RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
spring:
rabbitmq:
host: yuyunzhao.cn
port: 5672
virtual-host: / # 设置虚拟主机
publisher-confirm-type: correlated # 是否启用【发布确认】:发布消息成功到交换器后会触发回调方法
publisher-returns: true # 启用【消息回退】:发送端消息抵达 Queue 失败时进行回调
template:
mandatory: true # 开启强制消息投递:发送端消息抵达 Queue 失败时进行回调,二者需要同时开启
listener:
simple:
acknowledge-mode: manual # 开启消费者手动应答。默认为自动应答

注意:rabbitmq 的属性需要配置在 spring.rabbitmq 下,否则会走默认的本地端口

  1. 在主启动类上添加 @EnableRabbit 注解:
1
2
3
4
5
6
7
@EnableRabbit
@SpringBootApplication
public class MallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(MallOrderApplication.class, args);
}
}
  1. 在配置类中注入 RabbitTemplate,定制化其消息转换器为 JSON 格式转换器,并设置确认回调失败返回回调方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Configuration
public class MyRabbitConfig {

private RabbitTemplate rabbitTemplate;

/**
* 消息转换器:使用 JSON 序列化方式将 POJO 以 JSON 形式保存到 RabbitMQ 中
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
* 定制 RabbitTemplate,为其设置 JSON 消息转换器
*/
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}

/**
* 定制 RabbitTemplate
* 1. broker 收到消息就会回调
* 1.1 设置 spring.rabbitmq.publisher-confirm-type: correlated
* 1.2 设置确认回调
* 2. 消息无法正常抵达队列就会进行回调
* 2.1 设置 spring.rabbitmq.publisher-returns: true
* 设置 spring.rabbitmq.template.mandatory: true
* 2.2 设置确认回调 ReturnCallback
* 3. 消费端确认(保证每个消息都被正确消费,此时才可以从队列中删除这个消息)
*/
public void initRabbitTemplate() {
/**
* 1. 只要消息抵达 Broker 就 ack = true。并设置确认回调
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});

/**
* 2. 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}

绑定交换机和队列

  1. 可以使用 AmqpAdmin 对象创建交换机、创建队列并绑定交换机和队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Autowired
AmqpAdmin amqpAdmin;

/**
* 创建交换机
*/
@Test
public void contextLoads() {
DirectExchange directExchange = new DirectExchange("hello-java.exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java.exchange");
}

/**
* 创建队列
*/
@Test
public void createQueue() {
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]:","创建成功");
}

/**
* 绑定队列
*/
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java.exchange",
"hello.java", null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功","hello-java-binding");
}
  1. 更推荐使用 Spring 自动注入的方式创建交换机、队列并绑定二者(注意这种方式注入的队列和交换机不会立即创建,而是在由消费者监听时才会创建):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
@Configuration
public class MyRabbitMQConfig {
/**
* 订单服务总交换机
* @return
*/
@Bean
public Exchange orderEventExchange() {
/**
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
*/
return new TopicExchange("order-event-exchange", true, false);
}

/**
* 延迟队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
/**
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
// 死信交换机
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间 1分钟
arguments.put("x-message-ttl", 60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}

/**
* 普通队列(死信队列,负责存放过期的消息)
* @return
*/
@Bean
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}

/**
* 创建订单的binding
*
* @return
*/
@Bean
public Binding orderCreateBinding() {
/**
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
*/
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,
"order-event-exchange", "order.create.order", null);
}

@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}

@Bean
public Binding orderReleaseOrderBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}

/**
* 商品秒杀队列
*
* @return
*/
@Bean
public Queue orderSecKillOrrderQueue() {
Queue queue = new Queue("order.seckill.order.queue", true, false, false);
return queue;
}

@Bean
public Binding orderSecKillOrrderQueueBinding() {
Binding binding = new Binding(
"order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);

return binding;
}
}

注意:这种方式注入的队列和交换机不会立即创建,而是在由消费者监听时才会创建

消息生产者

RabbitTemplate 可用于发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Autowired
@Test
public void sendMessageTest() {
for(int i = 1; i <= 5; i++) {
if(i % 2 == 0) {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1l);
reasonEntity.setCreateTime(new java.util.Date());
reasonEntity.setName("哈哈");

String msg = "Hello World";
// 发送消息(将被转换成 JSON 格式)
rabbitTemplate.convertAndSend("hello-java.exchange", "hello.java", reasonEntity);
} else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java.exchange", "hello.java", orderEntity);
}
log.info("消息发送完成");
}
}

消息消费者

使用 @RabbitListener@RabbitHandler 注解将消费者与队列进行绑定:

  • @RabbitListener:标注在类或方法上。如果标注在类上,则该类所有方法都会进行监听;如果标注在方法上,只有该方法会进行监听
  • @RabbitHandler:标注在方法上。根据方法中形参的类型不同,会自动决定调用哪个方法

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 监听死信队列(关单队列) order.release.order.queue,将过期的订单关闭
*/
@Component
@RabbitListener(queues = {"order.release.order.queue"})
public class OrderCloseListener {

@Autowired
private OrderService orderService;

/**
* 从队列中拿到订单实体对象 OrderEntity,调用 OrderService 关闭该订单
* 1. 如果关闭订单成功,就手动回复成功 Ack,从队列中删除该消息;
* 2. 如果关闭订单失败,就回复失败 Reject,并且重新入队:requeue=true,等待其他消费者重新消费该消息
* @param orderEntity
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void listener(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
orderService.closeOrder(orderEntity);
channel.basicAck(deliveryTag,false);
} catch (Exception e){
channel.basicReject(deliveryTag, true);
}
}
}

发布确认与消息应答

为保证消息能够可靠到达不丢失,需要引入发布确认机制与消息应答机制:

  • 生产者发布确认 confirmCallback:生产者消息成功到交换机异步回调通知生产者消息发布成功
  • 生产者消息回退 returnCallback:生产者消息如果未能从交换机投递到消息队列就异步回调通知生产者消息投递失败
  • 消费者消息应答 ack:消费者如果从消息队列收到消息并确认无误后手动或自动通知 Broker 删除该消息

image-20220118194850488

生产者发布确认

  1. 在配置文件中开启发布确认功能:
1
2
rabbitmq:
publisher-confirm-type: correlated # 开启【发布确认】:发布消息成功到交换器后会触发回调方法,异步通知生产者发布成功
  • NONE 值是禁用发布确认模式,是默认值。
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法。
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirmswaitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

生产者消息回退

  1. 在配置文件中开启消息回退功能:
1
2
3
4
5
spring:
rabbitmq:
publisher-returns: true # 开启【消息回退】:发送端消息抵达 Queue 失败时进行回调
template:
mandatory: true # 开启强制消息投递,需要与上面同时开启
  1. 在配置类中注入 RabbitTemplate,设置其确认回调(发布确认)和失败返回回调(消息回退)方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
public class MyRabbitConfig {

private RabbitTemplate rabbitTemplate;

/**
* 消息转换器:使用 JSON 序列化方式将 POJO 以 JSON 形式保存到 RabbitMQ 中
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
* 定制 RabbitTemplate,为其设置 JSON 消息转换器
*/
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}

/**
* 定制 RabbitTemplate
* 1. 交换机收到消息就会回调:设置确认回调 ConfirmCallback
* 2. 消息无法正常抵达队列就会进行回调:设置确认回调 ReturnCallback
*/
public void initRabbitTemplate() {
/**
* 1. 只要消息抵达 Broker 就 ack = true。并设置确认回调
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});

/**
* 2. 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}

消费者消息应答

消费者的消息应答有两种类型:

  • 自动消息应答:在消费者收到消息后就被认为已经传送成功,并从队列中删除。这种方式容易造成消息丢失,因为消费者可能还没处理完该消息就宕机了,这时消息应该重新投递,如果被删掉了就造成消息丢失了
  • 手动消息应答分为:
    • Channel.basicAck(肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
    • Channel.basicNack(否定确认):可以批量否定确认
    • Channel.basicReject(否定确认):与 Channel.basicNack 相比少一个参数。若指定参数 requeue=true,则该消息重新入队;否则丢弃该消息
    • 如果消费者一直没有调用 ack/nack 方法,brocker 认为此消息正在被处理,不会投递给其他人也不会删除该消息。如果这时客户端断开,该消息不会被 broker 移除,而是会再次投递到队列中等待其他人消费(原理可能是 broker 会检测到其与消费者间的连接中断了,就重新投递)
  1. 在配置文件中开启消费者手动应答:
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 消费者手动应答
  1. 在通道 Channel 中回复确认收到消息
1
2
3
4
5
6
7
@RabbitHandler
public void recieveMessage(Message message.
OrderReturnEntity content,
Channel channel) throws InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}

延迟队列

实现延迟队列有两种方式:

  • 队列设置过期时间
  • 每个消息设置过期时间

队列设置过期时间:x-message-ttl:300000,整个队列的所有消息的过期时间都等于该值:

image-20220120100007189

每个消息设置过期时间:expiration:300000,只有该消息的过期时间等于该值:

image-20220120100046735

两种方式中应该选择给队列设置过期时间。这是因为 RabbitMQ 的惰性检查机制(懒检查机制):如果给每个消息设置过期时间,则 RabbitMQ 服务器每次只会先检查队首消息是否过期,如果没过期,就不检查后面的其他消息了。这就导致后进的消息如果过期时间更短反而不会被检查到,从而不能及时过期。

配置实战

在项目中使用延迟队列时,不为其设置任何消费者来监听,所以消息必定会过期,从而成为死信。过期后就会被路由到死信交换机,死信交换机会将死信消息路由到死信队列。设置消费者监听死信队列,就可以实现延迟一段时间后再消费这些消息。该过程的示意图:

image-20220120100309381

但以上方式需要创建多个交换机,其实没有必要,完全可以用一个交换机做整个服务的总交换机。例如在云商城项目中的消息队列配置示意图:

image-20220120101110172

整个订单服务只设置了一个总交换机,延迟队列和死信队列都与其绑定,通过不同的路由键实现路由到不同的队列中。

创建延迟队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 订单服务延迟队列 order.delay.queue。
* 每个订单创建成功后,都需要向延迟队列发送消息,等待30分钟后判断是否需要取消订单
* 没有消费者监听该队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
/**
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
// 死信交换机
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间 1分钟
arguments.put("x-message-ttl", 60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}

【JVM】JVM 堆

堆的核心概念

堆与进程

一个进程对应一个JVM实例,一个JVM实例有一个Runtime Data Area,其对应了一个Runtime类的单例对象。

堆针对一个JVM进程来说是唯一的,也就是一个进程只有一个JVM,但是进程包含多个线程,他们是共享同一堆空间的

image-20200706195127740

一个JVM实例只存在一个堆内存,堆也是Java内存管理的核心区域。Java堆区在JVM启动的时候即被创建,其空间大小也就确定了(堆内存的大小是可以调节的),是JVM管理的最大一块内存空间。

《Java虚拟机规范》规定,堆可以处于物理上不连续的内存空间中,但在逻辑上它应该被视为连续的。所有的线程共享Java堆,在这里还可以划分线程私有的缓冲区(Thread Local Allocation Buffer,TLAB)。

  • -Xms10m:最小堆内存
  • -Xmx10m:最大堆内存
阅读全文

【JUC】CAS 原理

CAS

CAS 是无锁实现线程安全的方式,是一种乐观锁

CAS是指Compare And Swap比较并交换,它是一条CPU并发原语,是一种很重要的同步思想,是乐观锁思想的一种实现。如果主内存的值跟期望值一样,那么就进行修改,否则一直重试,直到一致为止。

CAS并发原语体现在Java语言中就是sun.misc.Unsafe类中的各个方法(由C++实现)。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。CAS的好处是其能在保证数据一致性的同时,也保证并发性。

Java中CAS操作的执行依赖于Unsafe类的 native本地方法(由C++实现,其调用操作系统底层原语,能保证原子性)。

CAS 常和 volatile 配合使用。前者不断自旋比较共享变量在主存中的最新值是否和预期值一直,后者负责保证共享变量的值始终为主存中的最新值(可见性)。

为什么无锁效率高

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

但是特别频繁的 CAS 也会导致性能降低,因此 CAS 适合短时间内的并发控制,不适合长时间的自旋

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想
  • synchronized 是基于悲观锁的思想
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

原子类型工具类

java.util.concurrent.atomic 并发包提供了一些并发工具类,这里把它分成五类:

  • 原子基本类型
    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean :布尔型原子类
  • 原子引用 AtomicReference
  • 原子数组
    • AtomicIntegerArray:整形数组原子类
    • AtomicLongArray:长整形数组原子类
    • AtomicReferenceArray :引用类型数组原子类
  • 字段更新器 AtomicReferenceFieldUpdater
  • 原子累加器 LongAdder
阅读全文

【JVM】JVM 虚拟机栈

虚拟机栈概述

由于跨平台性的设计,Java的指令都是根据栈来设计的。不同平台CPU架构不同,所以不能设计为基于寄存器的。

基于栈的架构优点是跨平台,指令集小,编译器容易实现,缺点是性能下降,实现同样的功能需要更多的指令。

内存中的栈与堆

栈是运行时的单位,而堆是存储的单位

  • 栈解决程序的运行问题,即程序如何执行,或者说如何处理数据。
  • 堆解决的是数据存储的问题,即数据怎么放,放哪里

image-20200705163928652

阅读全文

【JUC】volatile

JMM

JMM(Java内存模型:Java Memory Model,简称JMM)本身是一种抽象的概念并不真实存在,它描述的是一组规则规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式,它从Java层面定义了主存工作内存抽象概念。JMM不是Java内存布局,不是所谓的栈、堆、方法区。

JMM关于同步的规定:

  1. 线程解锁前,必须把共享变量的值刷新回主内存
  2. 线程加锁前,必须读取主内存的最新值到自己的工作内存
  3. 加锁解锁是同一把锁

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间,其实是每个线程缓存在CPU中的高速缓存数据),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,其简要访问过程如下图:

img

主存就是内存。工作内存指每个线程缓存在CPU中的数据(高速缓存),每个线程都在自己的CPU核心中缓存代码数据,通过总线嗅探得知主存中自己之前缓存的数据值已经过期,从而从主存中获取更新自己缓存中的数据

每个Java线程都有自己的工作内存。操作数据时,首先从主内存中读,得到一份拷贝,操作完毕后再写回到主内存。

JMM可能带来可见性原子性有序性问题。所谓可见性,就是某个线程对主内存内容的更改,应该立刻通知到其它线程。原子性是指一个操作是不可分割的,不能执行到一半,就不执行了。所谓有序性,就是指令是有序的,不会被重排

  • 原子性:保证指令不会受到线程上下文切换的影响
  • 可见性:保证指令不会受 cpu 缓存的影响
  • 有序性:保证指令不会受 cpu 指令并行优化(指令重排序)的影响

CPU 缓存结构

CPU 缓存结构示意图:

image-20220212152612808

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)

缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中。CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效(通过MESI 协议察觉)。

一旦某个缓存行内的数据不是最新的了(通过 MESI 缓存一致性协议,总线嗅探察觉到数据被其他核心锁修改,自己缓存的不再是最新值),就会将该行的缓存数据失效,重新从内存中更新值保存到缓存。

案例

下图中,两个线程分别占用两个 CPU 核心,并且都将内存中的两个数据 Cell[0]Cell[1] 缓存到自己的一个缓存行里(二者保存保存到同一个缓存行里):

image-20220212153306905

图中每一行为一个缓存行结构,一般是 64 byte(8 个 long)

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000,这时会让 Core-1 的缓存行失效

@sun.misc.Contended 注解用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行。这样,不会造成对方缓存行的失效:

image-20220212154151927

代码:

1
2
3
4
5
6
7
8
9
10
11
12
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }

// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}

volatile

volatile:易变的

volatile 是 JVM 提供的轻量级的同步机制synchronized锁是重量级的同步机制)。它可以用来修饰成员变量静态成员变量,可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的最新值。其特点:

  • 保证可见性(保证共享变量在多个线程操作下的可见性)
  • 保证有序性(禁止指令重排)
  • 不保证原子性(指令交错导致的原子性问题需要加锁来保证)

volatile 关键字常与 CAS 配合使用:自旋的过程中保证 volatile 修饰的共享变量的值永远是最新的(而非会用高速缓存值)。

原理

volatile 的底层实现原理是内存屏障:Memory Barrier(Memory Fence)

  • volatile 修饰的变量的写指令后会加入写屏障
  • volatile 修饰的变量的读指令前会加入读屏障

volatile 保证可见性

  • 写屏障(sfence)保证:在该屏障之前对共享变量的改动,结果都立即同步到主存当中
  • 读屏障(lfence)保证:在该屏障之后对共享变量的读取,加载的都是主存中的最新数据

通过读写屏障,在每次需要用到共享变量时都会保证和主存中的值一致。

其他非 volatile 修饰的共享变量仍然可以使用自己的高速缓存。有 volatile 修饰的共享变量会通过读写屏障保证可见性

volatile 保证有序性

  • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
  • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

通过读写屏障,保证读写屏障前后的代码不会被重拍到屏障前后

volatile 无法保证原子性

volatile 无法保证原子性。无法解决高并发下的指令交错问题。仍需要加锁来解决。

synchronized

使用 synchronized 关键字能同时满足上面三个特性。在JMM中,synchronized 规定,线程在加锁时:

  • 清空自己工作内存的缓存
  • 然后在主内存中拷贝最新变量的副本到自己的工作内存告诉缓存
  • 执行再完代码将更改后的共享变量的值刷新到主内存
  • 释放 Monitor 锁
  • 其他工作线程通过总线嗅探发现自己之前缓存的值发生变化,就会重新从主内存拉取变量的最新值

总结:添加了 synchronized 关键字后,其所在线程就会强制从主存中更新当前线程所需的成员变量值到自己的高速缓存中,从而保证了可见性

注意:synchronized 包裹的代码块内仍然可能出现指令重排序其保证有序性的方式是通过保证原子性实现的。只要共享变量能在代码块内被完整保护,那么因为其原子性的特点,就能保证及时指令重排序了,也能保证是有序性的。因为其他线程都进不来,并且共享变量是被完全保护住的,所以就间接保证了有序性。

阅读全文

【JVM】JVM 运行时数据区

运行时数据区概述

image-20200705111640511

当我们通过前面的:类的加载 -> 验证 -> 准备 -> 解析 -> 初始化 这几个阶段完成后,就会用到执行引擎对我们的类进行使用,同时执行引擎将会使用到我们运行时数据区

image-20200705111843003

阅读全文

【JVM】JVM 类加载机制

类加载器子系统概述

img

img

img

如果自己想手写一个Java虚拟机的话,主要考虑哪些结构呢?

  • 类加载器
  • 执行引擎
阅读全文