commit 96030a75c1d9cdf0cd0c59cbc8092d393f760e3c
Author: ljt <1830477687@qq.com>
Date: Fri Nov 15 10:14:00 2024 +0800
init
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..549e00a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000..8f96f52
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+wrapperVersion=3.3.2
+distributionType=only-script
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.7/apache-maven-3.9.7-bin.zip
diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
new file mode 100644
index 0000000..1a90ab7
--- /dev/null
+++ b/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Main-Class: com.example.demo.DemoApplication
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c984b4e
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,139 @@
+
+
+ 4.0.0
+ pom
+
+ demoChild
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.7.7
+
+
+ com.example
+ demo
+ 0.0.1-SNAPSHOT
+ demo
+ Demo project for Spring Boot
+
+ 8
+
+
+
+ cn.org.wangchangjiu
+ sqltomongo-converter
+ 1.0.2-RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb
+
+
+
+
+ com.ciot.gd
+ ciot-client
+ 1.1.1
+
+
+ org.bouncycastle
+ bcprov-jdk15on
+ 1.69
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.14
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 2.7.9
+ pom
+ import
+
+
+
+
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.2.0
+
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ 2.2.3
+
+
+ org.apache.tomcat
+ annotations-api
+
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ 5.0.7
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ 5.0.0
+
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ 5.0.0
+
+
+ org.apache.rocketmq
+ rocketmq-proto
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-thymeleaf
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
diff --git a/src/META-INF/MANIFEST.MF b/src/META-INF/MANIFEST.MF
new file mode 100644
index 0000000..1a90ab7
--- /dev/null
+++ b/src/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Main-Class: com.example.demo.DemoApplication
+
diff --git a/src/main/java/com/example/demo/DemoApplication.java b/src/main/java/com/example/demo/DemoApplication.java
new file mode 100644
index 0000000..42a7a49
--- /dev/null
+++ b/src/main/java/com/example/demo/DemoApplication.java
@@ -0,0 +1,108 @@
+package com.example.demo;
+
+import com.example.demo.dto.AccountPermissionDto;
+import com.example.demo.grpcProtocol.ProducerExample;
+import com.example.demo.grpcProtocol.PushConsumerExample;
+import com.example.demo.remotingProtocol.ConsumerRemoting;
+//import com.example.demo.controller.ProducerExample;
+import com.example.demo.remotingProtocol.ProdurcerRemoting;
+//import com.example.demo.controller.PushConsumerExample;
+//import org.apache.rocketmq.client.apis.ClientException;
+import com.example.demo.rocketMq.MyClusterListSubCommand;
+import com.example.demo.rocketMq.MyUpdateAccessConfigSubCommand;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+
+@SpringBootApplication
+public class DemoApplication {
+ public static final String TOPIC = "1000101_test2";
+ public static final String GROUP = "ljtConsumerGroup";
+
+ public static void main(String[] args) throws IOException, InterruptedException, ClientException {
+// PushConsumerExample.main(null);
+ SpringApplication.run(DemoApplication.class, args);
+// String opt = System.getProperty("opt");
+// if (opt.equals("0")) {
+// testMessageSendAndReceive();
+// }else {
+// testACL();
+// }
+ }
+
+ private static void testMessageSendAndReceive() throws InterruptedException {
+ System.out.println("开始测试消息收发...");
+ String protocol = System.getProperty("protocol");
+ if (Objects.equals(protocol, "grpc")) {
+ CompletableFuture.supplyAsync(()->{
+ try {
+// PushConsumerExample.main(null);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ CompletableFuture.supplyAsync(()->{
+ try {
+ ProducerExample.main(null);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ }else {
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ ConsumerRemoting.main(null);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ ProdurcerRemoting.main(null);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ }
+ Thread.sleep(1000*10);
+ }
+
+// private static void testACL() {
+// System.out.println("开始测试acl修改...");
+// MyMQAdminStartup myAdminStartup = new MyMQAdminStartup();
+// myAdminStartup.init();
+//
+// AccountPermissionDto accountPermissionDto = AccountPermissionDto.valueOfTestData(123);
+// if (!accountPermissionDto.isValid()) {
+// throw new IllegalArgumentException("无效参数:"+accountPermissionDto.toString());
+// }
+//
+// String[] clusterListArgs = new String[]{
+// MyClusterListSubCommand.COMMAND_NAME,
+// "-n", myAdminStartup.namesrvAddr
+// };
+// myAdminStartup.runACommand(clusterListArgs);
+// if (MyClusterListSubCommand.BROKER_ADDRESSES.size()<=0) {
+// throw new RuntimeException("无法找到broker");
+// }
+// CopyOnWriteArrayList brokerAddresses = MyClusterListSubCommand.BROKER_ADDRESSES;
+//
+// for (String brokerAddr : brokerAddresses) {
+// myAdminStartup.runACommand(accountPermissionDto.buildCreateAccountCommand(
+// MyUpdateAccessConfigSubCommand.COMMAND_NAME,myAdminStartup.namesrvAddr,brokerAddr));
+// }
+// }
+
+}
diff --git a/src/main/java/com/example/demo/Enum/PermissionTypeOfRocketMQ.java b/src/main/java/com/example/demo/Enum/PermissionTypeOfRocketMQ.java
new file mode 100644
index 0000000..ad9171d
--- /dev/null
+++ b/src/main/java/com/example/demo/Enum/PermissionTypeOfRocketMQ.java
@@ -0,0 +1,34 @@
+package com.example.demo.Enum;
+
+
+/**
+ * RocketMQ的权限类型
+ */
+public enum PermissionTypeOfRocketMQ {
+ TOPIC(0,"topic"),
+ GROUP(1,"group"),
+ ;
+ int code;
+ String name;
+
+ PermissionTypeOfRocketMQ(int code,String name){
+ this.code = code;
+ this.name = name;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return "PermissionTypeOfRocketMQ{" +
+ "code=" + code +
+ ", name='" + name + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/example/demo/Enum/PermissionValueOfRocketMQ.java b/src/main/java/com/example/demo/Enum/PermissionValueOfRocketMQ.java
new file mode 100644
index 0000000..35f5f51
--- /dev/null
+++ b/src/main/java/com/example/demo/Enum/PermissionValueOfRocketMQ.java
@@ -0,0 +1,52 @@
+package com.example.demo.Enum;
+
+/**
+ * rocketMQ的权限字段枚举
+ */
+
+public enum PermissionValueOfRocketMQ {
+
+ /**
+ * 订阅权限
+ */
+ SUB(0,"SUB"),
+
+ /**
+ * 发布权限
+ */
+ PUB(1,"PUB"),
+
+ /**
+ * 订阅和发布权限
+ */
+ SUB_PUB(2,"SUB|PUB"),
+
+ /**
+ * 没有任何权限
+ */
+ DENY(3,"DENY");
+
+ String permissionString;
+ int code;
+
+ PermissionValueOfRocketMQ(int code, String permissionString){
+ this.code = code;
+ this.permissionString = permissionString;
+ }
+
+ @Override
+ public String toString() {
+ return "PermissionValueOfRocketMQ{" +
+ "permissionString='" + permissionString + '\'' +
+ ", code=" + code +
+ '}';
+ }
+
+ public String getPermissionString() {
+ return permissionString;
+ }
+
+ public int getCode() {
+ return code;
+ }
+}
diff --git a/src/main/java/com/example/demo/controllers/MessageController.java b/src/main/java/com/example/demo/controllers/MessageController.java
new file mode 100644
index 0000000..dd912db
--- /dev/null
+++ b/src/main/java/com/example/demo/controllers/MessageController.java
@@ -0,0 +1,89 @@
+package com.example.demo.controllers;
+
+import com.example.demo.grpcProtocol.Config;
+import org.apache.rocketmq.client.apis.*;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+@Controller
+public class MessageController {
+ public static final Logger logger = LoggerFactory.getLogger(MessageController.class);
+
+ @GetMapping("/message")
+ public String getMessagePage() {
+ return "message0";
+ }
+
+ @GetMapping("/consumeMessage")
+ @ResponseBody
+ public List consumeMessage(@RequestParam String addr,
+ @RequestParam String group,
+ @RequestParam String topic,
+ @RequestParam String ak,
+ @RequestParam String sk) {
+ // Simulating message consumption based on parameters
+// List messages = new ArrayList<>();
+// for (int i = 0; i < 10; i++) {
+// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
+// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
+// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
+// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
+// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
+// messages.add("Message 1hasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdfhasuhdfkahsdkjfhashdfhakjsdhfkjahsdjfhajksdhfkajsdhfkjasdf");
+// }
+// return messages;
+ try {
+ return consumerMessage(addr,group,topic,ak,sk);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ return Collections.singletonList(e.getMessage());
+ }
+ }
+
+ public List consumerMessage(String addr,String group,String topic,String accessKey,String secretKey) throws InterruptedException, ClientException {
+ ArrayList results = new ArrayList<>();
+ ClientConfiguration clientConfiguration = new ClientConfigurationBuilder()
+ .setEndpoints(addr)
+ .setCredentialProvider(new StaticSessionCredentialsProvider(accessKey,secretKey))
+ .build();
+ PushConsumer pushConsumer = ClientServiceProvider.loadService().newPushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setConsumerGroup(group)
+ .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
+ .setMessageListener(messageView -> {
+ ByteBuffer byteBuffer = messageView.getBody();
+ String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
+ results.add(content);
+ return ConsumeResult.SUCCESS;
+ })
+ .build();
+ Thread.sleep(4_000);
+ CompletableFuture.supplyAsync(()->{
+ try {
+ pushConsumer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ return results;
+ }
+}
diff --git a/src/main/java/com/example/demo/controllers/TestController.java b/src/main/java/com/example/demo/controllers/TestController.java
new file mode 100644
index 0000000..95176ab
--- /dev/null
+++ b/src/main/java/com/example/demo/controllers/TestController.java
@@ -0,0 +1,17 @@
+package com.example.demo.controllers;
+
+import com.example.demo.dto.TestMessage;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class TestController {
+
+ @PostMapping("/test")
+ public void test(@RequestBody TestMessage testMessage){
+
+ System.out.println(testMessage);
+ }
+}
diff --git a/src/main/java/com/example/demo/dto/AccountPermissionDto.java b/src/main/java/com/example/demo/dto/AccountPermissionDto.java
new file mode 100644
index 0000000..9207b96
--- /dev/null
+++ b/src/main/java/com/example/demo/dto/AccountPermissionDto.java
@@ -0,0 +1,174 @@
+package com.example.demo.dto;
+
+
+import com.example.demo.Enum.PermissionTypeOfRocketMQ;
+import com.example.demo.Enum.PermissionValueOfRocketMQ;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class AccountPermissionDto {
+ private String accessKey;
+ private String secretKey;
+ private List topicPermissions;
+ private List groupPermissions;
+
+ public static AccountPermissionDto valueOfTestData(int i) {
+ AccountPermissionDto accountPermissionDto = new AccountPermissionDto();
+ accountPermissionDto.accessKey = "testAccessKey"+i;
+ accountPermissionDto.secretKey = "testSecretKey";
+ accountPermissionDto.topicPermissions = new ArrayList<>();
+ PermissionOfRocketMQDto e = new PermissionOfRocketMQDto();
+ e.setPermissionName("TestTopic_ljt");
+ e.setPermissionType(PermissionTypeOfRocketMQ.TOPIC);
+ e.setPermissionValue(PermissionValueOfRocketMQ.SUB);
+ accountPermissionDto.topicPermissions.add(e);
+ PermissionOfRocketMQDto e1 = new PermissionOfRocketMQDto();
+ e1.setPermissionName("TestTopic_ljt2");
+ e1.setPermissionType(PermissionTypeOfRocketMQ.TOPIC);
+ e1.setPermissionValue(PermissionValueOfRocketMQ.SUB);
+ accountPermissionDto.topicPermissions.add(e1);
+ accountPermissionDto.groupPermissions = new ArrayList<>();
+ PermissionOfRocketMQDto e2 = new PermissionOfRocketMQDto();
+ e2.setPermissionName("ljtConsumerGroup");
+ e2.setPermissionType(PermissionTypeOfRocketMQ.GROUP);
+ e2.setPermissionValue(PermissionValueOfRocketMQ.SUB);
+ accountPermissionDto.groupPermissions.add(e2);
+ PermissionOfRocketMQDto e3 = new PermissionOfRocketMQDto();
+ e3.setPermissionName("ljtConsumerGroup2");
+ e3.setPermissionType(PermissionTypeOfRocketMQ.GROUP);
+ e3.setPermissionValue(PermissionValueOfRocketMQ.SUB);
+ accountPermissionDto.groupPermissions.add(e3);
+ return accountPermissionDto;
+ }
+
+ /**
+ * 参数是否有效
+ * @return boolean
+ */
+ public boolean isValid() {
+ if(accessKey == null || accessKey.equals("") || secretKey == null || secretKey.equals("")){
+ return false;
+ }
+ if(!isTopicValidate() && !isGroupValidate()){
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isGroupValidate() {
+ if(!haveGroupPermissions()){
+ return false;
+ }else{
+ for (PermissionOfRocketMQDto groupPermission : groupPermissions) {
+ if(!groupPermission.isValidate())return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isTopicValidate() {
+ if(!haveTopicPermissions()) {
+ return false;
+ }else{
+ for (PermissionOfRocketMQDto topicPermission : topicPermissions) {
+ if(!topicPermission.isValidate())return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * 构建创建账户的mqadmin命令
+ * @return String[]
+ */
+ public String[] buildCreateAccountCommand(String commandName, String serverAddr, String brokerAddr) {
+ List commandList = new ArrayList<>();
+// commandList.add("updateAclConfig");
+ commandList.add(commandName);
+ commandList.add("-n");
+ commandList.add(serverAddr);
+ commandList.add("-b");
+ commandList.add(brokerAddr);
+ commandList.add("-a");
+ commandList.add(accessKey);
+ commandList.add("-s");
+ commandList.add(secretKey);
+ if(haveTopicPermissions()){
+ commandList.add("-t");
+ commandList.add(buildTopicCommand());
+ }
+ if(haveGroupPermissions()){
+ commandList.add("-g");
+ commandList.add(buildGroupCommand());
+ }
+ return commandList.toArray(new String[0]);
+ }
+
+ private String buildGroupCommand() {
+ return buildCommandLine(groupPermissions);
+ }
+
+ private boolean haveGroupPermissions() {
+ return groupPermissions!=null && groupPermissions.size()>0;
+ }
+
+ private String buildTopicCommand() {
+ return buildCommandLine(topicPermissions);
+ }
+
+ private boolean haveTopicPermissions() {
+ return topicPermissions!=null && topicPermissions.size()>0;
+ }
+
+ private String buildCommandLine(List permissions) {
+ StringBuilder r = new StringBuilder();
+ boolean isTheFirstLine = true;
+ for (PermissionOfRocketMQDto permission : permissions) {
+ if (!isTheFirstLine) {
+ r.append(",");
+ }else{
+ isTheFirstLine = false;
+ }
+ r.append(permission.buildCommand());
+ }
+ return r.toString();
+ }
+
+ public boolean isAccessKeyValid() {
+ return accessKey!=null && accessKey!="";
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public List getTopicPermissions() {
+ return topicPermissions;
+ }
+
+ public void setTopicPermissions(List topicPermissions) {
+ this.topicPermissions = topicPermissions;
+ }
+
+ public List getGroupPermissions() {
+ return groupPermissions;
+ }
+
+ public void setGroupPermissions(List groupPermissions) {
+ this.groupPermissions = groupPermissions;
+ }
+}
diff --git a/src/main/java/com/example/demo/dto/PermissionOfRocketMQDto.java b/src/main/java/com/example/demo/dto/PermissionOfRocketMQDto.java
new file mode 100644
index 0000000..889d9ed
--- /dev/null
+++ b/src/main/java/com/example/demo/dto/PermissionOfRocketMQDto.java
@@ -0,0 +1,80 @@
+package com.example.demo.dto;
+
+
+import com.example.demo.Enum.PermissionTypeOfRocketMQ;
+import com.example.demo.Enum.PermissionValueOfRocketMQ;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * rocketMQ权限
+ */
+public class PermissionOfRocketMQDto {
+ private String permissionName;
+ private PermissionTypeOfRocketMQ permissionType;
+ private PermissionValueOfRocketMQ permissionValue;
+
+ public PermissionOfRocketMQDto() {
+ }
+
+ public PermissionOfRocketMQDto(String permissionName, PermissionTypeOfRocketMQ permissionType, PermissionValueOfRocketMQ permissionValue) {
+ this.permissionName = permissionName;
+ this.permissionType = permissionType;
+ this.permissionValue = permissionValue;
+ }
+
+ public String getPermissionName() {
+ return permissionName;
+ }
+
+ public void setPermissionName(String permissionName) {
+ this.permissionName = permissionName;
+ }
+
+ public PermissionTypeOfRocketMQ getPermissionType() {
+ return permissionType;
+ }
+
+ public void setPermissionType(PermissionTypeOfRocketMQ permissionType) {
+ this.permissionType = permissionType;
+ }
+
+ public PermissionValueOfRocketMQ getPermissionValue() {
+ return permissionValue;
+ }
+
+ public void setPermissionValue(PermissionValueOfRocketMQ permissionValue) {
+ this.permissionValue = permissionValue;
+ }
+
+ /**
+ * 构建topic权限对象
+ * @param topicName 入参
+ * @param permissionValue 权限枚举类
+ * @return PermissionOfRocketMQDto
+ * @throws IllegalArgumentException 非法参数
+ */
+ public static PermissionOfRocketMQDto valueOfTopic(String topicName,PermissionValueOfRocketMQ permissionValue) throws IllegalArgumentException{
+ if(topicName==null){
+ throw new IllegalArgumentException("topic name can not be null");
+ }
+ return new PermissionOfRocketMQDto(topicName, PermissionTypeOfRocketMQ.TOPIC,permissionValue);
+ }
+
+ public static PermissionOfRocketMQDto valueOfGroup(String name, PermissionValueOfRocketMQ permissionValue) throws IllegalArgumentException{
+ if(name==null){
+ throw new IllegalArgumentException("group name can not be null");
+ }
+ return new PermissionOfRocketMQDto(name, PermissionTypeOfRocketMQ.GROUP, permissionValue);
+ }
+
+ public String buildCommand() {
+ return getPermissionName()+"="+getPermissionValue().getPermissionString();
+ }
+
+ public boolean isValidate() {
+ return permissionName!=null && permissionName!="";
+ }
+}
diff --git a/src/main/java/com/example/demo/dto/TestMessage.java b/src/main/java/com/example/demo/dto/TestMessage.java
new file mode 100644
index 0000000..4c17be4
--- /dev/null
+++ b/src/main/java/com/example/demo/dto/TestMessage.java
@@ -0,0 +1,35 @@
+package com.example.demo.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+public class TestMessage {
+
+ String name;
+
+ @JsonIgnore
+ String age;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getAge() {
+ return age;
+ }
+
+ public void setAge(String age) {
+ this.age = age;
+ }
+
+ @Override
+ public String toString() {
+ return "TestMessage{" +
+ "name='" + name + '\'' +
+ ", age='" + age + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/example/demo/grpcProtocol/Config.java b/src/main/java/com/example/demo/grpcProtocol/Config.java
new file mode 100644
index 0000000..35b93fc
--- /dev/null
+++ b/src/main/java/com/example/demo/grpcProtocol/Config.java
@@ -0,0 +1,14 @@
+package com.example.demo.grpcProtocol;
+
+import java.util.Calendar;
+
+public class Config {
+ public static String ROCKET_MQ_SERVICE_ADD = "192.168.74.248:28081";
+ public static String GROUP = "1814209072292421634";
+// public static String TOPIC = "1000101_testTopicName14";
+ //String group = "ljtConsumerGroup";
+ public static String TOPIC = "1000101_testTopicName15";
+ public static String ACCESS_KEY = "rocketmq2";
+
+ public static String SECRET_KEY = "12345678";
+}
diff --git a/src/main/java/com/example/demo/grpcProtocol/ProducerExample.java b/src/main/java/com/example/demo/grpcProtocol/ProducerExample.java
new file mode 100644
index 0000000..832c335
--- /dev/null
+++ b/src/main/java/com/example/demo/grpcProtocol/ProducerExample.java
@@ -0,0 +1,97 @@
+package com.example.demo.grpcProtocol;
+import com.example.demo.DemoApplication;
+import org.apache.rocketmq.client.apis.*;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+//public class ProducerExample {
+// private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
+//
+// public static void main(String[] args) throws ClientException, IOException, InterruptedException {
+// logger.info("running producer...");
+// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+// String endpoint = Config.ROCKET_MQ_SERVICE_ADD;
+// String topic = Config.TOPIC;
+// ClientServiceProvider provider = ClientServiceProvider.loadService();
+// ClientConfiguration configuration = new ClientConfigurationBuilder()
+// .setEndpoints(endpoint)
+// .setCredentialProvider(new StaticSessionCredentialsProvider(Config.ACCESS_KEY,Config.SECRET_KEY))
+// .build();
+// // 初始化Producer时需要设置通信配置以及预绑定的Topic。
+// Producer producer = provider.newProducerBuilder()
+// .setTopics(topic)
+// .setClientConfiguration(configuration)
+// .build();
+// try {
+// Scanner scanner = new Scanner(System.in);
+// System.out.println("请输出要发送的内容");
+// while (scanner.hasNext()){
+// Message message = provider.newMessageBuilder()
+// .setTopic(topic)
+// .setBody((simpleDateFormat.format(new Date())+":"+scanner.next()).getBytes())
+// .build();
+// SendReceipt sendReceipt = producer.send(message);
+// logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+// }
+// } catch (ClientException e) {
+// logger.error("Failed to send message", e);
+// }
+// producer.close();
+// logger.info("producer shutdown...");
+// }
+//}
+
+import org.apache.rocketmq.client.apis.*;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Scanner;
+
+public class ProducerExample {
+// private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
+
+ public static void main(String[] args) throws IOException, InterruptedException, IOException, ClientException {
+// logger.info("running producer...");
+// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String endpoint = "ciotdmp.unimip.cn:28081";
+ String topic = "1000108_Test";
+ ClientServiceProvider provider = ClientServiceProvider.loadService();
+ ClientConfiguration configuration = new ClientConfigurationBuilder()
+ .setEndpoints(endpoint)
+ .setCredentialProvider(new StaticSessionCredentialsProvider("rmqadminstrator", "V#gVwMlh%uuLKsYrt7&gvnq1^"))
+ .build();
+ // 初始化Producer时需要设置通信配置以及预绑定的Topic。
+ Producer producer = provider.newProducerBuilder()
+ .setTopics(topic)
+ .setClientConfiguration(configuration)
+ .build();
+ try {
+ Scanner scanner = new Scanner(System.in);
+ System.out.println("请输出要发送的内容");
+ while (scanner.hasNext()) {
+ Message message = provider.newMessageBuilder()
+ .setTopic(topic)
+ .setBody(("(simpleDateFormat.format(new Date()) + " + scanner.next()).getBytes())
+ .build();
+ SendReceipt sendReceipt = producer.send(message);
+ System.out.println("Send message successfully, messageId={}" + sendReceipt);
+// logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }
+ } catch (ClientException e) {}
+ }
+}
diff --git a/src/main/java/com/example/demo/grpcProtocol/PushConsumerExample.java b/src/main/java/com/example/demo/grpcProtocol/PushConsumerExample.java
new file mode 100644
index 0000000..d501790
--- /dev/null
+++ b/src/main/java/com/example/demo/grpcProtocol/PushConsumerExample.java
@@ -0,0 +1,105 @@
+package com.example.demo.grpcProtocol;
+
+import com.example.demo.DemoApplication;
+import org.apache.rocketmq.client.apis.*;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Date;
+
+public class PushConsumerExample {
+ private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
+
+ private PushConsumerExample() {
+ }
+
+// public static void main(String[] args) throws ClientException, IOException, InterruptedException {
+// logger.info("running consumer...");
+// final ClientServiceProvider provider = ClientServiceProvider.loadService();
+// // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
+//// String endpoints = "192.168.74.248:28081";
+// String endpoints = "192.168.6.90:28081";
+//// String endpoints = System.getProperty("addr");
+// ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+// .setEndpoints(endpoints)
+// .build();
+// ClientConfigurationBuilder clientConfigurationBuilder = new ClientConfigurationBuilder();
+// clientConfiguration = clientConfigurationBuilder
+// .setEndpoints(endpoints)
+//// .enableSsl(false)
+// .setCredentialProvider(new StaticSessionCredentialsProvider("1815638479991795714","a1d018cd992b19e59b9a1657194343a519b784e56ee29c8d1e398657b3fbcbed"))
+//// .setCredentialProvider(new StaticSessionCredentialsProvider("rocketmq2","12345678"))
+//// .setCredentialProvider(new StaticSessionCredentialsProvider(System.getProperty("ak"),System.getProperty("sk")))
+// .build();
+// // 订阅消息的过滤规则,表示订阅所有Tag的消息。
+// String tag = "*";
+// FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+// // 为消费者指定所属的消费者分组,Group需要提前创建。
+// String consumerGroup = "test2";
+// // 指定需要订阅哪个目标Topic,Topic需要提前创建。
+// String topic = DemoApplication.TOPIC;
+// // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
+// PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+// .setClientConfiguration(clientConfiguration)
+// // 设置消费者分组。
+// .setConsumerGroup(consumerGroup)
+// // 设置预绑定的订阅关系。
+// .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+// // 设置消费监听器。
+// .setMessageListener(messageView -> {
+// ByteBuffer byteBuffer = messageView.getBody();
+// String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
+//
+// // 处理消息并返回消费结果。
+// logger.info("Consume message successfully, messageId={},message body:{}", messageView.getMessageId(),content);
+// return ConsumeResult.SUCCESS;
+// })
+// .build();
+// logger.info("consumer initialized...");
+// Thread.sleep(Long.MAX_VALUE);
+// // 如果不需要再使用 PushConsumer,可关闭该实例。
+// pushConsumer.close();
+// logger.info("consumer shutdown...");
+// }
+
+ public static void main(String[] args) throws ClientException, IOException, InterruptedException {
+ //need to change
+ String rocketMQServiceAdd = Config.ROCKET_MQ_SERVICE_ADD;
+ String group = Config.GROUP;
+ String topic = Config.TOPIC;
+ String accessKey = Config.ACCESS_KEY;
+ String secretKey = Config.SECRET_KEY;
+// String rocketMQServiceAdd = System.getProperty("addr");
+// String group = System.getProperty("group");
+// String topic = System.getProperty("topic");
+// String accessKey = System.getProperty("ak");
+// String secretKey = System.getProperty("sk");
+
+ ClientConfiguration clientConfiguration = new ClientConfigurationBuilder()
+ .setEndpoints(rocketMQServiceAdd)
+ .setCredentialProvider(new StaticSessionCredentialsProvider(accessKey,secretKey))
+ .build();
+ PushConsumer pushConsumer = ClientServiceProvider.loadService().newPushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setConsumerGroup(group)
+ .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
+ .setMessageListener(messageView -> {
+ ByteBuffer byteBuffer = messageView.getBody();
+ String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
+ System.out.println(new Date()+":Consume message successfully, messageId="+messageView.getMessageId()+",message body:"+content);
+ return ConsumeResult.SUCCESS;
+ })
+ .build();
+ System.out.println(new Date()+"-qidong!!");
+ Thread.sleep(Long.MAX_VALUE);
+ pushConsumer.close();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/example/demo/kafka/MyKafkaProducer.java b/src/main/java/com/example/demo/kafka/MyKafkaProducer.java
new file mode 100644
index 0000000..8df2d60
--- /dev/null
+++ b/src/main/java/com/example/demo/kafka/MyKafkaProducer.java
@@ -0,0 +1,68 @@
+package com.example.demo.kafka;
+
+
+import com.example.demo.kafka.encrypt.AesUtil;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import javax.annotation.Resource;
+import javax.crypto.BadPaddingException;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+@Service
+public class MyKafkaProducer {
+ private static String secret = "mA%@eaIF!I$0$NpVDDc&j*e@wdb1SpUU";
+
+
+ public static void main(String[] args) throws InterruptedException, NoSuchPaddingException, InvalidKeyException, NoSuchAlgorithmException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException, ExecutionException {
+ //need to change
+ String addr = "120.197.158.251:9092";
+ String topic = "device_alarm_add";
+ String username = "liantong";
+ String password = "Liantong@123";
+
+ // 1、创建kafka生产者的配置对象
+ Properties properties = new Properties();
+
+ // 2、给kafka配置对象添加配置信息:bootstrap.servers
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addr);
+ // key,value(必须):key.serializer, value.serializer
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ // 设置安全协议为SASL_PLAINTEXT
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+
+ // 设置SASL认证机制为PLAIN
+ properties.put("sasl.mechanism", "PLAIN");
+
+ // 设置认证信息,格式为:username:password
+ String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
+ String jaasCfg = String.format(jaasTemplate, username, password);
+ properties.put("sasl.jaas.config", jaasCfg);
+
+ // 3、创建kafka生产者对象
+ KafkaProducer kafkaProducer = new KafkaProducer(properties);
+
+ // 4、调用send方法,发送消息
+ kafkaProducer.send(new ProducerRecord<>(topic, AesUtil.encrypt(secret,"testMessage"))).get();
+ System.out.println("send succeed");
+ // 5、关闭资源
+ kafkaProducer.close();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/example/demo/kafka/MyUIAP.java b/src/main/java/com/example/demo/kafka/MyUIAP.java
new file mode 100644
index 0000000..2853da8
--- /dev/null
+++ b/src/main/java/com/example/demo/kafka/MyUIAP.java
@@ -0,0 +1,77 @@
+package com.example.demo.kafka;
+
+import client.CiotClient;
+import client.CiotRequestClient;
+import model.HttpResult;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import utils.DigestUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class MyUIAP {
+ public static void main(String[] args) {
+// HttpResult device = createDevice();
+// System.out.println(device.getBody());
+
+
+ String appId = "uET1mRLpQ6";
+ String appSecret = "rx71G2CDXH";
+// String host = "http://localhost:8085";
+ String host = "http://100.74.68.167:8080";
+ //String host = "https://iot.unimip.cn/gw";//要作为配置项
+ String uri = "/openapi/uiap-open/collect/devices/create";
+ String productKey = "7ZvzoBNI2r";
+
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ // 创建 HttpPost 请求
+ HttpPost post = new HttpPost(host+uri);
+
+ // 添加请求头
+ post.setHeader("appKey", appSecret);
+ post.setHeader("appId", appId);
+ String nonce = UUID.randomUUID().toString();
+ post.setHeader("nonce", nonce);
+ String tm = String.valueOf(new Date().getTime());
+ post.setHeader("tm", tm);
+ post.setHeader("sign", buildSign(appId,nonce,tm,appSecret,uri));
+
+ // 设置请求体
+ StringEntity entity = new StringEntity("{\"productKey\":\"7ZvzoBNI2r\",\"deviceInfo\":{\"Name\":\"ljtTestDeviceName\"}}");
+ post.setEntity(entity);
+
+ // 发送请求
+ try (CloseableHttpResponse response = httpClient.execute(post)) {
+ // 获取响应码
+ int statusCode = response.getStatusLine().getStatusCode();
+ System.out.println("Response Code: " + statusCode);
+
+ // 获取响应实体
+ HttpEntity responseEntity = response.getEntity();
+ if (responseEntity != null) {
+ System.out.println("Response Content Length: " + responseEntity.getContentLength());
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static String buildSign(String appId,String nonce,String tm,String appKey,String uri) throws NoSuchAlgorithmException {
+ String originTest = "appId="+appId+"&nonce="+nonce+"&tm="+tm+"&uri="+uri+"&appKey="+appKey;
+// String originTest = "appId=test&nonce=1bf544a0-d129-484e-873f-16912f7d3b82&tm=1645497754337&uri=/openapi/uiapopen/devices/service/invoke&appKey=test";
+ String s = DigestUtil.md5(originTest).toUpperCase();
+ System.out.println(s);
+ return s;
+ }
+}
diff --git a/src/main/java/com/example/demo/kafka/encrypt/AesUtil.java b/src/main/java/com/example/demo/kafka/encrypt/AesUtil.java
new file mode 100644
index 0000000..94bae17
--- /dev/null
+++ b/src/main/java/com/example/demo/kafka/encrypt/AesUtil.java
@@ -0,0 +1,44 @@
+package com.example.demo.kafka.encrypt;
+
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+public class AesUtil {
+
+ private static final String ENCRYPT_ALGORITHMS = "AES";
+
+ private static final String ENCRYPT_PATTERN = "AES/CBC/PKCS5Padding";
+
+ private static final byte[] INITIAL_IV = {-5, -10, 105, -93, 3, 123, -76, -44, 116, 122, -54, -86, -47, 3, -22, -79};
+
+ public static String encrypt(String secret, String data) throws NoSuchPaddingException, NoSuchAlgorithmException,
+ InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException, IllegalBlockSizeException {
+ byte[] iv = INITIAL_IV;
+ byte[] key = secret.getBytes(StandardCharsets.UTF_8);
+ SecretKeySpec secretKeySpec = new SecretKeySpec(key, ENCRYPT_ALGORITHMS);
+ IvParameterSpec ivParameterSpec = new IvParameterSpec(iv);
+ Cipher cipher = Cipher.getInstance(ENCRYPT_PATTERN);
+ cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, ivParameterSpec);
+ String encryptString = Util.getHexString(cipher.doFinal(data.getBytes(StandardCharsets.UTF_8)));
+ return encryptString;
+ }
+
+ public static String decrypt(String secret, String data) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException, IllegalBlockSizeException {
+ byte[] key = secret.getBytes(StandardCharsets.UTF_8);
+ byte[] iv = INITIAL_IV;
+ byte[] encryptedData = Util.hexStringToBytes(data);
+ Cipher cipher = Cipher.getInstance(ENCRYPT_PATTERN);
+ SecretKeySpec secretKeySpec = new SecretKeySpec(key, ENCRYPT_ALGORITHMS);
+ IvParameterSpec ivParameterSpec = new IvParameterSpec(iv);
+ cipher.init(Cipher.DECRYPT_MODE, secretKeySpec, ivParameterSpec);
+ return new String(cipher.doFinal(encryptedData));
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/example/demo/kafka/encrypt/Main.java b/src/main/java/com/example/demo/kafka/encrypt/Main.java
new file mode 100644
index 0000000..a6f44ab
--- /dev/null
+++ b/src/main/java/com/example/demo/kafka/encrypt/Main.java
@@ -0,0 +1,26 @@
+package com.example.demo.kafka.encrypt;
+
+import javax.crypto.BadPaddingException;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+public class Main {
+
+ private static String secret = "wIFLxxn!@gRv2KnKohYsymDTesJWsQ5x";
+
+ public static void main(String[] args) throws InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, NoSuchAlgorithmException, BadPaddingException, InvalidKeyException {
+
+ String data = "xiaofang_test";
+
+ String encryptData = AesUtil.encrypt(secret, data);
+
+ System.out.println("加密数据:" + encryptData);
+
+ String decryptData = AesUtil.decrypt(secret, encryptData);
+
+ System.out.println("解密数据:" + decryptData);
+ }
+}
diff --git a/src/main/java/com/example/demo/kafka/encrypt/Util.java b/src/main/java/com/example/demo/kafka/encrypt/Util.java
new file mode 100644
index 0000000..227b281
--- /dev/null
+++ b/src/main/java/com/example/demo/kafka/encrypt/Util.java
@@ -0,0 +1,34 @@
+package com.example.demo.kafka.encrypt;
+
+public class Util {
+ public static String getHexString(byte[] bytes) {
+ String ret = "";
+ for (int i = 0; i < bytes.length; i++) {
+ ret += Integer.toString((bytes[i] & 0xff) + 0x100, 16).substring(1);
+ }
+ return ret.toUpperCase();
+ }
+
+ public static byte[] hexStringToBytes(String hexString)
+ {
+ if (hexString == null || hexString.equals(""))
+ {
+ return null;
+ }
+ hexString = hexString.toUpperCase();
+ int length = hexString.length() / 2;
+ char[] hexChars = hexString.toCharArray();
+ byte[] d = new byte[length];
+ for (int i = 0; i < length; i++)
+ {
+ int pos = i * 2;
+ d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
+ }
+ return d;
+ }
+
+ private static byte charToByte(char c)
+ {
+ return (byte) "0123456789ABCDEF".indexOf(c);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/example/demo/mongo/mongoTest.java b/src/main/java/com/example/demo/mongo/mongoTest.java
new file mode 100644
index 0000000..716426a
--- /dev/null
+++ b/src/main/java/com/example/demo/mongo/mongoTest.java
@@ -0,0 +1,29 @@
+package com.example.demo.mongo;
+
+import cn.org.wangchangjiu.sqltomongo.core.Converter;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoDriverInformation;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.internal.MongoClientImpl;
+import org.bson.Document;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.PostMapping;
+
+import javax.annotation.PostConstruct;
+
+public class mongoTest {
+ public static void main(String[] args) {
+ MongoClient mongoClient = MongoClients.create("mongodb://admin:123@192.168.9.137:27017/admin");
+ MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "test_convert_db");
+
+ String sql = "select * from table_a";
+ Document command = (Document) Converter.executeForBson(sql);
+ System.out.println(Converter.execute(sql));
+ System.out.println(command);
+ System.out.println("result:"+ mongoTemplate.executeCommand(command));
+ }
+}
diff --git a/src/main/java/com/example/demo/remotingProtocol/ConsumerRemoting.java b/src/main/java/com/example/demo/remotingProtocol/ConsumerRemoting.java
new file mode 100644
index 0000000..5f0a0bc
--- /dev/null
+++ b/src/main/java/com/example/demo/remotingProtocol/ConsumerRemoting.java
@@ -0,0 +1,42 @@
+package com.example.demo.remotingProtocol;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class ConsumerRemoting {
+ public static void main(String[] args) throws Exception {
+ // 创建消费者实例,并设置消费者组名
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("123","ljtConsumerGroup",
+ new AclClientRPCHook(new SessionCredentials(System.getProperty("ak"), System.getProperty("sk"))));
+ // 设置 Name Server 地址,此处为示例,实际使用时请替换为真实的 Name Server 地址
+ consumer.setNamesrvAddr("192.168.6.93:9876");
+// consumer.setNamesrvAddr(System.getProperty("addr"));
+ // 订阅指定的主题和标签(* 表示所有标签)
+ consumer.subscribe("TestTopic_ljt", "*");
+
+ // 注册消息监听器
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ for (MessageExt msg : msgs) {
+ System.out.println("Received message: " + new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+
+ // 启动消费者
+ consumer.start();
+ System.out.println("Consumer started.");
+ }
+}
diff --git a/src/main/java/com/example/demo/remotingProtocol/ProdurcerRemoting.java b/src/main/java/com/example/demo/remotingProtocol/ProdurcerRemoting.java
new file mode 100644
index 0000000..2c3deac
--- /dev/null
+++ b/src/main/java/com/example/demo/remotingProtocol/ProdurcerRemoting.java
@@ -0,0 +1,36 @@
+package com.example.demo.remotingProtocol;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+public class ProdurcerRemoting {
+
+ public static void main(String[] args) throws Exception {
+ // 创建生产者实例,并设置生产者组名
+ DefaultMQProducer producer = new DefaultMQProducer("ljtConsumerGroup",
+ new AclClientRPCHook(new SessionCredentials(System.getProperty("ak"), System.getProperty("sk"))));
+ // 设置 Name Server 地址,此处为示例,实际使用时请替换为真实的 Name Server 地址
+ producer.setNamesrvAddr("192.168.6.93:9876");
+// producer.setNamesrvAddr(System.getProperty("addr"));
+ producer.start();
+
+ try {
+ // 创建消息实例,指定 topic、Tag和消息体
+ Message msg = new Message("TestTopic_ljt", "TagA", ("Hello RocketMQ").getBytes());
+ // 发送消息并获取发送结果
+ SendResult sendResult = producer.send(msg);
+ System.out.println("Message sent: " + new String(msg.getBody()));
+ System.out.println("Send result: " + sendResult);
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Message sending failed.");
+ } finally {
+ // 关闭生产者
+ producer.shutdown();
+ }
+ }
+
+}
diff --git a/src/main/java/com/example/demo/rocketMq/MyClusterListSubCommand.java b/src/main/java/com/example/demo/rocketMq/MyClusterListSubCommand.java
new file mode 100644
index 0000000..f03265f
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyClusterListSubCommand.java
@@ -0,0 +1,122 @@
+package com.example.demo.rocketMq;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+//import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+//import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class MyClusterListSubCommand implements SubCommand {
+
+ public static CopyOnWriteArrayList CLUSTER_NAMES = new CopyOnWriteArrayList<>();
+ public static CopyOnWriteArrayList BROKER_ADDRESSES = new CopyOnWriteArrayList<>();
+ public static final String COMMAND_NAME = "myClusterList";
+ DefaultMQAdminExt defaultMQAdminExt;
+
+ MyClusterListSubCommand(DefaultMQAdminExt defaultMQAdminExt){
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ }
+
+ @Override
+ public String commandName() {
+ return COMMAND_NAME;
+ }
+
+ @Override
+ public String commandDesc() {
+ return "List cluster infos";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("m", "moreStats", false, "Print more stats");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("i", "interval", true, "specify intervals numbers, it is in seconds");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "which cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+ long printInterval = 1;
+ boolean enableInterval = commandLine.hasOption('i');
+
+ if (enableInterval) {
+ printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000;
+ }
+
+ String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
+
+// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ try {
+// defaultMQAdminExt.start();
+ long i = 0;
+ do {
+ if (i++ > 0) {
+ Thread.sleep(printInterval);
+ }
+
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+
+ CLUSTER_NAMES = new CopyOnWriteArrayList<>(getTargetClusterNames(clusterName, clusterInfo));
+ BROKER_ADDRESSES = new CopyOnWriteArrayList<>(getBrokerAddresses(clusterInfo));
+ } while (enableInterval);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+// defaultMQAdminExt.shutdown();
+ }
+
+ }
+
+ private Set getBrokerAddresses(ClusterInfo clusterInfo) {
+ HashSet results = new HashSet<>();
+ Map brokerAddrTable = clusterInfo.getBrokerAddrTable();
+ if(brokerAddrTable!=null){
+ for (Map.Entry entry : brokerAddrTable.entrySet()) {
+ BrokerData value = entry.getValue();
+ if (value!=null) {
+ HashMap brokerAddrs = value.getBrokerAddrs();
+ if (brokerAddrs!=null) {
+ for (Map.Entry stringEntry : brokerAddrs.entrySet()) {
+ results.add(stringEntry.getValue());
+ }
+ }
+ }
+ }
+ }
+ return results;
+ }
+
+ private Set getTargetClusterNames(String clusterName, ClusterInfo clusterInfo) {
+ if (StringUtils.isEmpty(clusterName)) {
+ return clusterInfo.getClusterAddrTable().keySet();
+ } else {
+ Set clusterNames = new TreeSet();
+ clusterNames.add(clusterName);
+ return clusterNames;
+ }
+ }
+
+}
diff --git a/src/main/java/com/example/demo/rocketMq/MyDeleteAccessConfigSubCommand.java b/src/main/java/com/example/demo/rocketMq/MyDeleteAccessConfigSubCommand.java
new file mode 100644
index 0000000..765cf79
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyDeleteAccessConfigSubCommand.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.demo.rocketMq;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.Set;
+
+public class MyDeleteAccessConfigSubCommand implements SubCommand {
+ public static final String COMMAND_NAME = "myDeleteAclConfig";
+
+ DefaultMQAdminExt defaultMQAdminExt;
+
+ MyDeleteAccessConfigSubCommand(DefaultMQAdminExt defaultMQAdminExt){
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ }
+
+ @Override
+ public String commandName() {
+ return COMMAND_NAME;
+ }
+
+ @Override
+ public String commandAlias() {
+ return "deleteAccessConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Delete Acl Config Account in broker";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "delete acl config account from which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "delete acl config account from which cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ opt = new Option("a", "accessKey", true, "set accessKey in acl config file for deleting which account");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+
+ String accessKey = commandLine.getOptionValue('a').trim();
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+
+// defaultMQAdminExt.start();
+ defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
+
+ System.out.printf("delete plain access config account from %s success.%n", addr);
+ System.out.printf("account's accesskey is:%s", accessKey);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+// defaultMQAdminExt.start();
+
+ Set brokerAddrSet =
+ CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : brokerAddrSet) {
+ defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
+ System.out.printf("delete plain access config account from %s success.%n", addr);
+ }
+
+ System.out.printf("account's accesskey is:%s", accessKey);
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+// defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/src/main/java/com/example/demo/rocketMq/MyDeleteSubscriptionGroupCommand.java b/src/main/java/com/example/demo/rocketMq/MyDeleteSubscriptionGroupCommand.java
new file mode 100644
index 0000000..fd30ff1
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyDeleteSubscriptionGroupCommand.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.demo.rocketMq;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.Set;
+
+public class MyDeleteSubscriptionGroupCommand implements SubCommand {
+ public static final String COMMAND_NAME = "myDeleteSubGroup";
+
+ DefaultMQAdminExt defaultMQAdminExt;
+
+ MyDeleteSubscriptionGroupCommand(DefaultMQAdminExt defaultMQAdminExt){
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ }
+
+
+ @Override
+ public String commandName() {
+ return COMMAND_NAME;
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Delete subscription group from broker.";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "delete subscription group from which cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("g", "groupName", true, "subscription group name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("r", "removeOffset", true, "remove offset");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ try {
+ // groupName
+ String groupName = commandLine.getOptionValue('g').trim();
+ boolean cleanOffset = false;
+ if (commandLine.hasOption('r')) {
+ try {
+ cleanOffset = Boolean.valueOf(commandLine.getOptionValue('r').trim());
+ } catch (Exception e) {
+ }
+ }
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+// adminExt.start();
+
+ defaultMQAdminExt.deleteSubscriptionGroup(addr, groupName, cleanOffset);
+ System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
+ addr);
+
+ return;
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+// defaultMQAdminExt.start();
+
+ Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String master : masterSet) {
+ defaultMQAdminExt.deleteSubscriptionGroup(master, groupName, cleanOffset);
+ System.out.printf(
+ "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
+ groupName, master, clusterName);
+ }
+
+ try {
+ defaultMQAdminExt.deleteTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + groupName, clusterName);
+ defaultMQAdminExt.deleteTopic(MixAll.DLQ_GROUP_TOPIC_PREFIX + groupName, clusterName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+// defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/src/main/java/com/example/demo/rocketMq/MyDeleteTopicSubCommand.java b/src/main/java/com/example/demo/rocketMq/MyDeleteTopicSubCommand.java
new file mode 100644
index 0000000..1654b47
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyDeleteTopicSubCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.demo.rocketMq;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class MyDeleteTopicSubCommand implements SubCommand {
+
+ public static final String COMMAND_NAME = "myDeleteTopic";
+
+ DefaultMQAdminExt defaultMQAdminExt;
+
+ MyDeleteTopicSubCommand(DefaultMQAdminExt defaultMQAdminExt){
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ }
+
+
+ public static void deleteTopic(final DefaultMQAdminExt adminExt,
+ final String clusterName,
+ final String topic
+ ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+
+ Set masterBrokerAddressSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+ adminExt.deleteTopicInBroker(masterBrokerAddressSet, topic);
+ System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
+
+ Set nameServerSet = null;
+ if (adminExt.getNamesrvAddr() != null) {
+ String[] ns = adminExt.getNamesrvAddr().trim().split(";");
+ nameServerSet = new HashSet(Arrays.asList(ns));
+ }
+
+ adminExt.deleteTopicInNameServer(nameServerSet, clusterName, topic);
+ System.out.printf("delete topic [%s] from NameServer success.%n", topic);
+ }
+
+ @Override
+ public String commandName() {
+ return COMMAND_NAME;
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Delete topic from broker and NameServer.";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("t", "topic", true, "topic name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "delete topic from which cluster");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+// DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+// adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ try {
+ String topic = commandLine.getOptionValue('t').trim();
+
+ if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+// adminExt.start();
+ deleteTopic(defaultMQAdminExt, clusterName, topic);
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+// adminExt.shutdown();
+ }
+ }
+}
diff --git a/src/main/java/com/example/demo/rocketMq/MyMQAdminStartup.java b/src/main/java/com/example/demo/rocketMq/MyMQAdminStartup.java
new file mode 100644
index 0000000..17c4062
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyMQAdminStartup.java
@@ -0,0 +1,331 @@
+//package com.example.demo.rocketMq;/*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//import ch.qos.logback.classic.LoggerContext;
+//import ch.qos.logback.classic.joran.JoranConfigurator;
+//import org.apache.commons.cli.CommandLine;
+//import org.apache.commons.cli.Options;
+//import org.apache.commons.cli.PosixParser;
+//import org.apache.rocketmq.acl.common.AclClientRPCHook;
+//import org.apache.rocketmq.acl.common.AclUtils;
+//import org.apache.rocketmq.acl.common.SessionCredentials;
+//import org.apache.rocketmq.client.exception.MQClientException;
+//import org.apache.rocketmq.common.MQVersion;
+//import org.apache.rocketmq.common.MixAll;
+//import org.apache.rocketmq.remoting.RPCHook;
+//import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+//import org.apache.rocketmq.srvutil.ServerUtil;
+//import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+//import org.apache.rocketmq.tools.command.SubCommand;
+//import org.apache.rocketmq.tools.command.SubCommandException;
+//import org.apache.rocketmq.tools.command.acl.*;
+//import org.apache.rocketmq.tools.command.broker.*;
+//import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
+//import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand;
+//import org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
+//import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand;
+//import org.apache.rocketmq.tools.command.consumer.*;
+//import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
+//import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
+//import org.apache.rocketmq.tools.command.controller.*;
+//import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
+//import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
+//import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
+//import org.apache.rocketmq.tools.command.ha.GetSyncStateSetSubCommand;
+//import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
+//import org.apache.rocketmq.tools.command.message.*;
+//import org.apache.rocketmq.tools.command.namesrv.*;
+//import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
+//import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
+//import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
+//import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
+//import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
+//import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
+//import org.apache.rocketmq.tools.command.topic.*;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.PostConstruct;
+//import java.nio.file.Files;
+//import java.nio.file.Paths;
+//import java.util.HashSet;
+//import java.util.Set;
+//
+//@Component
+//public class MyMQAdminStartup {
+// protected static final Set SUB_COMMANDS = new HashSet<>();
+//
+// private static final String ROCKETMQ_HOME = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
+// System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+//
+//// @Value("${rocketmq.name-server}")
+// public String namesrvAddr = System.getProperty("nameServer");
+//// @Value("${rocketmq.producer.access-key:rocketmq2}")
+// private String accessKey = System.getProperty("ak");
+//// @Value("${rocketmq.producer.secret-key:12345678}")
+// private String secretKey = System.getProperty("sk");
+//
+// RPCHook rpcHook;
+//
+// DefaultMQAdminExt defaultMQAdminExt;
+//
+//
+// public void runACommand(String[] args) {
+// try {
+// main0(args, rpcHook);
+// } catch (SubCommandException e) {
+// throw new RuntimeException("mqAdmin command execute failed");
+// }
+// }
+//
+// public void main0(String[] args, RPCHook rpcHook) throws SubCommandException {
+// try {
+// switch (args.length) {
+// case 0:
+// printHelp();
+// break;
+// case 2:
+// if (args[0].equals("help")) {
+// SubCommand cmd = findSubCommand(args[1]);
+// if (cmd != null) {
+// Options options = ServerUtil.buildCommandlineOptions(new Options());
+// options = cmd.buildCommandlineOptions(options);
+// if (options != null) {
+// ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
+// }
+// } else {
+// System.out.printf("The sub command %s not exist.%n", args[1]);
+// }
+// break;
+// }
+// case 1:
+// default:
+// SubCommand cmd = findSubCommand(args[0]);
+// if (cmd != null) {
+// String[] subargs = parseSubArgs(args);
+//
+// Options options = ServerUtil.buildCommandlineOptions(new Options());
+// final CommandLine commandLine =
+// ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+// new PosixParser());
+// if (null == commandLine) {
+// return;
+// }
+//
+// if (commandLine.hasOption('n')) {
+// String namesrvAddr = commandLine.getOptionValue('n');
+// System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+// }
+// if (rpcHook != null) {
+// cmd.execute(commandLine, options, rpcHook);
+// } else {
+// cmd.execute(commandLine, options, AclUtils.getAclRPCHook(ROCKETMQ_HOME + MixAll.ACL_CONF_TOOLS_FILE));
+// }
+// } else {
+// System.out.printf("The sub command %s not exist.%n", args[0]);
+// }
+// break;
+// }
+// } catch (Exception e) {
+// e.printStackTrace();
+// throw e;
+// }
+// }
+//
+// @PostConstruct
+// public void init() {
+// System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+// initRPCHook();
+// initMQAdminExt();
+// initCommand();
+//// try {
+//// initLogback();
+//// } catch (Exception e) {
+//// e.printStackTrace();
+//// }
+// }
+//
+// private void initMQAdminExt() {
+// System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+// defaultMQAdminExt = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
+// try {
+// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+// defaultMQAdminExt.start();
+// } catch (MQClientException e) {
+// e.printStackTrace();
+// }
+// }
+//
+// private void initRPCHook() {
+// if(accessKey!=null && accessKey!="" && secretKey!=null && secretKey!=""){
+// rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+// }else{
+// rpcHook = AclUtils.getAclRPCHook(ROCKETMQ_HOME + MixAll.ACL_CONF_TOOLS_FILE);
+// }
+// }
+//
+// public void initCommand() {
+// //myCommands
+// initCommand(new MyClusterListSubCommand(defaultMQAdminExt));
+// initCommand(new MyUpdateAccessConfigSubCommand(defaultMQAdminExt));
+// initCommand(new MyDeleteAccessConfigSubCommand(defaultMQAdminExt));
+// initCommand(new MyDeleteSubscriptionGroupCommand(defaultMQAdminExt));
+// initCommand(new MyUpdateTopicSubCommand(defaultMQAdminExt));
+// initCommand(new MyDeleteTopicSubCommand(defaultMQAdminExt));
+//
+// initCommand(new UpdateTopicSubCommand());
+// initCommand(new DeleteTopicSubCommand());
+// initCommand(new UpdateSubGroupSubCommand());
+// initCommand(new SetConsumeModeSubCommand());
+// initCommand(new DeleteSubscriptionGroupCommand());
+// initCommand(new UpdateBrokerConfigSubCommand());
+// initCommand(new UpdateTopicPermSubCommand());
+//
+// initCommand(new TopicRouteSubCommand());
+// initCommand(new TopicStatusSubCommand());
+// initCommand(new TopicClusterSubCommand());
+//
+// initCommand(new AddBrokerSubCommand());
+// initCommand(new RemoveBrokerSubCommand());
+// initCommand(new ResetMasterFlushOffsetSubCommand());
+// initCommand(new BrokerStatusSubCommand());
+// initCommand(new QueryMsgByIdSubCommand());
+// initCommand(new QueryMsgByKeySubCommand());
+// initCommand(new QueryMsgByUniqueKeySubCommand());
+// initCommand(new QueryMsgByOffsetSubCommand());
+// initCommand(new QueryMsgTraceByIdSubCommand());
+//
+// initCommand(new PrintMessageSubCommand());
+// initCommand(new PrintMessageByQueueCommand());
+// initCommand(new SendMsgStatusCommand());
+// initCommand(new BrokerConsumeStatsSubCommad());
+//
+// initCommand(new ProducerConnectionSubCommand());
+// initCommand(new ConsumerConnectionSubCommand());
+// initCommand(new ConsumerProgressSubCommand());
+// initCommand(new ConsumerStatusSubCommand());
+// initCommand(new CloneGroupOffsetCommand());
+// //for producer
+// initCommand(new ProducerSubCommand());
+//
+// initCommand(new ClusterListSubCommand());
+// initCommand(new TopicListSubCommand());
+//
+// initCommand(new UpdateKvConfigCommand());
+// initCommand(new DeleteKvConfigCommand());
+//
+// initCommand(new WipeWritePermSubCommand());
+// initCommand(new AddWritePermSubCommand());
+// initCommand(new ResetOffsetByTimeCommand());
+// initCommand(new SkipAccumulationSubCommand());
+//
+// initCommand(new UpdateOrderConfCommand());
+// initCommand(new CleanExpiredCQSubCommand());
+// initCommand(new DeleteExpiredCommitLogSubCommand());
+// initCommand(new CleanUnusedTopicCommand());
+//
+// initCommand(new StartMonitoringSubCommand());
+// initCommand(new StatsAllSubCommand());
+//
+// initCommand(new AllocateMQSubCommand());
+//
+// initCommand(new CheckMsgSendRTCommand());
+// initCommand(new CLusterSendMsgRTCommand());
+//
+// initCommand(new GetNamesrvConfigCommand());
+// initCommand(new UpdateNamesrvConfigCommand());
+// initCommand(new GetBrokerConfigCommand());
+// initCommand(new GetConsumerConfigSubCommand());
+//
+// initCommand(new QueryConsumeQueueCommand());
+// initCommand(new SendMessageCommand());
+// initCommand(new ConsumeMessageCommand());
+//
+// //for acl command
+// initCommand(new UpdateAccessConfigSubCommand());
+// initCommand(new DeleteAccessConfigSubCommand());
+// initCommand(new ClusterAclConfigVersionListSubCommand());
+// initCommand(new UpdateGlobalWhiteAddrSubCommand());
+// initCommand(new GetAccessConfigSubCommand());
+//
+// initCommand(new UpdateStaticTopicSubCommand());
+// initCommand(new RemappingStaticTopicSubCommand());
+//
+// initCommand(new ExportMetadataCommand());
+// initCommand(new ExportConfigsCommand());
+// initCommand(new ExportMetricsCommand());
+//
+// initCommand(new HAStatusSubCommand());
+//
+// initCommand(new GetSyncStateSetSubCommand());
+// initCommand(new GetBrokerEpochSubCommand());
+// initCommand(new GetControllerMetaDataSubCommand());
+//
+// initCommand(new GetControllerConfigSubCommand());
+// initCommand(new UpdateControllerConfigSubCommand());
+// initCommand(new ReElectMasterSubCommand());
+// initCommand(new CleanControllerBrokerDataSubCommand());
+// }
+//
+// private static void initLogback() throws Exception {
+// LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+// JoranConfigurator configurator = new JoranConfigurator();
+// configurator.setContext(lc);
+// lc.reset();
+//
+// //avoid the exception
+// if (ROCKETMQ_HOME != null
+// && Files.exists(Paths.get(ROCKETMQ_HOME + "/conf/logback_tools.xml"))) {
+// configurator.doConfigure(ROCKETMQ_HOME + "/conf/logback_tools.xml");
+// }
+// }
+//
+// private static void printHelp() {
+// System.out.printf("The most commonly used mqadmin commands are:%n");
+//
+// for (SubCommand cmd : SUB_COMMANDS) {
+// System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc());
+// }
+//
+// System.out.printf("%nSee 'mqadmin help ' for more information on a specific command.%n");
+// }
+//
+// private static SubCommand findSubCommand(final String name) {
+// for (SubCommand cmd : SUB_COMMANDS) {
+// if (cmd.commandName().equalsIgnoreCase(name) || cmd.commandAlias() != null && cmd.commandAlias().equalsIgnoreCase(name)) {
+// return cmd;
+// }
+// }
+//
+// return null;
+// }
+//
+// private static String[] parseSubArgs(String[] args) {
+// if (args.length > 1) {
+// String[] result = new String[args.length - 1];
+// for (int i = 0; i < args.length - 1; i++) {
+// result[i] = args[i + 1];
+// }
+// return result;
+// }
+// return null;
+// }
+//
+// public static void initCommand(SubCommand command) {
+// SUB_COMMANDS.add(command);
+// }
+//}
diff --git a/src/main/java/com/example/demo/rocketMq/MyUpdateAccessConfigSubCommand.java b/src/main/java/com/example/demo/rocketMq/MyUpdateAccessConfigSubCommand.java
new file mode 100644
index 0000000..c88bfff
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyUpdateAccessConfigSubCommand.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.demo.rocketMq;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class MyUpdateAccessConfigSubCommand implements SubCommand {
+
+ public static final String COMMAND_NAME = "myUpdateAclConfig";
+
+ DefaultMQAdminExt defaultMQAdminExt;
+
+ MyUpdateAccessConfigSubCommand(DefaultMQAdminExt defaultMQAdminExt){
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ }
+
+ @Override
+ public String commandName() {
+ return COMMAND_NAME;
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Update acl config yaml file in broker";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "update acl config file to which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "update acl config file to which cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ opt = new Option("a", "accessKey", true, "set accessKey in acl config file");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("i", "defaultTopicPerm", true, "set default topicPerm in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("u", "defaultGroupPerm", true, "set default GroupPerm in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topicPerms", true, "set topicPerms list,eg: topicA=DENY,topicD=SUB");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("g", "groupPerms", true, "set groupPerms list,eg: groupD=DENY,groupD=SUB");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "admin", true, "set admin flag in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ PlainAccessConfig accessConfig = new PlainAccessConfig();
+ accessConfig.setAccessKey(commandLine.getOptionValue('a').trim());
+ // Secretkey
+ if (commandLine.hasOption('s')) {
+ accessConfig.setSecretKey(commandLine.getOptionValue('s').trim());
+ }
+
+ // Admin
+ if (commandLine.hasOption('m')) {
+ accessConfig.setAdmin(Boolean.parseBoolean(commandLine.getOptionValue('m').trim()));
+ }
+
+ // DefaultTopicPerm
+ if (commandLine.hasOption('i')) {
+ accessConfig.setDefaultTopicPerm(commandLine.getOptionValue('i').trim());
+ }
+
+ // DefaultGroupPerm
+ if (commandLine.hasOption('u')) {
+ accessConfig.setDefaultGroupPerm(commandLine.getOptionValue('u').trim());
+ }
+
+ // WhiteRemoteAddress
+ if (commandLine.hasOption('w')) {
+ accessConfig.setWhiteRemoteAddress(commandLine.getOptionValue('w').trim());
+ }
+
+ // TopicPerms list value
+ if (commandLine.hasOption('t')) {
+ String[] topicPerms = commandLine.getOptionValue('t').trim().split(",");
+ List topicPermList = new ArrayList();
+ if (topicPerms != null) {
+ for (String topicPerm : topicPerms) {
+ topicPermList.add(topicPerm);
+ }
+ }
+ accessConfig.setTopicPerms(topicPermList);
+ }
+
+ // GroupPerms list value
+ if (commandLine.hasOption('g')) {
+ String[] groupPerms = commandLine.getOptionValue('g').trim().split(",");
+ List groupPermList = new ArrayList();
+ if (groupPerms != null) {
+ for (String groupPerm : groupPerms) {
+ groupPermList.add(groupPerm);
+ }
+ }
+ accessConfig.setGroupPerms(groupPermList);
+ }
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+
+// defaultMQAdminExt.start();
+ defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
+
+ System.out.printf("create or update plain access config to %s success.%n", addr);
+ System.out.printf("%s", accessConfig);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+// defaultMQAdminExt.start();
+ Set brokerAddrSet =
+ CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : brokerAddrSet) {
+ defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
+ System.out.printf("create or update plain access config to %s success.%n", addr);
+ }
+
+ System.out.printf("%s", accessConfig);
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+// defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/src/main/java/com/example/demo/rocketMq/MyUpdateTopicSubCommand.java b/src/main/java/com/example/demo/rocketMq/MyUpdateTopicSubCommand.java
new file mode 100644
index 0000000..d002c49
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketMq/MyUpdateTopicSubCommand.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.demo.rocketMq;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.Map;
+import java.util.Set;
+
+public class MyUpdateTopicSubCommand implements SubCommand {
+
+ public static final String COMMAND_NAME = "myUpdateTopic";
+
+ DefaultMQAdminExt defaultMQAdminExt;
+
+ MyUpdateTopicSubCommand(DefaultMQAdminExt defaultMQAdminExt){
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ }
+
+
+
+ @Override
+ public String commandName() {
+ return COMMAND_NAME;
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Update or create topic";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "create topic to which cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ opt = new Option("a", "attributes", true, "attribute(+a=b,+c=d,-e)");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "topic name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("r", "readQueueNums", true, "set read queue nums");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("w", "writeQueueNums", true, "set write queue nums");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("o", "order", true, "set topic's order(true|false)");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("u", "unit", true, "is unit topic (true|false)");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
+// DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+// defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(8);
+ topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
+
+ if (commandLine.hasOption('a')) {
+ String attributesModification = commandLine.getOptionValue('a').trim();
+ Map attributes = AttributeParser.parseToMap(attributesModification);
+ topicConfig.setAttributes(attributes);
+ }
+
+ // readQueueNums
+ if (commandLine.hasOption('r')) {
+ topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
+ }
+
+ // writeQueueNums
+ if (commandLine.hasOption('w')) {
+ topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
+ }
+
+ // perm
+ if (commandLine.hasOption('p')) {
+ topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
+ }
+
+ boolean isUnit = false;
+ if (commandLine.hasOption('u')) {
+ isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
+ }
+
+ boolean isCenterSync = false;
+ if (commandLine.hasOption('s')) {
+ isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
+ }
+
+ int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
+ topicConfig.setTopicSysFlag(topicCenterSync);
+
+ boolean isOrder = false;
+ if (commandLine.hasOption('o')) {
+ isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
+ }
+ topicConfig.setOrder(isOrder);
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+
+// defaultMQAdminExt.start();
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+
+ if (isOrder) {
+ String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
+ String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
+ defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
+ System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
+ isOrder, orderConf.toString()));
+ }
+ System.out.printf("create topic to %s success.%n", addr);
+ System.out.printf("%s", topicConfig);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+// defaultMQAdminExt.start();
+
+ Set masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+ System.out.printf("create topic to %s success.%n", addr);
+ }
+
+ if (isOrder) {
+ Set brokerNameSet =
+ CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
+ StringBuilder orderConf = new StringBuilder();
+ String splitor = "";
+ for (String s : brokerNameSet) {
+ orderConf.append(splitor).append(s).append(":")
+ .append(topicConfig.getWriteQueueNums());
+ splitor = ";";
+ }
+ defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
+ orderConf.toString(), true);
+ System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
+ }
+
+ System.out.printf("%s", topicConfig);
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+// defaultMQAdminExt.shutdown();
+ }
+ }
+
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644
index 0000000..54375d4
--- /dev/null
+++ b/src/main/resources/application.properties
@@ -0,0 +1,8 @@
+spring.application.name=demo
+spring.thymeleaf.prefix=classpath:/templates/
+spring.thymeleaf.suffix=.html
+
+spring.data.mongodb.uri=mongodb://admin:123@192.168.9.137:27017/admin
+
+
+
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..a7b235c
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,34 @@
+rocketmq:
+ name-server: 192.168.74.248:28081
+ producer:
+ group: testGroup_dev
+ send-message-timeout: 300000
+ access-key: rocketmq2
+ secret-key: 12345678
+ consumer:
+ access-key: rocketmq2
+ secret-key: 12345678
+
+
+spring:
+ application:
+ name: yuan-kafka-boot
+ # 连接kafka集群
+ kafka:
+ jaas:
+ loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
+ controlFlag: required
+ options: username="kafkaAdmin" password="admin"
+ enabled: true
+ # 多个主机用逗号隔开
+ bootstrap-servers: localhost:9092
+ # 生产者
+ producer:
+ # key与value的序列化
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ security:
+ protocol: SASL_PLAINTEXT
+
+
+
diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..66f06ed
--- /dev/null
+++ b/src/main/resources/logback-spring.xml
@@ -0,0 +1,34 @@
+
+
+
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
+
+
+
+
+
+
+ ${LOG_HOME}/TestWeb.log.%d{yyyy-MM-dd}.log
+
+ 30
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
+
+
+
+ 10MB
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/resources/templates/message.html b/src/main/resources/templates/message.html
new file mode 100644
index 0000000..771221d
--- /dev/null
+++ b/src/main/resources/templates/message.html
@@ -0,0 +1,68 @@
+
+
+
+
+
+ Consume Messages
+
+
+
+Consume Messages
+
+
+
+
+
+
+
diff --git a/src/main/resources/templates/message0.html b/src/main/resources/templates/message0.html
new file mode 100644
index 0000000..863b652
--- /dev/null
+++ b/src/main/resources/templates/message0.html
@@ -0,0 +1,112 @@
+
+
+
+
+ Consume Messages
+
+
+
+
+
+Consume Messages
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/test/java/com/example/demo/DemoApplicationTests.java b/src/test/java/com/example/demo/DemoApplicationTests.java
new file mode 100644
index 0000000..eaa9969
--- /dev/null
+++ b/src/test/java/com/example/demo/DemoApplicationTests.java
@@ -0,0 +1,13 @@
+package com.example.demo;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class DemoApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}