This commit is contained in:
ljt 2024-11-15 10:14:00 +08:00
commit 96030a75c1
37 changed files with 2844 additions and 0 deletions

33
.gitignore vendored Normal file
View File

@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

19
.mvn/wrapper/maven-wrapper.properties vendored Normal file
View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.7/apache-maven-3.9.7-bin.zip

3
META-INF/MANIFEST.MF Normal file
View File

@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: com.example.demo.DemoApplication

139
pom.xml Normal file
View File

@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>demoChild</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>cn.org.wangchangjiu</groupId>
<artifactId>sqltomongo-converter</artifactId>
<version>1.0.2-RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!-- ciot httpClient-->
<dependency>
<groupId>com.ciot.gd</groupId>
<artifactId>ciot-client</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.69</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version> <!-- 或其他适合的版本 -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- kafka-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.kafka</groupId>-->
<!-- <artifactId>spring-kafka</artifactId>-->
<!-- <version>3.2.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- for test-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- grpc客户端-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
<!-- remoting客户端-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>
<!-- 工具类-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-proto</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

3
src/META-INF/MANIFEST.MF Normal file
View File

@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: com.example.demo.DemoApplication

View File

@ -0,0 +1,108 @@
package com.example.demo;
import com.example.demo.dto.AccountPermissionDto;
import com.example.demo.grpcProtocol.ProducerExample;
import com.example.demo.grpcProtocol.PushConsumerExample;
import com.example.demo.remotingProtocol.ConsumerRemoting;
//import com.example.demo.controller.ProducerExample;
import com.example.demo.remotingProtocol.ProdurcerRemoting;
//import com.example.demo.controller.PushConsumerExample;
//import org.apache.rocketmq.client.apis.ClientException;
import com.example.demo.rocketMq.MyClusterListSubCommand;
import com.example.demo.rocketMq.MyUpdateAccessConfigSubCommand;
import org.apache.rocketmq.client.apis.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@SpringBootApplication
public class DemoApplication {
public static final String TOPIC = "1000101_test2";
public static final String GROUP = "ljtConsumerGroup";
public static void main(String[] args) throws IOException, InterruptedException, ClientException {
// PushConsumerExample.main(null);
SpringApplication.run(DemoApplication.class, args);
// String opt = System.getProperty("opt");
// if (opt.equals("0")) {
// testMessageSendAndReceive();
// }else {
// testACL();
// }
}
private static void testMessageSendAndReceive() throws InterruptedException {
System.out.println("开始测试消息收发...");
String protocol = System.getProperty("protocol");
if (Objects.equals(protocol, "grpc")) {
CompletableFuture.supplyAsync(()->{
try {
// PushConsumerExample.main(null);
} catch (Throwable e) {
e.printStackTrace();
}
return null;
});
CompletableFuture.supplyAsync(()->{
try {
ProducerExample.main(null);
} catch (Throwable e) {
e.printStackTrace();
}
return null;
});
}else {
CompletableFuture.supplyAsync(() -> {
try {
ConsumerRemoting.main(null);
} catch (Throwable e) {
e.printStackTrace();
}
return null;
});
CompletableFuture.supplyAsync(() -> {
try {
ProdurcerRemoting.main(null);
} catch (Throwable e) {
e.printStackTrace();
}
return null;
});
}
Thread.sleep(1000*10);
}
// private static void testACL() {
// System.out.println("开始测试acl修改...");
// MyMQAdminStartup myAdminStartup = new MyMQAdminStartup();
// myAdminStartup.init();
//
// AccountPermissionDto accountPermissionDto = AccountPermissionDto.valueOfTestData(123);
// if (!accountPermissionDto.isValid()) {
// throw new IllegalArgumentException("无效参数:"+accountPermissionDto.toString());
// }
//
// String[] clusterListArgs = new String[]{
// MyClusterListSubCommand.COMMAND_NAME,
// "-n", myAdminStartup.namesrvAddr
// };
// myAdminStartup.runACommand(clusterListArgs);
// if (MyClusterListSubCommand.BROKER_ADDRESSES.size()<=0) {
// throw new RuntimeException("无法找到broker");
// }
// CopyOnWriteArrayList<String> brokerAddresses = MyClusterListSubCommand.BROKER_ADDRESSES;
//
// for (String brokerAddr : brokerAddresses) {
// myAdminStartup.runACommand(accountPermissionDto.buildCreateAccountCommand(
// MyUpdateAccessConfigSubCommand.COMMAND_NAME,myAdminStartup.namesrvAddr,brokerAddr));
// }
// }
}

View File

