package datasource import ( "WorldEpcho/src/config" "context" "errors" "fmt" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "net/url" //"gopkg.in/mgo.v2/bson" "go.mongodb.org/mongo-driver/bson" "log" "time" ) // ChatRecord 表示聊天记录的结构体 type ChatRecord struct { //ID bson.ObjectId `bson:"_id,omitempty"` SenderId int64 `bson:"senderId"` Timestamp int64 `bson:"timestamp"` Message string `bson:"message"` Status map[string]interface{} `bson:"status"` // 新增字段,表示消息发送者的聊天表情状态 SenderType string `bson:"senderType"` // 新增字段,表示消息发送方类型 } // 聊天记录数组 type ChatRecordArray struct { ConversationId string `bson:"conversationId"` Timestamp int64 `bson:"timestamp"` Records []ChatRecord `bson:"records"` } // 所有历史聊天记录 type AllHistoryChatRecordsArray struct { ConversationId string `bson:"conversationId"` StartTimestamp int64 `bson:"startTimestamp"` EndTimestamp int64 `bson:"endTimestamp"` Records []ChatRecord `bson:"records"` } //添加全局变量 const ( DatabaseName = "chat_db" CollectionName = "chat_records" TimeoutDuration = 100 * time.Second ) var ( ErrNotFound = errors.New("not found") MongoDBClient *mongo.Client ) // 连接到 MongoDB /* mongoDB 包含密码认证*/ // setupMongoDB 创建并返回一个MongoDB客户端以及用于管理连接的上下文和取消函数 // 用户名和密码现在作为参数传递,以便更灵活地配置连接 func setupMongoDB() (*mongo.Client, context.Context, context.CancelFunc) { username := url.QueryEscape(config.Conf.Mongo_UserName) password := url.QueryEscape(config.Conf.Mongo_PWD) host := config.Conf.Mongo_Host port := config.Conf.Mongo_Port uri := fmt.Sprintf("mongodb://%s:%s@%s:%s", username, password, host, port) //fmt.Println("uri ==> ", uri) clientOptions := options.Client().ApplyURI(uri) client, err := mongo.NewClient(clientOptions) if err != nil { fmt.Println("连接MongoDB出错!") log.Fatal(err) return nil, nil, nil } ctx, cancel := context.WithTimeout(context.Background(), TimeoutDuration) err = client.Connect(ctx) if err != nil { fmt.Println("连接MongoDB出错!") log.Fatal(err) return nil, nil, nil } err = client.Ping(ctx, nil) if err != nil { fmt.Println("无法连接到MongoDB!") log.Fatal(err) return nil, nil, nil } fmt.Println("连接MongoDB数据库成功!") return client, ctx, cancel } //保存单条聊天记录 func SaveChatRecord(record ChatRecord) error { client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) _, err := collection.InsertOne(ctx, record) if err != nil { return err } return nil } // 保存聊天记录数组 func SaveChatRecordArray(recordArray ChatRecordArray) error { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) // 获取要操作的集合 collection := client.Database(DatabaseName).Collection(CollectionName) // 插入 ChatRecordArray _, err := collection.InsertOne(ctx, recordArray) if err != nil { return err } return nil } // 根据会话Id,查询聊天记录 func GetChatRecordByConversationID(conversationID string) (*ChatRecord, error) { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) // 获取要操作的集合 collection := client.Database(DatabaseName).Collection(CollectionName) // 查询会话记录 filter := bson.M{"conversationId": conversationID} var chatRecordArray ChatRecordArray err := collection.FindOne(ctx, filter).Decode(&chatRecordArray) if err == mongo.ErrNoDocuments { return nil, nil } else if err != nil { return nil, fmt.Errorf("查询记录失败: %v", err) } if len(chatRecordArray.Records) > 0 { return &chatRecordArray.Records[len(chatRecordArray.Records)-1], nil } else { return nil, nil } } //添加用户和数字人的聊天记录到会话表 func AddChatRecordToConversation(conversationID string, chatRecords []ChatRecord) error { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) // 获取要操作的集合 collection := client.Database(DatabaseName).Collection(CollectionName) // 创建要添加的记录 // 更新时间戳并添加新的聊天记录 update := bson.M{ //"$set": bson.M{"timestamp": time.Now().Unix()}, // 更新整个会话的时间戳 "$push": bson.M{"records": bson.M{"$each": chatRecords}}, // 添加新的聊天记录 } // 根据会话 Id 更新记录 _, err := collection.UpdateOne(ctx, bson.M{"conversationId": conversationID}, update) if err != nil { return err } return nil } //查询用户与数字人的历史聊天记录 func GetChatHistory(userID, digitalPersonID, appId string) (ChatRecordArray, error) { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) filter := bson.M{"conversationId": userID + "_" + digitalPersonID + "_" + appId} var chatRecordArray ChatRecordArray log.Println("找到了匹配的记录") err := collection.FindOne(ctx, filter).Decode(&chatRecordArray) if err == mongo.ErrNoDocuments { return chatRecordArray, ErrNotFound } else if err != nil { log.Printf("Error in GetChatHistory: %v", err) return chatRecordArray, err } return chatRecordArray, nil } //获取最新的聊天记录 func GetChatHistoryLatest2(userID, digitalPersonID, appId string) (ChatRecordArray, error) { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) // 设置查询过滤器 filter := bson.M{"conversationId": userID + "_" + digitalPersonID + "_" + appId} // 设置排序参数,按照时间戳降序排序 //options := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) options := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) //opts := bson.D{{"timestamp", -1}} var chatRecordLatest ChatRecordArray log.Println(config.ColorBlue, "正在查询最新的会话记录", config.ColorReset) err := collection.FindOne(ctx, filter, options).Decode(&chatRecordLatest) //err := collection.Find(ctx,filter).Sort(opts).One(&chatRecordArray) if err == mongo.ErrNoDocuments { log.Println(config.ColorYellow, "没有找到记录", config.ColorReset) return chatRecordLatest, ErrNotFound } else if err != nil { log.Printf(config.ColorRed, "在 GetChatHistory 中发生错误: %v", err, config.ColorReset) return chatRecordLatest, err } //fmt.Println(config.ColorYellow, "最近一次聊天记录", chatRecordLatest, config.ColorReset) return chatRecordLatest, nil } // GetAllHistoryChatRecords 获取特定用户和数字人的某应用的所有聊天记录,并按时间顺序合并到一个数组中 func GetAllHistoryChatRecords(userID, digitalPersonID, appId string) ([]ChatRecordArray, error) { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) // 构建查询 filter := bson.M{"conversationId": bson.M{"$eq": userID + "_" + digitalPersonID + "_" + appId}} opts := options.Find().SetSort(bson.D{{"timestamp", 1}}) // 按时间戳升序排序 // 查询数据 cursor, err := collection.Find(ctx, filter, opts) if err != nil { if err == mongo.ErrNoDocuments { log.Println("没有找到聊天记录") return nil, err } log.Printf("在执行 MongoDB 查找操作时出错: %s\n", err) return nil, err } defer cursor.Close(ctx) // 解析数据 var ChatRecordArray []ChatRecordArray if err := cursor.All(ctx, &ChatRecordArray); err != nil { log.Printf("解析聊天记录出错: %s\n", err) return nil, err } return ChatRecordArray, nil } // GetChatHistoryLatest 获取最新的聊天记录,根据mergeAll参数获取最新一条或全部记录 func GetChatHistoryLatest(userID, digitalPersonID, appId string, mergeAll bool) (ChatRecordArray, error) { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) // 设置查询过滤器 filter := bson.M{"conversationId": userID + "_" + digitalPersonID + "_" + appId} var chatRecordLatest ChatRecordArray if mergeAll { // 使用 GetAllHistoryChatRecords 函数获取所有记录 allRecords, err := GetAllHistoryChatRecords(userID, digitalPersonID, appId) if err != nil { log.Printf("获取全部聊天记录时出错: %v\n", err) return ChatRecordArray{}, err } // 合并所有记录到一个数组中 var mergedRecords ChatRecordArray mergedRecords.ConversationId = userID + "_" + digitalPersonID + "_" + appId for _, recordArray := range allRecords { mergedRecords.Records = append(mergedRecords.Records, recordArray.Records...) } if len(mergedRecords.Records) > 0 { mergedRecords.Timestamp = mergedRecords.Records[len(mergedRecords.Records)-1].Timestamp } return mergedRecords, nil } else { // 只获取最新的一条记录 result := collection.FindOne(ctx, filter, options.FindOne().SetSort(bson.D{{"timestamp", -1}})) if err := result.Decode(&chatRecordLatest); err != nil { if err == mongo.ErrNoDocuments { log.Println(config.ColorYellow, "没有找到记录", config.ColorReset) return chatRecordLatest, ErrNotFound } log.Printf(config.ColorRed, "在查询最新记录时发生错误: %v", err, config.ColorReset) return chatRecordLatest, err } // 必须设置Records为仅包含这一条最新记录 if len(chatRecordLatest.Records) > 0 { chatRecordLatest.Records = chatRecordLatest.Records[:1] } } log.Println(config.ColorBlue, "查询会话记录成功", config.ColorReset) return chatRecordLatest, nil } //获取最新的聊天记录,并从聊天记录数组中获取最后一条 func GetChatHistoryRecordLatest(userID, digitalPersonID, appId string) (ChatRecord, error) { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) //设置查询过滤器 filter := bson.M{"conversationId": userID + "_" + digitalPersonID + "_" + appId} // 设置排序参数,按照时间戳降序排序 //options := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) options := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) //opts := bson.D{{"timestamp", -1}} var chatRecordArray ChatRecordArray log.Println(config.ColorBlue, "正在查询最新的会话记录", config.ColorReset) err := collection.FindOne(ctx, filter, options).Decode(&chatRecordArray) if err == mongo.ErrNoDocuments { log.Println(config.ColorRed, "没有找到记录", config.ColorReset) return ChatRecord{}, ErrNotFound } else if err != nil { log.Printf(config.ColorRed, "在 GetChatHistoryRecordLatest 中发生错误: %v", err, config.ColorReset) return ChatRecord{}, err } //fmt.Println(config.ColorYellow, "最近一次聊天记录", chatRecordArray, config.ColorReset) //最后一次聊天记录 var latestRecord ChatRecord var foundBotRecord bool // New variable to track if we have found a bot record if len(chatRecordArray.Records) > 0 { for _, record := range chatRecordArray.Records { if record.SenderType == "bot" && (!foundBotRecord || record.Timestamp > latestRecord.Timestamp) { latestRecord = record foundBotRecord = true // We have found a record with senderType "bot" } } } if !foundBotRecord { log.Println(config.ColorRed, "没有找到数字人的聊天记录", config.ColorReset) return ChatRecord{}, ErrNotFound } fmt.Println(config.ColorYellow, "最新的数字人聊天记录", latestRecord, config.ColorReset) fmt.Println(config.ColorGreen, "最新的数字人聊天记录", latestRecord.Status, config.ColorReset) return latestRecord, nil } //更新最近的聊天记录 func UpdateChatLatest(conversationID string, chatRecords []ChatRecord) error { // 连接 MongoDB client, ctx, cancel := setupMongoDB() defer cancel() defer client.Disconnect(ctx) collection := client.Database(DatabaseName).Collection(CollectionName) // 设置查询过滤器 filter := bson.M{"conversationId": conversationID} // 设置排序参数,按照时间戳降序排序 options := options.FindOneAndUpdate().SetSort(bson.D{{"timestamp", -1}}) // 更新操作 /* update := bson.M{ "$push": bson.M{"records": record}, } */ update := bson.M{ "$push": bson.M{"records": bson.M{"$each": chatRecords}}, } // 执行更新操作 result := collection.FindOneAndUpdate(ctx, filter, update, options) if result.Err() != nil { log.Printf(config.ColorRed, "在 UpdateChatLatest 中发生错误: %v", result.Err(), config.ColorReset) return result.Err() } log.Println(config.ColorGreen, "成功更新最新的会话记录", config.ColorReset) return nil } //保存聊天会话到MongoDB数据库 func SaveChatRecordMongoDB(conversationId string, senderId int64, message string, status map[string]interface{}, senderType string) error { chatRecord, err := GetChatRecordByConversationID(conversationId) if err != nil { log.Println(config.ColorRed, "保存聊天会话到MongoDB数据库,err: ", err, config.ColorReset) return err } //timeString := utils.GetNormalTimeString(time.Now()) // 如果数据库中,聊天记录表没有这个聊天记录则插入一条 if chatRecord == nil { chatRecordArray := ChatRecordArray{ ConversationId: conversationId, Timestamp: time.Now().Unix(), Records: []ChatRecord{ {SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, Status: status, SenderType: senderType}, }, } err = SaveChatRecordArray(chatRecordArray) if err != nil { log.Println(config.ColorRed, "MongoDB数据库插入一条数据,err: ", err, config.ColorReset) return err } fmt.Println(config.ColorGreen, "ChatRecordArray first saved successfully", config.ColorReset) } else { // 如果数据库中已经存在聊天记录,则更新记录 /* chatRecords := []ChatRecord{ {SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, Status: status, SenderType: senderType}, } */ chatRecords := []ChatRecord{ { SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, Status: status, SenderType: senderType, }, } //err := AddChatRecordToConversation(conversationId, chatRecords) err := UpdateChatLatest(conversationId, chatRecords) if err != nil { log.Println(config.ColorRed, "MongoDB数据库更新一条数据,err: ", err, config.ColorReset) return err } fmt.Println(config.ColorGreen, "ChatRecordArray saveAndUpdate successfully", config.ColorReset) } return nil }