@ -0,0 +1,34 @@
package com.example.demo.Enum;
/**
* RocketMQ的权限类型
*/
public enum PermissionTypeOfRocketMQ {
TOPIC(0,"topic"),
GROUP(1,"group"),
;
int code;
String name;
PermissionTypeOfRocketMQ(int code,String name){
this.code = code;
this.name = name;
}
public int getCode() {
return code;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "PermissionTypeOfRocketMQ{" +
"code=" + code +
", name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,52 @@
package com.example.demo.Enum;
/**
* rocketMQ的权限字段枚举
*/
public enum PermissionValueOfRocketMQ {
/**
* 订阅权限
*/
SUB(0,"SUB"),
/**
* 发布权限
*/
PUB(1,"PUB"),
/**
* 订阅和发布权限
*/
SUB_PUB(2,"SUB|PUB"),
/**
* 没有任何权限
*/
DENY(3,"DENY");
String permissionString;
int code;
PermissionValueOfRocketMQ(int code, String permissionString){
this.code = code;
this.permissionString = permissionString;
}
@Override
public String toString() {
return "PermissionValueOfRocketMQ{" +
"permissionString='" + permissionString + '\'' +
", code=" + code +
'}';
}
public String getPermissionString() {
return permissionString;
}
public int getCode() {
return code;
}
}

View File

@ -0,0 +1,89 @@
package com.example.demo.controllers;
import com.example.demo.grpcProtocol.Config;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Controller
public class MessageController {
public static final Logger logger = LoggerFactory.getLogger(MessageController.class);
@GetMapping("/message")
public String getMessagePage() {
return "message0";
}
@GetMapping("/consumeMessage")
@ResponseBody
public List<String> consumeMessage(@RequestParam String addr,
@RequestParam String group,
@RequestParam String topic,
@RequestParam String ak,
@RequestParam String sk) {
// Simulating message consumption based on parameters
// List<String> messages = new ArrayList<>();
// for (int i = 0; i < 10; i++) {
// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
// }
// return messages;
try {
return consumerMessage(addr,group,topic,ak,sk);
} catch (Throwable e) {
e.printStackTrace();
return Collections.singletonList(e.getMessage());
}
}
public List<String> consumerMessage(String addr,String group,String topic,String accessKey,String secretKey) throws InterruptedException, ClientException {
ArrayList<String> results = new ArrayList<>();
ClientConfiguration clientConfiguration = new ClientConfigurationBuilder()
.setEndpoints(addr)
.setCredentialProvider(new StaticSessionCredentialsProvider(accessKey,secretKey))
.build();
PushConsumer pushConsumer = ClientServiceProvider.loadService().newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(group)
.setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
.setMessageListener(messageView -> {
ByteBuffer byteBuffer = messageView.getBody();
String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
results.add(content);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(4_000);
CompletableFuture.supplyAsync(()->{
try {
pushConsumer.close();
} catch (IOException e) {
e.printStackTrace();
}
return null;
});
return results;
}
}

View File

@ -0,0 +1,17 @@
package com.example.demo.controllers;
import com.example.demo.dto.TestMessage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@PostMapping("/test")
public void test(@RequestBody TestMessage testMessage){
System.out.println(testMessage);
}
}

View File

@ -0,0 +1,174 @@
package com.example.demo.dto;
import com.example.demo.Enum.PermissionTypeOfRocketMQ;
import com.example.demo.Enum.PermissionValueOfRocketMQ;
import java.util.ArrayList;
import java.util.List;
public class AccountPermissionDto {
private String accessKey;
private String secretKey;
private List<PermissionOfRocketMQDto> topicPermissions;
private List<PermissionOfRocketMQDto> groupPermissions;
public static AccountPermissionDto valueOfTestData(int i) {
AccountPermissionDto accountPermissionDto = new AccountPermissionDto();
accountPermissionDto.accessKey = "testAccessKey"+i;
accountPermissionDto.secretKey = "testSecretKey";
accountPermissionDto.topicPermissions = new ArrayList<>();
PermissionOfRocketMQDto e = new PermissionOfRocketMQDto();
e.setPermissionName("TestTopic_ljt");
e.setPermissionType(PermissionTypeOfRocketMQ.TOPIC);
e.setPermissionValue(PermissionValueOfRocketMQ.SUB);
accountPermissionDto.topicPermissions.add(e);
PermissionOfRocketMQDto e1 = new PermissionOfRocketMQDto();
e1.setPermissionName("TestTopic_ljt2");
e1.setPermissionType(PermissionTypeOfRocketMQ.TOPIC);
e1.setPermissionValue(PermissionValueOfRocketMQ.SUB);
accountPermissionDto.topicPermissions.add(e1);
accountPermissionDto.groupPermissions = new ArrayList<>();
PermissionOfRocketMQDto e2 = new PermissionOfRocketMQDto();
e2.setPermissionName("ljtConsumerGroup");
e2.setPermissionType(PermissionTypeOfRocketMQ.GROUP);
e2.setPermissionValue(PermissionValueOfRocketMQ.SUB);
accountPermissionDto.groupPermissions.add(e2);
PermissionOfRocketMQDto e3 = new PermissionOfRocketMQDto();
e3.setPermissionName("ljtConsumerGroup2");
e3.setPermissionType(PermissionTypeOfRocketMQ.GROUP);
e3.setPermissionValue(PermissionValueOfRocketMQ.SUB);
accountPermissionDto.groupPermissions.add(e3);
return accountPermissionDto;
}
/**
* 参数是否有效
* @return boolean
*/
public boolean isValid() {
if(accessKey == null || accessKey.equals("") || secretKey == null || secretKey.equals("")){
return false;
}
if(!isTopicValidate() && !isGroupValidate()){
return false;
}
return true;
}
private boolean isGroupValidate() {
if(!haveGroupPermissions()){
return false;
}else{
for (PermissionOfRocketMQDto groupPermission : groupPermissions) {
if(!groupPermission.isValidate())return false;
}
}
return true;
}
private boolean isTopicValidate() {
if(!haveTopicPermissions()) {
return false;
}else{
for (PermissionOfRocketMQDto topicPermission : topicPermissions) {
if(!topicPermission.isValidate())return false;
}
}
return true;
}
/**
* 构建创建账户的mqadmin命令
* @return String[]
*/
public String[] buildCreateAccountCommand(String commandName, String serverAddr, String brokerAddr) {
List<String> commandList = new ArrayList<>();
// commandList.add("updateAclConfig");
commandList.add(commandName);
commandList.add("-n");
commandList.add(serverAddr);
commandList.add("-b");
commandList.add(brokerAddr);
commandList.add("-a");
commandList.add(accessKey);
commandList.add("-s");
commandList.add(secretKey);
if(haveTopicPermissions()){
commandList.add("-t");
commandList.add(buildTopicCommand());
}
if(haveGroupPermissions()){
commandList.add("-g");
commandList.add(buildGroupCommand());
}
return commandList.toArray(new String[0]);
}
private String buildGroupCommand() {
return buildCommandLine(groupPermissions);
}
private boolean haveGroupPermissions() {
return groupPermissions!=null && groupPermissions.size()>0;
}
private String buildTopicCommand() {
return buildCommandLine(topicPermissions);
}
private boolean haveTopicPermissions() {
return topicPermissions!=null && topicPermissions.size()>0;
}
private String buildCommandLine(List<PermissionOfRocketMQDto> permissions) {
StringBuilder r = new StringBuilder();
boolean isTheFirstLine = true;
for (PermissionOfRocketMQDto permission : permissions) {
if (!isTheFirstLine) {
r.append(",");
}else{
isTheFirstLine = false;
}
r.append(permission.buildCommand());
}
return r.toString();
}
public boolean isAccessKeyValid() {
return accessKey!=null && accessKey!="";
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public List<PermissionOfRocketMQDto> getTopicPermissions() {
return topicPermissions;
}
public void setTopicPermissions(List<PermissionOfRocketMQDto> topicPermissions) {
this.topicPermissions = topicPermissions;
}
public List<PermissionOfRocketMQDto> getGroupPermissions() {
return groupPermissions;
}
public void setGroupPermissions(List<PermissionOfRocketMQDto> groupPermissions) {
this.groupPermissions = groupPermissions;
}
}

View File

@ -0,0 +1,80 @@
package com.example.demo.dto;
import com.example.demo.Enum.PermissionTypeOfRocketMQ;
import com.example.demo.Enum.PermissionValueOfRocketMQ;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* rocketMQ权限
*/
public class PermissionOfRocketMQDto {
private String permissionName;
private PermissionTypeOfRocketMQ permissionType;
private PermissionValueOfRocketMQ permissionValue;
public PermissionOfRocketMQDto() {
}
public PermissionOfRocketMQDto(String permissionName, PermissionTypeOfRocketMQ permissionType, PermissionValueOfRocketMQ permissionValue) {
this.permissionName = permissionName;
this.permissionType = permissionType;
this.permissionValue = permissionValue;
}
public String getPermissionName() {
return permissionName;
}
public void setPermissionName(String permissionName) {
this.permissionName = permissionName;
}
public PermissionTypeOfRocketMQ getPermissionType() {
return permissionType;
}
public void setPermissionType(PermissionTypeOfRocketMQ permissionType) {
this.permissionType = permissionType;
}
public PermissionValueOfRocketMQ getPermissionValue() {
return permissionValue;
}
public void setPermissionValue(PermissionValueOfRocketMQ permissionValue) {
this.permissionValue = permissionValue;
}
/**
* 构建topic权限对象
* @param topicName 入参
* @param permissionValue 权限枚举类
* @return PermissionOfRocketMQDto
* @throws IllegalArgumentException 非法参数
*/
public static PermissionOfRocketMQDto valueOfTopic(String topicName,PermissionValueOfRocketMQ permissionValue) throws IllegalArgumentException{
if(topicName==null){
throw new IllegalArgumentException("topic name can not be null");
}
return new PermissionOfRocketMQDto(topicName, PermissionTypeOfRocketMQ.TOPIC,permissionValue);
}
public static PermissionOfRocketMQDto valueOfGroup(String name, PermissionValueOfRocketMQ permissionValue) throws IllegalArgumentException{
if(name==null){
throw new IllegalArgumentException("group name can not be null");
}
return new PermissionOfRocketMQDto(name, PermissionTypeOfRocketMQ.GROUP, permissionValue);
}
public String buildCommand() {
return getPermissionName()+"="+getPermissionValue().getPermissionString();
}
public boolean isValidate() {
return permissionName!=null && permissionName!="";
}
}

View File

@ -0,0 +1,35 @@
package com.example.demo.dto;
import com.fasterxml.jackson.annotation.JsonIgnore;
public class TestMessage {
String name;
@JsonIgnore
String age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
@Override
public String toString() {
return "TestMessage{" +
"name='" + name + '\'' +
", age='" + age + '\'' +
'}';
}
}

View File

@ -0,0 +1,14 @@
package com.example.demo.grpcProtocol;
import java.util.Calendar;
public class Config {
public static String ROCKET_MQ_SERVICE_ADD = "192.168.74.248:28081";
public static String GROUP = "1814209072292421634";
// public static String TOPIC = "1000101_testTopicName14";
//String group = "ljtConsumerGroup";
public static String TOPIC = "1000101_testTopicName15";
public static String ACCESS_KEY = "rocketmq2";
public static String SECRET_KEY = "12345678";
}

View File

@ -0,0 +1,97 @@
package com.example.demo.grpcProtocol;
import com.example.demo.DemoApplication;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
//public class ProducerExample {
// private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
//
// public static void main(String[] args) throws ClientException, IOException, InterruptedException {
// logger.info("running producer...");
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// String endpoint = Config.ROCKET_MQ_SERVICE_ADD;
// String topic = Config.TOPIC;
// ClientServiceProvider provider = ClientServiceProvider.loadService();
// ClientConfiguration configuration = new ClientConfigurationBuilder()
// .setEndpoints(endpoint)
// .setCredentialProvider(new StaticSessionCredentialsProvider(Config.ACCESS_KEY,Config.SECRET_KEY))
// .build();
// // 初始化Producer时需要设置通信配置以及预绑定的Topic
// Producer producer = provider.newProducerBuilder()
// .setTopics(topic)
// .setClientConfiguration(configuration)
// .build();
// try {
// Scanner scanner = new Scanner(System.in);
// System.out.println("请输出要发送的内容");
// while (scanner.hasNext()){
// Message message = provider.newMessageBuilder()
// .setTopic(topic)
// .setBody((simpleDateFormat.format(new Date())+":"+scanner.next()).getBytes())
// .build();
// SendReceipt sendReceipt = producer.send(message);
// logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
// }
// } catch (ClientException e) {
// logger.error("Failed to send message", e);
// }
// producer.close();
// logger.info("producer shutdown...");
// }
//}
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;
import java.util.Date;
import java.util.Scanner;
public class ProducerExample {
// private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws IOException, InterruptedException, IOException, ClientException {
// logger.info("running producer...");
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String endpoint = "ciotdmp.unimip.cn:28081";
String topic = "1000108_Test";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = new ClientConfigurationBuilder()
.setEndpoints(endpoint)
.setCredentialProvider(new StaticSessionCredentialsProvider("rmqadminstrator", "V#gVwMlh%uuLKsYrt7&gvnq1^"))
.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
try {
Scanner scanner = new Scanner(System.in);
System.out.println("请输出要发送的内容");
while (scanner.hasNext()) {
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setBody(("(simpleDateFormat.format(new Date()) + " + scanner.next()).getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId={}" + sendReceipt);
// logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
}
} catch (ClientException e) {}
}
}

View File

@ -0,0 +1,105 @@
package com.example.demo.grpcProtocol;
import com.example.demo.DemoApplication;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
// public static void main(String[] args) throws ClientException, IOException, InterruptedException {
// logger.info("running consumer...");
// final ClientServiceProvider provider = ClientServiceProvider.loadService();
// // 接入点地址需要设置成Proxy的地址和端口列表一般是xxx:8081;xxx:8081
//// String endpoints = "192.168.74.248:28081";
// String endpoints = "192.168.6.90:28081";
//// String endpoints = System.getProperty("addr");
// ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
// .setEndpoints(endpoints)
// .build();
// ClientConfigurationBuilder clientConfigurationBuilder = new ClientConfigurationBuilder();
// clientConfiguration = clientConfigurationBuilder
// .setEndpoints(endpoints)
//// .enableSsl(false)
// .setCredentialProvider(new StaticSessionCredentialsProvider("1815638479991795714","a1d018cd992b19e59b9a1657194343a519b784e56ee29c8d1e398657b3fbcbed"))
//// .setCredentialProvider(new StaticSessionCredentialsProvider("rocketmq2","12345678"))
//// .setCredentialProvider(new StaticSessionCredentialsProvider(System.getProperty("ak"),System.getProperty("sk")))
// .build();
// // 订阅消息的过滤规则表示订阅所有Tag的消息
// String tag = "*";
// FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// // 为消费者指定所属的消费者分组Group需要提前创建
// String consumerGroup = "test2";
// // 指定需要订阅哪个目标TopicTopic需要提前创建
// String topic = DemoApplication.TOPIC;
// // 初始化PushConsumer需要绑定消费者分组ConsumerGroup通信参数以及订阅关系
// PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// .setClientConfiguration(clientConfiguration)
// // 设置消费者分组
// .setConsumerGroup(consumerGroup)
// // 设置预绑定的订阅关系
// .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// // 设置消费监听器
// .setMessageListener(messageView -> {
// ByteBuffer byteBuffer = messageView.getBody();
// String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
//
// // 处理消息并返回消费结果
// logger.info("Consume message successfully, messageId={},message body:{}", messageView.getMessageId(),content);
// return ConsumeResult.SUCCESS;
// })
// .build();
// logger.info("consumer initialized...");
// Thread.sleep(Long.MAX_VALUE);
// // 如果不需要再使用 PushConsumer可关闭该实例
// pushConsumer.close();
// logger.info("consumer shutdown...");
// }
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
//need to change
String rocketMQServiceAdd = Config.ROCKET_MQ_SERVICE_ADD;
String group = Config.GROUP;
String topic = Config.TOPIC;
String accessKey = Config.ACCESS_KEY;
String secretKey = Config.SECRET_KEY;
// String rocketMQServiceAdd = System.getProperty("addr");
// String group = System.getProperty("group");
// String topic = System.getProperty("topic");
// String accessKey = System.getProperty("ak");
// String secretKey = System.getProperty("sk");
ClientConfiguration clientConfiguration = new ClientConfigurationBuilder()
.setEndpoints(rocketMQServiceAdd)
.setCredentialProvider(new StaticSessionCredentialsProvider(accessKey,secretKey))
.build();
PushConsumer pushConsumer = ClientServiceProvider.loadService().newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(group)
.setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
.setMessageListener(messageView -> {
ByteBuffer byteBuffer = messageView.getBody();
String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
System.out.println(new Date()+":Consume message successfully, messageId="+messageView.getMessageId()+",message body:"+content);
return ConsumeResult.SUCCESS;
})
.build();
System.out.println(new Date()+"-qidong!!");
Thread.sleep(Long.MAX_VALUE);
pushConsumer.close();
}
}

View File

@ -0,0 +1,68 @@
package com.example.demo.kafka;
import com.example.demo.kafka.encrypt.AesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import javax.annotation.Resource;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Service
public class MyKafkaProducer {
private static String secret = "mA%@eaIF!I$0$NpVDDc&j*e@wdb1SpUU";
public static void main(String[] args) throws InterruptedException, NoSuchPaddingException, InvalidKeyException, NoSuchAlgorithmException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException, ExecutionException {
//need to change
String addr = "120.197.158.251:9092";
String topic = "device_alarm_add";
String username = "liantong";
String password = "Liantong@123";
// 1创建kafka生产者的配置对象
Properties properties = new Properties();
// 2给kafka配置对象添加配置信息bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addr);
// key,value(必须)key.serializer, value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置安全协议为SASL_PLAINTEXT
properties.put("security.protocol", "SASL_PLAINTEXT");
// 设置SASL认证机制为PLAIN
properties.put("sasl.mechanism", "PLAIN");
// 设置认证信息格式为username:password
String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, username, password);
properties.put("sasl.jaas.config", jaasCfg);
// 3创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4调用send方法发送消息
kafkaProducer.send(new ProducerRecord<>(topic, AesUtil.encrypt(secret,"testMessage"))).get();
System.out.println("send succeed");
// 5关闭资源
kafkaProducer.close();
}
}

View File

@ -0,0 +1,77 @@
package com.example.demo.kafka;
import client.CiotClient;
import client.CiotRequestClient;
import model.HttpResult;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import utils.DigestUtil;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class MyUIAP {
public static void main(String[] args) {
// HttpResult device = createDevice();
// System.out.println(device.getBody());
String appId = "uET1mRLpQ6";
String appSecret = "rx71G2CDXH";
// String host = "http://localhost:8085";
String host = "http://100.74.68.167:8080";
//String host = "https://iot.unimip.cn/gw";//要作为配置项
String uri = "/openapi/uiap-open/collect/devices/create";
String productKey = "7ZvzoBNI2r";
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
// 创建 HttpPost 请求
HttpPost post = new HttpPost(host+uri);
// 添加请求头
post.setHeader("appKey", appSecret);
post.setHeader("appId", appId);
String nonce = UUID.randomUUID().toString();
post.setHeader("nonce", nonce);
String tm = String.valueOf(new Date().getTime());
post.setHeader("tm", tm);
post.setHeader("sign", buildSign(appId,nonce,tm,appSecret,uri));
// 设置请求体
StringEntity entity = new StringEntity("{\"productKey\":\"7ZvzoBNI2r\",\"deviceInfo\":{\"Name\":\"ljtTestDeviceName\"}}");
post.setEntity(entity);
// 发送请求
try (CloseableHttpResponse response = httpClient.execute(post)) {
// 获取响应码
int statusCode = response.getStatusLine().getStatusCode();
System.out.println("Response Code: " + statusCode);
// 获取响应实体
HttpEntity responseEntity = response.getEntity();
if (responseEntity != null) {
System.out.println("Response Content Length: " + responseEntity.getContentLength());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static String buildSign(String appId,String nonce,String tm,String appKey,String uri) throws NoSuchAlgorithmException {
String originTest = "appId="+appId+"&nonce="+nonce+"&tm="+tm+"&uri="+uri+"&appKey="+appKey;
// String originTest = "appId=test&nonce=1bf544a0-d129-484e-873f-16912f7d3b82&tm=1645497754337&uri=/openapi/uiapopen/devices/service/invoke&appKey=test";
String s = DigestUtil.md5(originTest).toUpperCase();
System.out.println(s);
return s;
}
}

View File

@ -0,0 +1,44 @@
package com.example.demo.kafka.encrypt;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
public class AesUtil {
private static final String ENCRYPT_ALGORITHMS = "AES";
private static final String ENCRYPT_PATTERN = "AES/CBC/PKCS5Padding";
private static final byte[] INITIAL_IV = {-5, -10, 105, -93, 3, 123, -76, -44, 116, 122, -54, -86, -47, 3, -22, -79};
public static String encrypt(String secret, String data) throws NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException, IllegalBlockSizeException {
byte[] iv = INITIAL_IV;
byte[] key = secret.getBytes(StandardCharsets.UTF_8);
SecretKeySpec secretKeySpec = new SecretKeySpec(key, ENCRYPT_ALGORITHMS);
IvParameterSpec ivParameterSpec = new IvParameterSpec(iv);
Cipher cipher = Cipher.getInstance(ENCRYPT_PATTERN);
cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, ivParameterSpec);
String encryptString = Util.getHexString(cipher.doFinal(data.getBytes(StandardCharsets.UTF_8)));
return encryptString;
}
public static String decrypt(String secret, String data) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException, IllegalBlockSizeException {
byte[] key = secret.getBytes(StandardCharsets.UTF_8);
byte[] iv = INITIAL_IV;
byte[] encryptedData = Util.hexStringToBytes(data);
Cipher cipher = Cipher.getInstance(ENCRYPT_PATTERN);
SecretKeySpec secretKeySpec = new SecretKeySpec(key, ENCRYPT_ALGORITHMS);
IvParameterSpec ivParameterSpec = new IvParameterSpec(iv);
cipher.init(Cipher.DECRYPT_MODE, secretKeySpec, ivParameterSpec);
return new String(cipher.doFinal(encryptedData));
}
}

View File

@ -0,0 +1,26 @@
package com.example.demo.kafka.encrypt;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
public class Main {
private static String secret = "wIFLxxn!@gRv2KnKohYsymDTesJWsQ5x";
public static void main(String[] args) throws InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, NoSuchAlgorithmException, BadPaddingException, InvalidKeyException {
String data = "xiaofang_test";
String encryptData = AesUtil.encrypt(secret, data);
System.out.println("加密数据:" + encryptData);
String decryptData = AesUtil.decrypt(secret, encryptData);
System.out.println("解密数据:" + decryptData);
}
}

View File

@ -0,0 +1,34 @@
package com.example.demo.kafka.encrypt;
public class Util {
public static String getHexString(byte[] bytes) {
String ret = "";
for (int i = 0; i < bytes.length; i++) {
ret += Integer.toString((bytes[i] & 0xff) + 0x100, 16).substring(1);
}
return ret.toUpperCase();
}
public static byte[] hexStringToBytes(String hexString)
{
if (hexString == null || hexString.equals(""))
{
return null;
}
hexString = hexString.toUpperCase();
int length = hexString.length() / 2;
char[] hexChars = hexString.toCharArray();
byte[] d = new byte[length];
for (int i = 0; i < length; i++)
{
int pos = i * 2;
d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return d;
}
private static byte charToByte(char c)
{
return (byte) "0123456789ABCDEF".indexOf(c);
}
}

View File

@ -0,0 +1,29 @@
package com.example.demo.mongo;
import cn.org.wangchangjiu.sqltomongo.core.Converter;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoDriverInformation;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.internal.MongoClientImpl;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PostMapping;
import javax.annotation.PostConstruct;
public class mongoTest {
public static void main(String[] args) {
MongoClient mongoClient = MongoClients.create("mongodb://admin:123@192.168.9.137:27017/admin");
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "test_convert_db");
String sql = "select * from table_a";
Document command = (Document) Converter.executeForBson(sql);
System.out.println(Converter.execute(sql));
System.out.println(command);
System.out.println("result:"+ mongoTemplate.executeCommand(command));
}
}

View File

@ -0,0 +1,42 @@
package com.example.demo.remotingProtocol;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerRemoting {
public static void main(String[] args) throws Exception {
// 创建消费者实例并设置消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("123","ljtConsumerGroup",
new AclClientRPCHook(new SessionCredentials(System.getProperty("ak"), System.getProperty("sk"))));
// 设置 Name Server 地址此处为示例实际使用时请替换为真实的 Name Server 地址
consumer.setNamesrvAddr("192.168.6.93:9876");
// consumer.setNamesrvAddr(System.getProperty("addr"));
// 订阅指定的主题和标签* 表示所有标签
consumer.subscribe("TestTopic_ljt", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}

View File

@ -0,0 +1,36 @@
package com.example.demo.remotingProtocol;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProdurcerRemoting {
public static void main(String[] args) throws Exception {
// 创建生产者实例并设置生产者组名
DefaultMQProducer producer = new DefaultMQProducer("ljtConsumerGroup",
new AclClientRPCHook(new SessionCredentials(System.getProperty("ak"), System.getProperty("sk"))));
// 设置 Name Server 地址此处为示例实际使用时请替换为真实的 Name Server 地址
producer.setNamesrvAddr("192.168.6.93:9876");
// producer.setNamesrvAddr(System.getProperty("addr"));
producer.start();
try {
// 创建消息实例指定 topicTag和消息体
Message msg = new Message("TestTopic_ljt", "TagA", ("Hello RocketMQ").getBytes());
// 发送消息并获取发送结果
SendResult sendResult = producer.send(msg);
System.out.println("Message sent: " + new String(msg.getBody()));
System.out.println("Send result: " + sendResult);
} catch (Exception e) {
e.printStackTrace();
System.out.println("Message sending failed.");
} finally {
// 关闭生产者
producer.shutdown();
}
}
}

View File

@ -0,0 +1,122 @@
package com.example.demo.rocketMq;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
//import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
//import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
public class MyClusterListSubCommand implements SubCommand {
public static CopyOnWriteArrayList<String> CLUSTER_NAMES = new CopyOnWriteArrayList<>();
public static CopyOnWriteArrayList<String> BROKER_ADDRESSES = new CopyOnWriteArrayList<>();
public static final String COMMAND_NAME = "myClusterList";
DefaultMQAdminExt defaultMQAdminExt;
MyClusterListSubCommand(DefaultMQAdminExt defaultMQAdminExt){
this.defaultMQAdminExt = defaultMQAdminExt;
}
@Override
public String commandName() {
return COMMAND_NAME;
}
@Override
public String commandDesc() {
return "List cluster infos";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("m", "moreStats", false, "Print more stats");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("i", "interval", true, "specify intervals numbers, it is in seconds");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "which cluster");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
long printInterval = 1;
boolean enableInterval = commandLine.hasOption('i');
if (enableInterval) {
printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000;
}
String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
// defaultMQAdminExt.start();
long i = 0;
do {
if (i++ > 0) {
Thread.sleep(printInterval);
}
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
CLUSTER_NAMES = new CopyOnWriteArrayList<>(getTargetClusterNames(clusterName, clusterInfo));
BROKER_ADDRESSES = new CopyOnWriteArrayList<>(getBrokerAddresses(clusterInfo));
} while (enableInterval);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
// defaultMQAdminExt.shutdown();
}
}
private Set<String> getBrokerAddresses(ClusterInfo clusterInfo) {
HashSet<String> results = new HashSet<>();
Map<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
if(brokerAddrTable!=null){
for (Map.Entry<String, BrokerData> entry : brokerAddrTable.entrySet()) {
BrokerData value = entry.getValue();
if (value!=null) {
HashMap<Long, String> brokerAddrs = value.getBrokerAddrs();
if (brokerAddrs!=null) {
for (Map.Entry<Long, String> stringEntry : brokerAddrs.entrySet()) {
results.add(stringEntry.getValue());
}
}
}
}
}
return results;
}
private Set<String> getTargetClusterNames(String clusterName, ClusterInfo clusterInfo) {
if (StringUtils.isEmpty(clusterName)) {
return clusterInfo.getClusterAddrTable().keySet();
} else {
Set<String> clusterNames = new TreeSet<String>();
clusterNames.add(clusterName);
return clusterNames;
}
}
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.demo.rocketMq;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Set;
public class MyDeleteAccessConfigSubCommand implements SubCommand {
public static final String COMMAND_NAME = "myDeleteAclConfig";
DefaultMQAdminExt defaultMQAdminExt;
MyDeleteAccessConfigSubCommand(DefaultMQAdminExt defaultMQAdminExt){
this.defaultMQAdminExt = defaultMQAdminExt;
}
@Override
public String commandName() {
return COMMAND_NAME;
}
@Override
public String commandAlias() {
return "deleteAccessConfig";
}
@Override
public String commandDesc() {
return "Delete Acl Config Account in broker";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "delete acl config account from which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "delete acl config account from which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("a", "accessKey", true, "set accessKey in acl config file for deleting which account");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String accessKey = commandLine.getOptionValue('a').trim();
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
// defaultMQAdminExt.start();
defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
System.out.printf("delete plain access config account from %s success.%n", addr);
System.out.printf("account's accesskey is:%s", accessKey);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
// defaultMQAdminExt.start();
Set<String> brokerAddrSet =
CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : brokerAddrSet) {
defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
System.out.printf("delete plain access config account from %s success.%n", addr);
}
System.out.printf("account's accesskey is:%s", accessKey);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
// defaultMQAdminExt.shutdown();
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.demo.rocketMq;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Set;
public class MyDeleteSubscriptionGroupCommand implements SubCommand {
public static final String COMMAND_NAME = "myDeleteSubGroup";
DefaultMQAdminExt defaultMQAdminExt;
MyDeleteSubscriptionGroupCommand(DefaultMQAdminExt defaultMQAdminExt){
this.defaultMQAdminExt = defaultMQAdminExt;
}
@Override
public String commandName() {
return COMMAND_NAME;
}
@Override
public String commandDesc() {
return "Delete subscription group from broker.";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "delete subscription group from which cluster");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("g", "groupName", true, "subscription group name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("r", "removeOffset", true, "remove offset");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
// groupName
String groupName = commandLine.getOptionValue('g').trim();
boolean cleanOffset = false;
if (commandLine.hasOption('r')) {
try {
cleanOffset = Boolean.valueOf(commandLine.getOptionValue('r').trim());
} catch (Exception e) {
}
}
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
// adminExt.start();
defaultMQAdminExt.deleteSubscriptionGroup(addr, groupName, cleanOffset);
System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
addr);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
// defaultMQAdminExt.start();
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String master : masterSet) {
defaultMQAdminExt.deleteSubscriptionGroup(master, groupName, cleanOffset);
System.out.printf(
"delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
groupName, master, clusterName);
}
try {
defaultMQAdminExt.deleteTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + groupName, clusterName);
defaultMQAdminExt.deleteTopic(MixAll.DLQ_GROUP_TOPIC_PREFIX + groupName, clusterName);
} catch (Exception e) {
e.printStackTrace();
}
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
// defaultMQAdminExt.shutdown();
}
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.demo.rocketMq;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class MyDeleteTopicSubCommand implements SubCommand {
public static final String COMMAND_NAME = "myDeleteTopic";
DefaultMQAdminExt defaultMQAdminExt;
MyDeleteTopicSubCommand(DefaultMQAdminExt defaultMQAdminExt){
this.defaultMQAdminExt = defaultMQAdminExt;
}
public static void deleteTopic(final DefaultMQAdminExt adminExt,
final String clusterName,
final String topic
) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
Set<String> masterBrokerAddressSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
adminExt.deleteTopicInBroker(masterBrokerAddressSet, topic);
System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
Set<String> nameServerSet = null;
if (adminExt.getNamesrvAddr() != null) {
String[] ns = adminExt.getNamesrvAddr().trim().split(";");
nameServerSet = new HashSet(Arrays.asList(ns));
}
adminExt.deleteTopicInNameServer(nameServerSet, clusterName, topic);
System.out.printf("delete topic [%s] from NameServer success.%n", topic);
}
@Override
public String commandName() {
return COMMAND_NAME;
}
@Override
public String commandDesc() {
return "Delete topic from broker and NameServer.";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "delete topic from which cluster");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
// DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
// adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String topic = commandLine.getOptionValue('t').trim();
if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
// adminExt.start();
deleteTopic(defaultMQAdminExt, clusterName, topic);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
// adminExt.shutdown();
}
}
}

View File

@ -0,0 +1,331 @@
//package com.example.demo.rocketMq;/*
// * Licensed to the Apache Software Foundation (ASF) under one or more
// * contributor license agreements. See the NOTICE file distributed with
// * this work for additional information regarding copyright ownership.
// * The ASF licenses this file to You under the Apache License, Version 2.0
// * (the "License"); you may not use this file except in compliance with
// * the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//import ch.qos.logback.classic.LoggerContext;
//import ch.qos.logback.classic.joran.JoranConfigurator;
//import org.apache.commons.cli.CommandLine;
//import org.apache.commons.cli.Options;
//import org.apache.commons.cli.PosixParser;
//import org.apache.rocketmq.acl.common.AclClientRPCHook;
//import org.apache.rocketmq.acl.common.AclUtils;
//import org.apache.rocketmq.acl.common.SessionCredentials;
//import org.apache.rocketmq.client.exception.MQClientException;
//import org.apache.rocketmq.common.MQVersion;
//import org.apache.rocketmq.common.MixAll;
//import org.apache.rocketmq.remoting.RPCHook;
//import org.apache.rocketmq.remoting.protocol.RemotingCommand;
//import org.apache.rocketmq.srvutil.ServerUtil;
//import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
//import org.apache.rocketmq.tools.command.SubCommand;
//import org.apache.rocketmq.tools.command.SubCommandException;
//import org.apache.rocketmq.tools.command.acl.*;
//import org.apache.rocketmq.tools.command.broker.*;
//import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
//import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand;
//import org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
//import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand;
//import org.apache.rocketmq.tools.command.consumer.*;
//import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
//import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
//import org.apache.rocketmq.tools.command.controller.*;
//import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
//import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
//import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
//import org.apache.rocketmq.tools.command.ha.GetSyncStateSetSubCommand;
//import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
//import org.apache.rocketmq.tools.command.message.*;
//import org.apache.rocketmq.tools.command.namesrv.*;
//import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
//import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
//import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
//import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
//import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
//import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
//import org.apache.rocketmq.tools.command.topic.*;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import java.nio.file.Files;
//import java.nio.file.Paths;
//import java.util.HashSet;
//import java.util.Set;
//
//@Component
//public class MyMQAdminStartup {
// protected static final Set<SubCommand> SUB_COMMANDS = new HashSet<>();
//
// private static final String ROCKETMQ_HOME = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
// System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//
//// @Value("${rocketmq.name-server}")
// public String namesrvAddr = System.getProperty("nameServer");
//// @Value("${rocketmq.producer.access-key:rocketmq2}")
// private String accessKey = System.getProperty("ak");
//// @Value("${rocketmq.producer.secret-key:12345678}")
// private String secretKey = System.getProperty("sk");
//
// RPCHook rpcHook;
//
// DefaultMQAdminExt defaultMQAdminExt;
//
//
// public void runACommand(String[] args) {
// try {
// main0(args, rpcHook);
// } catch (SubCommandException e) {
// throw new RuntimeException("mqAdmin command execute failed");
// }
// }
//
// public void main0(String[] args, RPCHook rpcHook) throws SubCommandException {
// try {
// switch (args.length) {
// case 0:
// printHelp();
// break;
// case 2:
// if (args[0].equals("help")) {
// SubCommand cmd = findSubCommand(args[1]);
// if (cmd != null) {
// Options options = ServerUtil.buildCommandlineOptions(new Options());
// options = cmd.buildCommandlineOptions(options);
// if (options != null) {
// ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
// }
// } else {
// System.out.printf("The sub command %s not exist.%n", args[1]);
// }
// break;
// }
// case 1:
// default:
// SubCommand cmd = findSubCommand(args[0]);
// if (cmd != null) {
// String[] subargs = parseSubArgs(args);
//
// Options options = ServerUtil.buildCommandlineOptions(new Options());
// final CommandLine commandLine =
// ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
// new PosixParser());
// if (null == commandLine) {
// return;
// }
//
// if (commandLine.hasOption('n')) {
// String namesrvAddr = commandLine.getOptionValue('n');
// System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
// }
// if (rpcHook != null) {
// cmd.execute(commandLine, options, rpcHook);
// } else {
// cmd.execute(commandLine, options, AclUtils.getAclRPCHook(ROCKETMQ_HOME + MixAll.ACL_CONF_TOOLS_FILE));
// }
// } else {
// System.out.printf("The sub command %s not exist.%n", args[0]);
// }
// break;
// }
// } catch (Exception e) {
// e.printStackTrace();
// throw e;
// }
// }
//
// @PostConstruct
// public void init() {
// System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// initRPCHook();
// initMQAdminExt();
// initCommand();
//// try {
//// initLogback();
//// } catch (Exception e) {
//// e.printStackTrace();
//// }
// }
//
// private void initMQAdminExt() {
// System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
// defaultMQAdminExt = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
// try {
// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
// defaultMQAdminExt.start();
// } catch (MQClientException e) {
// e.printStackTrace();
// }
// }
//
// private void initRPCHook() {
// if(accessKey!=null && accessKey!="" && secretKey!=null && secretKey!=""){
// rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
// }else{
// rpcHook = AclUtils.getAclRPCHook(ROCKETMQ_HOME + MixAll.ACL_CONF_TOOLS_FILE);
// }
// }
//
// public void initCommand() {
// //myCommands
// initCommand(new MyClusterListSubCommand(defaultMQAdminExt));
// initCommand(new MyUpdateAccessConfigSubCommand(defaultMQAdminExt));
// initCommand(new MyDeleteAccessConfigSubCommand(defaultMQAdminExt));
// initCommand(new MyDeleteSubscriptionGroupCommand(defaultMQAdminExt));
// initCommand(new MyUpdateTopicSubCommand(defaultMQAdminExt));
// initCommand(new MyDeleteTopicSubCommand(defaultMQAdminExt));
//
// initCommand(new UpdateTopicSubCommand());
// initCommand(new DeleteTopicSubCommand());
// initCommand(new UpdateSubGroupSubCommand());
// initCommand(new SetConsumeModeSubCommand());
// initCommand(new DeleteSubscriptionGroupCommand());
// initCommand(new UpdateBrokerConfigSubCommand());
// initCommand(new UpdateTopicPermSubCommand());
//
// initCommand(new TopicRouteSubCommand());
// initCommand(new TopicStatusSubCommand());
// initCommand(new TopicClusterSubCommand());
//
// initCommand(new AddBrokerSubCommand());
// initCommand(new RemoveBrokerSubCommand());
// initCommand(new ResetMasterFlushOffsetSubCommand());
// initCommand(new BrokerStatusSubCommand());
// initCommand(new QueryMsgByIdSubCommand());
// initCommand(new QueryMsgByKeySubCommand());
// initCommand(new QueryMsgByUniqueKeySubCommand());
// initCommand(new QueryMsgByOffsetSubCommand());
// initCommand(new QueryMsgTraceByIdSubCommand());
//
// initCommand(new PrintMessageSubCommand());
// initCommand(new PrintMessageByQueueCommand());
// initCommand(new SendMsgStatusCommand());
// initCommand(new BrokerConsumeStatsSubCommad());
//
// initCommand(new ProducerConnectionSubCommand());
// initCommand(new ConsumerConnectionSubCommand());
// initCommand(new ConsumerProgressSubCommand());
// initCommand(new ConsumerStatusSubCommand());
// initCommand(new CloneGroupOffsetCommand());
// //for producer
// initCommand(new ProducerSubCommand());
//
// initCommand(new ClusterListSubCommand());
// initCommand(new TopicListSubCommand());
//
// initCommand(new UpdateKvConfigCommand());
// initCommand(new DeleteKvConfigCommand());
//
// initCommand(new WipeWritePermSubCommand());
// initCommand(new AddWritePermSubCommand());
// initCommand(new ResetOffsetByTimeCommand());
// initCommand(new SkipAccumulationSubCommand());
//
// initCommand(new UpdateOrderConfCommand());
// initCommand(new CleanExpiredCQSubCommand());
// initCommand(new DeleteExpiredCommitLogSubCommand());
// initCommand(new CleanUnusedTopicCommand());
//
// initCommand(new StartMonitoringSubCommand());
// initCommand(new StatsAllSubCommand());
//
// initCommand(new AllocateMQSubCommand());
//
// initCommand(new CheckMsgSendRTCommand());
// initCommand(new CLusterSendMsgRTCommand());
//
// initCommand(new GetNamesrvConfigCommand());
// initCommand(new UpdateNamesrvConfigCommand());
// initCommand(new GetBrokerConfigCommand());
// initCommand(new GetConsumerConfigSubCommand());
//
// initCommand(new QueryConsumeQueueCommand());
// initCommand(new SendMessageCommand());
// initCommand(new ConsumeMessageCommand());
//
// //for acl command
// initCommand(new UpdateAccessConfigSubCommand());
// initCommand(new DeleteAccessConfigSubCommand());
// initCommand(new ClusterAclConfigVersionListSubCommand());
// initCommand(new UpdateGlobalWhiteAddrSubCommand());
// initCommand(new GetAccessConfigSubCommand());
//
// initCommand(new UpdateStaticTopicSubCommand());
// initCommand(new RemappingStaticTopicSubCommand());
//
// initCommand(new ExportMetadataCommand());
// initCommand(new ExportConfigsCommand());
// initCommand(new ExportMetricsCommand());
//
// initCommand(new HAStatusSubCommand());
//
// initCommand(new GetSyncStateSetSubCommand());
// initCommand(new GetBrokerEpochSubCommand());
// initCommand(new GetControllerMetaDataSubCommand());
//
// initCommand(new GetControllerConfigSubCommand());
// initCommand(new UpdateControllerConfigSubCommand());
// initCommand(new ReElectMasterSubCommand());
// initCommand(new CleanControllerBrokerDataSubCommand());
// }
//
// private static void initLogback() throws Exception {
// LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
// JoranConfigurator configurator = new JoranConfigurator();
// configurator.setContext(lc);
// lc.reset();
//
// //avoid the exception
// if (ROCKETMQ_HOME != null
// && Files.exists(Paths.get(ROCKETMQ_HOME + "/conf/logback_tools.xml"))) {
// configurator.doConfigure(ROCKETMQ_HOME + "/conf/logback_tools.xml");
// }
// }
//
// private static void printHelp() {
// System.out.printf("The most commonly used mqadmin commands are:%n");
//
// for (SubCommand cmd : SUB_COMMANDS) {
// System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc());
// }
//
// System.out.printf("%nSee 'mqadmin help <command>' for more information on a specific command.%n");
// }
//
// private static SubCommand findSubCommand(final String name) {
// for (SubCommand cmd : SUB_COMMANDS) {
// if (cmd.commandName().equalsIgnoreCase(name) || cmd.commandAlias() != null && cmd.commandAlias().equalsIgnoreCase(name)) {
// return cmd;
// }
// }
//
// return null;
// }
//
// private static String[] parseSubArgs(String[] args) {
// if (args.length > 1) {
// String[] result = new String[args.length - 1];
// for (int i = 0; i < args.length - 1; i++) {
// result[i] = args[i + 1];
// }
// return result;
// }
// return null;
// }
//
// public static void initCommand(SubCommand command) {
// SUB_COMMANDS.add(command);
// }
//}

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.demo.rocketMq;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class MyUpdateAccessConfigSubCommand implements SubCommand {
public static final String COMMAND_NAME = "myUpdateAclConfig";
DefaultMQAdminExt defaultMQAdminExt;
MyUpdateAccessConfigSubCommand(DefaultMQAdminExt defaultMQAdminExt){
this.defaultMQAdminExt = defaultMQAdminExt;
}
@Override
public String commandName() {
return COMMAND_NAME;
}
@Override
public String commandDesc() {
return "Update acl config yaml file in broker";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "update acl config file to which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "update acl config file to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("a", "accessKey", true, "set accessKey in acl config file");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("i", "defaultTopicPerm", true, "set default topicPerm in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("u", "defaultGroupPerm", true, "set default GroupPerm in acl config file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topicPerms", true, "set topicPerms list,eg: topicA=DENY,topicD=SUB");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("g", "groupPerms", true, "set groupPerms list,eg: groupD=DENY,groupD=SUB");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "admin", true, "set admin flag in acl config file");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
PlainAccessConfig accessConfig = new PlainAccessConfig();
accessConfig.setAccessKey(commandLine.getOptionValue('a').trim());
// Secretkey
if (commandLine.hasOption('s')) {
accessConfig.setSecretKey(commandLine.getOptionValue('s').trim());
}
// Admin
if (commandLine.hasOption('m')) {
accessConfig.setAdmin(Boolean.parseBoolean(commandLine.getOptionValue('m').trim()));
}
// DefaultTopicPerm
if (commandLine.hasOption('i')) {
accessConfig.setDefaultTopicPerm(commandLine.getOptionValue('i').trim());
}
// DefaultGroupPerm
if (commandLine.hasOption('u')) {
accessConfig.setDefaultGroupPerm(commandLine.getOptionValue('u').trim());
}
// WhiteRemoteAddress
if (commandLine.hasOption('w')) {
accessConfig.setWhiteRemoteAddress(commandLine.getOptionValue('w').trim());
}
// TopicPerms list value
if (commandLine.hasOption('t')) {
String[] topicPerms = commandLine.getOptionValue('t').trim().split(",");
List<String> topicPermList = new ArrayList<String>();
if (topicPerms != null) {
for (String topicPerm : topicPerms) {
topicPermList.add(topicPerm);
}
}
accessConfig.setTopicPerms(topicPermList);
}
// GroupPerms list value
if (commandLine.hasOption('g')) {
String[] groupPerms = commandLine.getOptionValue('g').trim().split(",");
List<String> groupPermList = new ArrayList<String>();
if (groupPerms != null) {
for (String groupPerm : groupPerms) {
groupPermList.add(groupPerm);
}
}
accessConfig.setGroupPerms(groupPermList);
}
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
// defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
System.out.printf("create or update plain access config to %s success.%n", addr);
System.out.printf("%s", accessConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
// defaultMQAdminExt.start();
Set<String> brokerAddrSet =
CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : brokerAddrSet) {
defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
System.out.printf("create or update plain access config to %s success.%n", addr);
}
System.out.printf("%s", accessConfig);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
// defaultMQAdminExt.shutdown();
}
}
}

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.demo.rocketMq;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Map;
import java.util.Set;
public class MyUpdateTopicSubCommand implements SubCommand {
public static final String COMMAND_NAME = "myUpdateTopic";
DefaultMQAdminExt defaultMQAdminExt;
MyUpdateTopicSubCommand(DefaultMQAdminExt defaultMQAdminExt){
this.defaultMQAdminExt = defaultMQAdminExt;
}
@Override
public String commandName() {
return COMMAND_NAME;
}
@Override
public String commandDesc() {
return "Update or create topic";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "create topic to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("a", "attributes", true, "attribute(+a=b,+c=d,-e)");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("r", "readQueueNums", true, "set read queue nums");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("w", "writeQueueNums", true, "set write queue nums");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("o", "order", true, "set topic's order(true|false)");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("u", "unit", true, "is unit topic (true|false)");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
if (commandLine.hasOption('a')) {
String attributesModification = commandLine.getOptionValue('a').trim();
Map<String, String> attributes = AttributeParser.parseToMap(attributesModification);
topicConfig.setAttributes(attributes);
}
// readQueueNums
if (commandLine.hasOption('r')) {
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
}
// writeQueueNums
if (commandLine.hasOption('w')) {
topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
}
// perm
if (commandLine.hasOption('p')) {
topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
}
boolean isUnit = false;
if (commandLine.hasOption('u')) {
isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
}
boolean isCenterSync = false;
if (commandLine.hasOption('s')) {
isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
}
int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
topicConfig.setTopicSysFlag(topicCenterSync);
boolean isOrder = false;
if (commandLine.hasOption('o')) {
isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
}
topicConfig.setOrder(isOrder);
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
// defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
if (isOrder) {
String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
isOrder, orderConf.toString()));
}
System.out.printf("create topic to %s success.%n", addr);
System.out.printf("%s", topicConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
// defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
}
if (isOrder) {
Set<String> brokerNameSet =
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
StringBuilder orderConf = new StringBuilder();
String splitor = "";
for (String s : brokerNameSet) {
orderConf.append(splitor).append(s).append(":")
.append(topicConfig.getWriteQueueNums());
splitor = ";";
}
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
orderConf.toString(), true);
System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
}
System.out.printf("%s", topicConfig);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
// defaultMQAdminExt.shutdown();
}
}
}

View File

@ -0,0 +1,8 @@
spring.application.name=demo
spring.thymeleaf.prefix=classpath:/templates/
spring.thymeleaf.suffix=.html
spring.data.mongodb.uri=mongodb://admin:123@192.168.9.137:27017/admin

View File

@ -0,0 +1,34 @@
rocketmq:
name-server: 192.168.74.248:28081
producer:
group: testGroup_dev
send-message-timeout: 300000
access-key: rocketmq2
secret-key: 12345678
consumer:
access-key: rocketmq2
secret-key: 12345678
spring:
application:
name: yuan-kafka-boot
# 连接kafka集群
kafka:
jaas:
loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
controlFlag: required
options: username="kafkaAdmin" password="admin"
enabled: true
# 多个主机用逗号隔开
bootstrap-servers: localhost:9092
# 生产者
producer:
# key与value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
security:
protocol: SASL_PLAINTEXT

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
<property name="LOG_HOME" value="/home" />
<!-- 控制台输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度%msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- 按照每天生成日志文件 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--日志文件输出的文件名-->
<FileNamePattern>${LOG_HOME}/TestWeb.log.%d{yyyy-MM-dd}.log</FileNamePattern>
<!--日志文件保留天数-->
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度%msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
<!--日志文件最大的大小-->
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender>
<!-- 日志输出级别 -->
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,68 @@
<!-- src/main/resources/templates/message.html -->
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>Consume Messages</title>
<style>
.message-list {
max-height: 300px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
}
</style>
</head>
<body>
<h2>Consume Messages</h2>
<form id="messageForm">
<label for="param1">Parameter 1:</label>
<input type="text" id="param1" name="param1" required>
<br>
<label for="param2">Parameter 2:</label>
<input type="text" id="param2" name="param2" required>
<br>
<label for="param3">Parameter 3:</label>
<input type="text" id="param3" name="param3" required>
<br>
<label for="param4">Parameter 4:</label>
<input type="text" id="param4" name="param4" required>
<br><br>
<button type="button" onclick="consumeMessages()">Consume Messages</button>
</form>
<div class="message-list">
<h3>Messages:</h3>
<ul id="messageList">
<!-- Messages will be dynamically added here -->
</ul>
</div>
<script th:inline="javascript">
function consumeMessages() {
var param1 = document.getElementById('param1').value;
var param2 = document.getElementById('param2').value;
var param3 = document.getElementById('param3').value;
var param4 = document.getElementById('param4').value;
var url = '/consumeMessage?param1=' + encodeURIComponent(param1) +
'&param2=' + encodeURIComponent(param2) +
'&param3=' + encodeURIComponent(param3) +
'&param4=' + encodeURIComponent(param4);
fetch(url)
.then(response => response.json())
.then(messages => {
var messageList = document.getElementById('messageList');
messageList.innerHTML = '';
messages.forEach(message => {
var li = document.createElement('li');
li.textContent = message;
messageList.appendChild(li);
});
})
.catch(error => console.error('Error fetching messages:', error));
}
</script>
</body>
</html>

View File

@ -0,0 +1,112 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
"http://www.w3.org/TR/html4/loose.dtd">
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>Consume Messages</title>
<style>
.message-list {
max-height: 800px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
max-width: 800px;
word-wrap: break-word;
}
</style>
</head>
<body>
<h2>Consume Messages</h2>
<form id="messageForm">
<label for="param1">addr:</label>
<input type="text" id="param1" name="addr" required>
<br>
<label for="param2">group:</label>
<input type="text" id="param2" name="group" required>
<br>
<label for="param3">topic:</label>
<input type="text" id="param3" name="topic" required>
<br>
<label for="param4">accessKey:</label>
<input type="text" id="param4" name="ak" required>
<br>
<label for="param4">secretKey:</label>
<input type="text" id="param5" name="sk" required>
<br><br>
<div id="errorContainer" style="color: red; margin-bottom: 10px;"></div>
<button id="consumeButton" type="button" onclick="consumeMessages()">Consume Messages</button>
</form>
<div class="message-list">
<h3>Messages:</h3>
<ul id="messageList">
<!-- Messages will be dynamically added here -->
</ul>
</div>
<script th:inline="javascript">
function consumeMessages() {
var param1 = document.getElementById('param1').value.trim();
var param2 = document.getElementById('param2').value.trim();
var param3 = document.getElementById('param3').value.trim();
var param4 = document.getElementById('param4').value.trim();
var param5 = document.getElementById('param5').value.trim();
// 获取按钮并设置为不可点击
var button = document.getElementById('consumeButton');
button.disabled = true;
button.textContent = 'Loading...';
// 清空所有可能存在的错误提示
var errorContainer = document.getElementById('errorContainer');
errorContainer.textContent = "";
// 验证参数是否为空
if (param1 === "" || param2 === "" || param3 === "" || param4 === "" || param5 === "") {
errorContainer.textContent = "参数不能为空";
// 恢复按钮状态
button.disabled = false;
button.textContent = 'Consume Messages';
return;
}
var url = '/consumeMessage?addr=' + encodeURIComponent(param1) +
'&group=' + encodeURIComponent(param2) +
'&topic=' + encodeURIComponent(param3) +
'&ak=' + encodeURIComponent(param4) +
'&sk=' + encodeURIComponent(param5);
fetch(url)
.then(response => response.json())
.then(messages => {
var messageList = document.getElementById('messageList');
messageList.innerHTML = '';
if (messages && messages.length > 0) {
messages.forEach(message => {
var li = document.createElement('li');
li.textContent = message;
messageList.appendChild(li);
});
} else {
var li = document.createElement('li');
li.textContent = "No messages available.";
messageList.appendChild(li);
}
// 恢复按钮状态
button.disabled = false;
button.textContent = 'Consume Messages';
})
.catch(error => {
console.error('Error fetching messages:', error);
// 恢复按钮状态
button.disabled = false;
button.textContent = 'Consume Messages';
});
}
</script>
</body>
</html>

View File

@ -0,0 +1,13 @@
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DemoApplicationTests {
@Test
void contextLoads() {
}
}